Basic support for async filters.
[apps/pfixtools.git] / common / server.c
1 /******************************************************************************/
2 /*          pfixtools: a collection of postfix related tools                  */
3 /*          ~~~~~~~~~                                                         */
4 /*  ________________________________________________________________________  */
5 /*                                                                            */
6 /*  Redistribution and use in source and binary forms, with or without        */
7 /*  modification, are permitted provided that the following conditions        */
8 /*  are met:                                                                  */
9 /*                                                                            */
10 /*  1. Redistributions of source code must retain the above copyright         */
11 /*     notice, this list of conditions and the following disclaimer.          */
12 /*  2. Redistributions in binary form must reproduce the above copyright      */
13 /*     notice, this list of conditions and the following disclaimer in the    */
14 /*     documentation and/or other materials provided with the distribution.   */
15 /*  3. The names of its contributors may not be used to endorse or promote    */
16 /*     products derived from this software without specific prior written     */
17 /*     permission.                                                            */
18 /*                                                                            */
19 /*  THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND   */
20 /*  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE     */
21 /*  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR        */
22 /*  PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS    */
23 /*  BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR    */
24 /*  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF      */
25 /*  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS  */
26 /*  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN   */
27 /*  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)   */
28 /*  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF    */
29 /*  THE POSSIBILITY OF SUCH DAMAGE.                                           */
30 /******************************************************************************/
31
32 /*
33  * Copyright © 2008 Florent Bruneau
34  */
35
36 #include "server.h"
37 #include "epoll.h"
38 #include "common.h"
39
40 static PA(server_t) listeners   = ARRAY_INIT;
41 static PA(server_t) server_pool = ARRAY_INIT;
42
43 static server_t* server_new(void)
44 {
45     server_t* server = p_new(server_t, 1);
46     server->fd  = -1;
47     server->fd2 = -1;
48     return server;
49 }
50
51 static void server_wipe(server_t *server)
52 {
53     server->listener = server->event = false;
54     if (server->fd > 0) {
55         epoll_modify(server->fd, 0, NULL);
56         close(server->fd);
57         server->fd = -1;
58     }
59     if (server->fd2 > 0) {
60         close(server->fd2);
61         server->fd2 = -1;
62     }
63     if (server->data && server->clear_data) {
64         server->clear_data(&server->data);
65     }
66 }
67
68 static void server_delete(server_t **server)
69 {
70     if (*server) {
71         buffer_wipe(&(*server)->ibuf);
72         buffer_wipe(&(*server)->obuf);
73         server_wipe(*server);
74         p_delete(server);
75     }
76 }
77
78 static server_t* server_acquire(void)
79 {
80     if (server_pool.len != 0) {
81         return array_elt(server_pool, --server_pool.len);
82     } else {
83         return server_new();
84     }
85 }
86
87 void server_release(server_t *server)
88 {
89     server_wipe(server);
90     array_add(server_pool, server);
91 }
92
93 static void server_shutdown(void)
94 {
95     printf("Server shutdown");
96     array_deep_wipe(listeners, server_delete);
97     array_deep_wipe(server_pool, server_delete);
98 }
99
100 module_exit(server_shutdown);
101
102 int start_server(int port, start_listener_t starter, delete_client_t deleter)
103 {
104     struct sockaddr_in addr = {
105         .sin_family = AF_INET,
106         .sin_addr   = { htonl(INADDR_LOOPBACK) },
107     };
108     server_t *tmp;
109     void* data = NULL;
110     int sock;
111
112     addr.sin_port = htons(port);
113     sock = tcp_listen_nonblock((const struct sockaddr *)&addr, sizeof(addr));
114     if (sock < 0) {
115         return -1;
116     }
117
118     if (starter) {
119       data = starter();
120       if (data == NULL) {
121         close(sock);
122         return -1;
123       }
124     }
125
126     tmp             = server_acquire();
127     tmp->fd         = sock;
128     tmp->listener   = true;
129     tmp->event      = false;
130     tmp->data       = data;
131     tmp->clear_data = deleter;
132     epoll_register(sock, EPOLLIN, tmp);
133     array_add(listeners, tmp);
134     return 0;
135 }
136
137 static int start_client(server_t *server, start_client_t starter,
138                         delete_client_t deleter)
139 {
140     server_t *tmp;
141     void* data = NULL;
142     int sock;
143
144     sock = accept_nonblock(server->fd);
145     if (sock < 0) {
146         UNIXERR("accept");
147         return -1;
148     }
149
150     if (starter) {
151         data = starter(server);
152         if (data == NULL) {
153             close(sock);
154             return -1;
155         }
156     }
157
158     tmp             = server_acquire();
159     tmp->listener   = false;
160     tmp->event      = false;
161     tmp->fd         = sock;
162     tmp->data       = data;
163     tmp->clear_data = deleter;
164     epoll_register(sock, EPOLLIN, tmp);
165     return 0;
166 }
167
168 server_t * event_register(int fd, void *data)
169 {
170     int fds[2];
171     if (fd == -1) {
172         if (pipe(fds) != 0) {
173             UNIXERR("pipe");
174             return NULL;
175         }
176         if (setnonblock(fds[0]) != 0) {
177             close(fds[0]);
178             close(fds[1]);
179             return NULL;
180         }
181     }
182
183     server_t *tmp = server_acquire();
184     tmp->listener = false;
185     tmp->event = true;
186     tmp->fd    = fd == -1 ? fds[0] : fd;
187     tmp->fd2   = fd == -1 ? fds[1] : -1;
188     tmp->data  = data;
189     epoll_register(fds[0], EPOLLIN, tmp);
190     return tmp;
191 }
192
193 bool event_fire(server_t *event)
194 {
195     static const char *data = "";
196     if (event->fd2 == -1) {
197         return false;
198     }
199     return write(event->fd2, data, 1) == 0;
200 }
201
202 bool event_cancel(server_t *event)
203 {
204     char buff[32];
205     while (true) {
206         ssize_t res = read(event->fd, buff, 32);
207         if (res == -1 && errno != EAGAIN && errno != EINTR) {
208             UNIXERR("read");
209             return false;
210         } else if (res == -1 && errno == EINTR) {
211             continue;
212         } else if (res != 32) {
213             return true;
214         }
215     }
216 }
217
218 void event_release(server_t *event)
219 {
220     epoll_unregister(event->fd);
221     server_release(event);
222 }
223
224 int server_loop(start_client_t starter, delete_client_t deleter,
225                 run_client_t runner, event_handler_t handler,
226                 refresh_t refresh, void* config)
227 {
228     info("entering processing loop");
229     while (!sigint) {
230         struct epoll_event evts[1024];
231         int n;
232
233         if (sighup && refresh) {
234             sighup = false;
235             info("refreshing...");
236             if (!refresh(config)) {
237                 crit("error while refreshing configuration");
238                 return EXIT_FAILURE;
239             }
240             info("refresh done, processing loop restarts");
241         }
242
243         n = epoll_select(evts, countof(evts), -1);
244         if (n < 0) {
245             if (errno != EAGAIN && errno != EINTR) {
246                 UNIXERR("epoll_wait");
247                 return EXIT_FAILURE;
248             }
249             continue;
250         }
251
252         while (--n >= 0) {
253             server_t *d = evts[n].data.ptr;
254
255             if (d->listener) {
256                 (void)start_client(d, starter, deleter);
257                 continue;
258             } else if (d->event) {
259                 if (handler) {
260                     if (!handler(d, config)) {
261                         event_release(d);
262                     }
263                 } else {
264                     event_release(d);
265                 }
266                 continue;
267             }
268
269             if (evts[n].events & EPOLLIN) {
270                 if (runner(d, config) < 0) {
271                     server_release(d);
272                     continue;
273                 }
274             }
275
276             if ((evts[n].events & EPOLLOUT) && d->obuf.len) {
277                 if (buffer_write(&d->obuf, d->fd) < 0) {
278                     server_release(d);
279                     continue;
280                 }
281                 if (!d->obuf.len) {
282                     epoll_modify(d->fd, EPOLLIN, d);
283                 }
284             }
285         }
286     }
287     info("exit requested");
288     return EXIT_SUCCESS;
289 }