Add event to interrupt the server_loop.
authorFlorent Bruneau <florent.bruneau@polytechnique.org>
Tue, 7 Oct 2008 20:50:04 +0000 (22:50 +0200)
committerFlorent Bruneau <florent.bruneau@polytechnique.org>
Tue, 7 Oct 2008 20:50:04 +0000 (22:50 +0200)
Signed-off-by: Florent Bruneau <florent.bruneau@polytechnique.org>
common/array.h
common/common.c
common/common.h
common/mem.h
common/server.c
common/server.h
pfix-srsd/main-srsd.c
postlicyd/iplist.c
postlicyd/main-postlicyd.c

index 6f5217d..4a5b88d 100644 (file)
         array_ensure_can_edit(array);                                          \
         p_allocgrow(&(array).data, (goal), &(array).size);                     \
     }
-#define array_adjust(array)                                                    \
+#define array_shrink(array, cap)                                               \
     do {                                                                       \
         array_ensure_can_edit(array);                                          \
-        p_shrink(&(array).data, (array).len, &(array).size);                   \
+        if ((cap) < (array).size && (array).size != (array).len) {             \
+            p_shrink(&(array).data, MAX((array).len, (cap)), &(array).size);   \
+        }                                                                      \
     } while (0)
+#define array_adjust(array) array_shrink(array, 0)
+
 #define array_elt(array, n) ((array).data[(n)])
+#define array_last(array) array_elt(array, (array).len - 1)
+#define array_pop_last(array) array_elt(array, --((array).len))
+
 #define array_ptr(array, n) ((array).data + (n))
 
 #define foreach(var, array)                                                    \
index aa9e5ea..43b2bf2 100644 (file)
@@ -68,7 +68,7 @@ void common_sighandler(int sig)
     }
 }
 
-static int setnonblock(int sock)
+int setnonblock(int sock)
 {
     int res = fcntl(sock, F_GETFL);
 
index 0b0cd85..22299b7 100644 (file)
@@ -111,6 +111,7 @@ extern bool         log_syslog;
 
 void common_sighandler(int sig);
 
+int setnonblock(int sock);
 int tcp_bind(const struct sockaddr *addr, socklen_t len);
 int tcp_listen(const struct sockaddr *addr, socklen_t len);
 int tcp_listen_nonblock(const struct sockaddr *addr, socklen_t len);
index f7b9d80..9d98b87 100644 (file)
@@ -55,9 +55,9 @@
 
 #  define p_shrink(pp, goalnb, allocnb)           \
     do {                                          \
-        if (*(allocnb) > goalnb) {                \
-            p_realloc(pp, goalnb);                \
-            *(allocnb) = goalnb;                  \
+        if (*(allocnb) > (goalnb)) {              \
+            p_realloc(pp, (goalnb));              \
+            *(allocnb) = (goalnb);                \
         }                                         \
     } while(0)
 
index 78af625..c3b7a2b 100644 (file)
 
 #include "server.h"
 #include "epoll.h"
+#include "common.h"
 
-static server_t *listeners[1024];
-static int listener_count = 0;
-
+static PA(server_t) listeners   = ARRAY_INIT;
+static PA(server_t) server_pool = ARRAY_INIT;
 
 static server_t* server_new(void)
 {
     server_t* server = p_new(server_t, 1);
-    server->fd = -1;
+    server->fd  = -1;
+    server->fd2 = -1;
     return server;
 }
 
+static void server_wipe(server_t *server)
+{
+    server->listener = server->event = false;
+    if (server->fd > 0) {
+        close(server->fd);
+        server->fd = -1;
+    }
+    if (server->fd2 > 0) {
+        close(server->fd2);
+        server->fd2 = -1;
+    }
+    if (server->data && server->clear_data) {
+        server->clear_data(&server->data);
+    }
+    array_shrink(server->ibuf, 512);
+    array_shrink(server->obuf, 512);
+}
+
 static void server_delete(server_t **server)
 {
     if (*server) {
-        if ((*server)->fd >= 0) {
-            close((*server)->fd);
-        }
-        if ((*server)->data && (*server)->clear_data) {
-            (*server)->clear_data(&(*server)->data);
-        }
         buffer_wipe(&(*server)->ibuf);
         buffer_wipe(&(*server)->obuf);
+        server_wipe(*server);
         p_delete(server);
     }
 }
 
-static void server_shutdown(void)
+static server_t* server_acquire(void)
 {
-    for (int i = 0 ; i < listener_count ; ++i) {
-        server_delete(&listeners[i]);
+    if (server_pool.len != 0) {
+        return array_elt(server_pool, --server_pool.len);
+    } else {
+        return server_new();
     }
 }
 
+static void server_release(server_t *server)
+{
+    server_wipe(server);
+    array_add(server_pool, server);
+}
+
+static void server_shutdown(void)
+{
+    array_deep_wipe(listeners, server_delete);
+    array_deep_wipe(server_pool, server_delete);
+}
+
 module_exit(server_shutdown);
 
 int start_server(int port, start_listener_t starter, delete_client_t deleter)
@@ -95,13 +123,13 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
       }
     }
 
-    tmp             = server_new();
+    tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->listener   = true;
     tmp->data       = data;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
-    listeners[listener_count++] = tmp;
+    array_add(listeners, tmp);
     return 0;
 }
 
