Basic support for async filters.
[apps/pfixtools.git] / common / server.c
index ea1df31..a085c40 100644 (file)
 
 #include "server.h"
 #include "epoll.h"
+#include "common.h"
 
-static server_t *listeners[1024];
-static int listener_count = 0;
-
+static PA(server_t) listeners   = ARRAY_INIT;
+static PA(server_t) server_pool = ARRAY_INIT;
 
 static server_t* server_new(void)
 {
     server_t* server = p_new(server_t, 1);
-    server->fd = -1;
+    server->fd  = -1;
+    server->fd2 = -1;
     return server;
 }
 
+static void server_wipe(server_t *server)
+{
+    server->listener = server->event = false;
+    if (server->fd > 0) {
+        epoll_modify(server->fd, 0, NULL);
+        close(server->fd);
+        server->fd = -1;
+    }
+    if (server->fd2 > 0) {
+        close(server->fd2);
+        server->fd2 = -1;
+    }
+    if (server->data && server->clear_data) {
+        server->clear_data(&server->data);
+    }
+}
+
 static void server_delete(server_t **server)
 {
     if (*server) {
-        if ((*server)->fd >= 0) {
-            close((*server)->fd);
-        }
-        if ((*server)->data && (*server)->clear_data) {
-            (*server)->clear_data(&(*server)->data);
-        }
         buffer_wipe(&(*server)->ibuf);
         buffer_wipe(&(*server)->obuf);
+        server_wipe(*server);
         p_delete(server);
     }
 }
 
-static void server_shutdown(void)
+static server_t* server_acquire(void)
 {
-    for (int i = 0 ; i < listener_count ; ++i) {
-        server_delete(&listeners[i]);
+    if (server_pool.len != 0) {
+        return array_elt(server_pool, --server_pool.len);
+    } else {
+        return server_new();
     }
 }
 
+void server_release(server_t *server)
+{
+    server_wipe(server);
+    array_add(server_pool, server);
+}
+
+static void server_shutdown(void)
+{
+    printf("Server shutdown");
+    array_deep_wipe(listeners, server_delete);
+    array_deep_wipe(server_pool, server_delete);
+}
+
 module_exit(server_shutdown);
 
 int start_server(int port, start_listener_t starter, delete_client_t deleter)
@@ -95,13 +123,14 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
       }
     }
 
-    tmp             = server_new();
+    tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->listener   = true;
+    tmp->event      = false;
     tmp->data       = data;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
-    listeners[listener_count++] = tmp;
+    array_add(listeners, tmp);
     return 0;
 }
 
@@ -126,7 +155,9 @@ static int start_client(server_t *server, start_client_t starter,
         }
     }
 
-    tmp             = server_new();
+    tmp             = server_acquire();
+    tmp->listener   = false;
+    tmp->event      = false;
     tmp->fd         = sock;
     tmp->data       = data;
     tmp->clear_data = deleter;
@@ -134,18 +165,79 @@ static int start_client(server_t *server, start_client_t starter,
     return 0;
 }
 
+server_t * event_register(int fd, void *data)
+{
+    int fds[2];
+    if (fd == -1) {
+        if (pipe(fds) != 0) {
+            UNIXERR("pipe");
+            return NULL;
+        }
+        if (setnonblock(fds[0]) != 0) {
+            close(fds[0]);
+            close(fds[1]);
+            return NULL;
+        }
+    }
+
+    server_t *tmp = server_acquire();
+    tmp->listener = false;
+    tmp->event = true;
+    tmp->fd    = fd == -1 ? fds[0] : fd;
+    tmp->fd2   = fd == -1 ? fds[1] : -1;
+    tmp->data  = data;
+    epoll_register(fds[0], EPOLLIN, tmp);
+    return tmp;
+}
+
+bool event_fire(server_t *event)
+{
+    static const char *data = "";
+    if (event->fd2 == -1) {
+        return false;
+    }
+    return write(event->fd2, data, 1) == 0;
+}
+
+bool event_cancel(server_t *event)
+{
+    char buff[32];
+    while (true) {
+        ssize_t res = read(event->fd, buff, 32);
+        if (res == -1 && errno != EAGAIN && errno != EINTR) {
+            UNIXERR("read");
+            return false;
+        } else if (res == -1 && errno == EINTR) {
+            continue;
+        } else if (res != 32) {
+            return true;
+        }
+    }
+}
+
+void event_release(server_t *event)
+{
+    epoll_unregister(event->fd);
+    server_release(event);
+}
+
 int server_loop(start_client_t starter, delete_client_t deleter,
-                run_client_t runner, refresh_t refresh, void* config) {
+                run_client_t runner, event_handler_t handler,
+                refresh_t refresh, void* config)
+{
+    info("entering processing loop");
     while (!sigint) {
         struct epoll_event evts[1024];
         int n;
 
         if (sighup && refresh) {
             sighup = false;
+            info("refreshing...");
             if (!refresh(config)) {
                 crit("error while refreshing configuration");
                 return EXIT_FAILURE;
             }
+            info("refresh done, processing loop restarts");
         }
 
         n = epoll_select(evts, countof(evts), -1);
@@ -163,18 +255,27 @@ int server_loop(start_client_t starter, delete_client_t deleter,
             if (d->listener) {
                 (void)start_client(d, starter, deleter);
                 continue;
+            } else if (d->event) {
+                if (handler) {
+                    if (!handler(d, config)) {
+                        event_release(d);
+                    }
+                } else {
+                    event_release(d);
+                }
+                continue;
             }
 
             if (evts[n].events & EPOLLIN) {
                 if (runner(d, config) < 0) {
-                    server_delete(&d);
+                    server_release(d);
                     continue;
                 }
             }
 
             if ((evts[n].events & EPOLLOUT) && d->obuf.len) {
                 if (buffer_write(&d->obuf, d->fd) < 0) {
-                    server_delete(&d);
+                    server_release(d);
                     continue;
                 }
                 if (!d->obuf.len) {
@@ -183,5 +284,6 @@ int server_loop(start_client_t starter, delete_client_t deleter,
             }
         }
     }
+    info("exit requested");
     return EXIT_SUCCESS;
 }