Let say an event is just a pointer.
[apps/pfixtools.git] / common / server.c
1 /******************************************************************************/
2 /*          pfixtools: a collection of postfix related tools                  */
3 /*          ~~~~~~~~~                                                         */
4 /*  ________________________________________________________________________  */
5 /*                                                                            */
6 /*  Redistribution and use in source and binary forms, with or without        */
7 /*  modification, are permitted provided that the following conditions        */
8 /*  are met:                                                                  */
9 /*                                                                            */
10 /*  1. Redistributions of source code must retain the above copyright         */
11 /*     notice, this list of conditions and the following disclaimer.          */
12 /*  2. Redistributions in binary form must reproduce the above copyright      */
13 /*     notice, this list of conditions and the following disclaimer in the    */
14 /*     documentation and/or other materials provided with the distribution.   */
15 /*  3. The names of its contributors may not be used to endorse or promote    */
16 /*     products derived from this software without specific prior written     */
17 /*     permission.                                                            */
18 /*                                                                            */
19 /*  THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND   */
20 /*  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE     */
21 /*  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR        */
22 /*  PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS    */
23 /*  BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR    */
24 /*  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF      */
25 /*  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS  */
26 /*  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN   */
27 /*  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)   */
28 /*  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF    */
29 /*  THE POSSIBILITY OF SUCH DAMAGE.                                           */
30 /******************************************************************************/
31
32 /*
33  * Copyright © 2008 Florent Bruneau
34  */
35
36 #include "server.h"
37 #include "epoll.h"
38 #include "common.h"
39
40 static PA(server_t) listeners   = ARRAY_INIT;
41 static PA(server_t) server_pool = ARRAY_INIT;
42
43 static server_t* server_new(void)
44 {
45     server_t* server = p_new(server_t, 1);
46     server->fd  = -1;
47     server->fd2 = -1;
48     return server;
49 }
50
51 static void server_wipe(server_t *server)
52 {
53     server->listener = server->event = false;
54     if (server->fd > 0) {
55         epoll_modify(server->fd, 0, NULL);
56         close(server->fd);
57         server->fd = -1;
58     }
59     if (server->fd2 > 0) {
60         close(server->fd2);
61         server->fd2 = -1;
62     }
63     if (server->data && server->clear_data) {
64         server->clear_data(&server->data);
65     }
66     array_shrink(server->ibuf, 512);
67     array_shrink(server->obuf, 512);
68 }
69
70 static void server_delete(server_t **server)
71 {
72     if (*server) {
73         buffer_wipe(&(*server)->ibuf);
74         buffer_wipe(&(*server)->obuf);
75         server_wipe(*server);
76         p_delete(server);
77     }
78 }
79
80 static server_t* server_acquire(void)
81 {
82     if (server_pool.len != 0) {
83         return array_elt(server_pool, --server_pool.len);
84     } else {
85         return server_new();
86     }
87 }
88
89 static void server_release(server_t *server)
90 {
91     server_wipe(server);
92     array_add(server_pool, server);
93 }
94
95 static void server_shutdown(void)
96 {
97     array_deep_wipe(listeners, server_delete);
98     array_deep_wipe(server_pool, server_delete);
99 }
100
101 module_exit(server_shutdown);
102
103 int start_server(int port, start_listener_t starter, delete_client_t deleter)
104 {
105     struct sockaddr_in addr = {
106         .sin_family = AF_INET,
107         .sin_addr   = { htonl(INADDR_LOOPBACK) },
108     };
109     server_t *tmp;
110     void* data = NULL;
111     int sock;
112
113     addr.sin_port = htons(port);
114     sock = tcp_listen_nonblock((const struct sockaddr *)&addr, sizeof(addr));
115     if (sock < 0) {
116         return -1;
117     }
118
119     if (starter) {
120       data = starter();
121       if (data == NULL) {
122         close(sock);
123         return -1;
124       }
125     }
126
127     tmp             = server_acquire();
128     tmp->fd         = sock;
129     tmp->listener   = true;
130     tmp->data       = data;
131     tmp->clear_data = deleter;
132     epoll_register(sock, EPOLLIN, tmp);
133     array_add(listeners, tmp);
134     return 0;
135 }
136
137 static int start_client(server_t *server, start_client_t starter,
138                         delete_client_t deleter)
139 {
140     server_t *tmp;
141     void* data = NULL;
142     int sock;
143
144     sock = accept_nonblock(server->fd);
145     if (sock < 0) {
146         UNIXERR("accept");
147         return -1;
148     }
149
150     if (starter) {
151         data = starter(server);
152         if (data == NULL) {
153             close(sock);
154             return -1;
155         }
156     }
157
158     tmp             = server_acquire();
159     tmp->fd         = sock;
160     tmp->data       = data;
161     tmp->clear_data = deleter;
162     epoll_register(sock, EPOLLIN, tmp);
163     return 0;
164 }
165
166 event_t event_register(void *data)
167 {
168     int fds[2];
169     if (pipe(fds) != 0) {
170         UNIXERR("pipe");
171         return NULL;
172     }
173     if (setnonblock(fds[0]) != 0) {
174         close(fds[0]);
175         close(fds[1]);
176         return NULL;
177     }
178
179     server_t *tmp = server_acquire();
180     tmp->event = true;
181     tmp->fd    = fds[0];
182     tmp->fd2   = fds[1];
183     tmp->data  = data;
184     epoll_register(fds[0], EPOLLIN, tmp);
185     return tmp;
186 }
187
188 bool event_fire(event_t event)
189 {
190     static const char *data = "";
191     return write(event->fd2, data, 1) == 0;
192 }
193
194 static bool event_cancel(event_t event)
195 {
196     char buff[32];
197     while (true) {
198         ssize_t res = read(event->fd, buff, 32);
199         if (res == -1 && errno != EAGAIN && errno != EINTR) {
200             UNIXERR("read");
201             return false;
202         } else if (res == -1 && errno == EINTR) {
203             continue;
204         } else if (res != 32) {
205             return true;
206         }
207     }
208 }
209
210 int server_loop(start_client_t starter, delete_client_t deleter,
211                 run_client_t runner, event_handler_t handler,
212                 refresh_t refresh, void* config) {
213     info("entering processing loop");
214     while (!sigint) {
215         struct epoll_event evts[1024];
216         int n;
217
218         if (sighup && refresh) {
219             sighup = false;
220             info("refreshing...");
221             if (!refresh(config)) {
222                 crit("error while refreshing configuration");
223                 return EXIT_FAILURE;
224             }
225             info("refresh done, processing loop restarts");
226         }
227
228         n = epoll_select(evts, countof(evts), -1);
229         if (n < 0) {
230             if (errno != EAGAIN && errno != EINTR) {
231                 UNIXERR("epoll_wait");
232                 return EXIT_FAILURE;
233             }
234             continue;
235         }
236
237         while (--n >= 0) {
238             server_t *d = evts[n].data.ptr;
239
240             if (d->listener) {
241                 (void)start_client(d, starter, deleter);
242                 continue;
243             } else if (d->event) {
244                 if (!event_cancel(d)) {
245                     server_release(d);
246                     continue;
247                 }
248                 if (handler) {
249                     if (!handler(d, config)) {
250                         server_release(d);
251                     }
252                 }
253                 continue;
254             }
255
256             if (evts[n].events & EPOLLIN) {
257                 if (runner(d, config) < 0) {
258                     server_release(d);
259                     continue;
260                 }
261             }
262
263             if ((evts[n].events & EPOLLOUT) && d->obuf.len) {
264                 if (buffer_write(&d->obuf, d->fd) < 0) {
265                     server_release(d);
266                     continue;
267                 }
268                 if (!d->obuf.len) {
269                     epoll_modify(d->fd, EPOLLIN, d);
270                 }
271             }
272         }
273     }
274     info("exit requested");
275     return EXIT_SUCCESS;
276 }