@@ -126,7 +154,7 @@ static int start_client(server_t *server, start_client_t starter,
         }
     }
 
-    tmp             = server_new();
+    tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->data       = data;
     tmp->clear_data = deleter;
@@ -134,8 +162,53 @@ static int start_client(server_t *server, start_client_t starter,
     return 0;
 }
 
+int event_register(void *data)
+{
+    int fds[2];
+    if (pipe(fds) != 0) {
+        UNIXERR("pipe");
+        return -1;
+    }
+    if (setnonblock(fds[0]) != 0) {
+        close(fds[0]);
+        close(fds[1]);
+        return -1;
+    }
+
+    server_t *tmp = server_acquire();
+    tmp->event = true;
+    tmp->fd    = fds[0];
+    tmp->fd2   = fds[1];
+    tmp->data  = data;
+    epoll_register(fds[0], EPOLLIN, tmp);
+    return tmp->fd2;
+}
+
+bool event_fire(int event)
+{
+    static const char *data = "";
+    return write(event, data, 1) == 0;
+}
+
+static bool event_cancel(int event)
+{
+    static char buff[1];
+    while (true) {
+        ssize_t res = read(event, buff, 64);
+        if (res == -1 && errno != EAGAIN && errno != EINTR) {
+            UNIXERR("read");
+            return false;
+        } else if (res == -1 && errno == EINTR) {
+            continue;
+        } else if (res != 1) {
+            return true;
+        }
+    }
+}
+
 int server_loop(start_client_t starter, delete_client_t deleter,
-                run_client_t runner, refresh_t refresh, void* config) {
+                run_client_t runner, event_handler_t handler,
+                refresh_t refresh, void* config) {
     info("entering processing loop");
     while (!sigint) {
         struct epoll_event evts[1024];
@@ -166,18 +239,29 @@ int server_loop(start_client_t starter, delete_client_t deleter,
             if (d->listener) {
                 (void)start_client(d, starter, deleter);
                 continue;
+            } else if (d->event) {
+                if (!event_cancel(d->fd)) {
+                    server_release(d);
+                    continue;
+                }
+                if (handler) {
+                    if (!handler(d->data, config)) {
+                        server_release(d);
+                    }
+                }
+                continue;
             }
 
             if (evts[n].events & EPOLLIN) {
                 if (runner(d, config) < 0) {
-                    server_delete(&d);
+                    server_release(d);
                     continue;
                 }
             }
 
             if ((evts[n].events & EPOLLOUT) && d->obuf.len) {
                 if (buffer_write(&d->obuf, d->fd) < 0) {
-                    server_delete(&d);
+                    server_release(d);
                     continue;
                 }
                 if (!d->obuf.len) {
index 38289e6..239ae37 100644 (file)
@@ -45,19 +45,30 @@ typedef void  (*delete_client_t)(void*);
 typedef void *(*start_client_t)(server_t*);
 typedef int   (*run_client_t)(server_t*, void*);
 typedef bool   (*refresh_t)(void*);
+typedef bool  (*event_handler_t)(void* data, void* config);
 
 struct server_t {
     unsigned listener : 1;
+    unsigned event    : 1;
+
     int fd;
+    int fd2;
+
     buffer_t ibuf;
     buffer_t obuf;
-    void* data;
+
     delete_client_t clear_data;
+    void* data;
 };
+ARRAY(server_t);
 
 int start_server(int port, start_listener_t starter, delete_client_t deleter);
 
+int event_register(void *data);
+bool event_fire(int event);
+
 int server_loop(start_client_t starter, delete_client_t deleter,
-                run_client_t runner, refresh_t refresh, void* config);
+                run_client_t runner, event_handler_t handler,
+                refresh_t refresh, void *config);
 
 #endif
index 037a458..98633e6 100644 (file)
@@ -328,5 +328,5 @@ int main(int argc, char *argv[])
         || start_listener(port_dec, true) < 0) {
         return EXIT_FAILURE;
     }
-    return server_loop(srsd_starter, NULL, process_srs, NULL, &config);
+    return server_loop(srsd_starter, NULL, process_srs, NULL, NULL, &config);
 }
index a1f49eb..ebd29f8 100644 (file)
@@ -458,6 +458,7 @@ static int rbl_init(void)
     (void)filter_hook_register(type, "fail");
     (void)filter_hook_register(type, "hard_match");
     (void)filter_hook_register(type, "soft_match");
+    (void)filter_hook_register(type, "async");
 
     /* Parameters.
      */
index 34b4fbb..57e1e25 100644 (file)
@@ -240,6 +240,6 @@ int main(int argc, char *argv[])
         return EXIT_FAILURE;
     } else {
         return server_loop(query_starter, (delete_client_t)query_delete,
-                           policy_run, config_refresh, config);
+                           policy_run, NULL, config_refresh, config);
     }
 }