Basic support for async filters.
[apps/pfixtools.git] / common / server.c
index 65e1a70..a085c40 100644 (file)
@@ -52,6 +52,7 @@ static void server_wipe(server_t *server)
 {
     server->listener = server->event = false;
     if (server->fd > 0) {
+        epoll_modify(server->fd, 0, NULL);
         close(server->fd);
         server->fd = -1;
     }
@@ -62,8 +63,6 @@ static void server_wipe(server_t *server)
     if (server->data && server->clear_data) {
         server->clear_data(&server->data);
     }
-    array_shrink(server->ibuf, 512);
-    array_shrink(server->obuf, 512);
 }
 
 static void server_delete(server_t **server)
@@ -85,7 +84,7 @@ static server_t* server_acquire(void)
     }
 }
 
-static void server_release(server_t *server)
+void server_release(server_t *server)
 {
     server_wipe(server);
     array_add(server_pool, server);
@@ -93,6 +92,7 @@ static void server_release(server_t *server)
 
 static void server_shutdown(void)
 {
+    printf("Server shutdown");
     array_deep_wipe(listeners, server_delete);
     array_deep_wipe(server_pool, server_delete);
 }
@@ -126,6 +126,7 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
     tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->listener   = true;
+    tmp->event      = false;
     tmp->data       = data;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
@@ -155,6 +156,8 @@ static int start_client(server_t *server, start_client_t starter,
     }
 
     tmp             = server_acquire();
+    tmp->listener   = false;
+    tmp->event      = false;
     tmp->fd         = sock;
     tmp->data       = data;
     tmp->clear_data = deleter;
@@ -162,39 +165,45 @@ static int start_client(server_t *server, start_client_t starter,
     return 0;
 }
 
-event_t event_register(void *data)
+server_t * event_register(int fd, void *data)
 {
     int fds[2];
-    if (pipe(fds) != 0) {
-        UNIXERR("pipe");
-        return INVALID_EVENT;
-    }
-    if (setnonblock(fds[0]) != 0) {
-        close(fds[0]);
-        close(fds[1]);
-        return INVALID_EVENT;
+    if (fd == -1) {
+        if (pipe(fds) != 0) {
+            UNIXERR("pipe");
+            return NULL;
+        }
+        if (setnonblock(fds[0]) != 0) {
+            close(fds[0]);
+            close(fds[1]);
+            return NULL;
+        }
     }
 
     server_t *tmp = server_acquire();
+    tmp->listener = false;
     tmp->event = true;
-    tmp->fd    = fds[0];
-    tmp->fd2   = fds[1];
+    tmp->fd    = fd == -1 ? fds[0] : fd;
+    tmp->fd2   = fd == -1 ? fds[1] : -1;
     tmp->data  = data;
     epoll_register(fds[0], EPOLLIN, tmp);
-    return tmp->fd2;
+    return tmp;
 }
 
-bool event_fire(event_t event)
+bool event_fire(server_t *event)
 {
     static const char *data = "";
-    return write(event, data, 1) == 0;
+    if (event->fd2 == -1) {
+        return false;
+    }
+    return write(event->fd2, data, 1) == 0;
 }
 
-static bool event_cancel(int event)
+bool event_cancel(server_t *event)
 {
     char buff[32];
     while (true) {
-        ssize_t res = read(event, buff, 32);
+        ssize_t res = read(event->fd, buff, 32);
         if (res == -1 && errno != EAGAIN && errno != EINTR) {
             UNIXERR("read");
             return false;
@@ -206,9 +215,16 @@ static bool event_cancel(int event)
     }
 }
 
+void event_release(server_t *event)
+{
+    epoll_unregister(event->fd);
+    server_release(event);
+}
+
 int server_loop(start_client_t starter, delete_client_t deleter,
                 run_client_t runner, event_handler_t handler,
-                refresh_t refresh, void* config) {
+                refresh_t refresh, void* config)
+{
     info("entering processing loop");
     while (!sigint) {
         struct epoll_event evts[1024];
@@ -240,14 +256,12 @@ int server_loop(start_client_t starter, delete_client_t deleter,
                 (void)start_client(d, starter, deleter);
                 continue;
             } else if (d->event) {
-                if (!event_cancel(d->fd)) {
-                    server_release(d);
-                    continue;
-                }
                 if (handler) {
-                    if (!handler(d->data, config)) {
-                        server_release(d);
+                    if (!handler(d, config)) {
+                        event_release(d);
                     }
+                } else {
+                    event_release(d);
                 }
                 continue;
             }