Fix build system, drop gai things, we will go with threads as policy processes are...
[apps/pfixtools.git] / job.c
diff --git a/job.c b/job.c
index 23cd3d5..3d99a8f 100644 (file)
--- a/job.c
+++ b/job.c
  * Copyright © 2007 Pierre Habouzit
  */
 
+#include <errno.h>
 #include <fcntl.h>
+#include <signal.h>
 #include <stdbool.h>
+#include <syslog.h>
+#include <sysexits.h>
 #include <sys/epoll.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <time.h>
 #include <unistd.h>
 
 #ifndef EPOLLRDHUP
 #  endif
 #endif
 
-
 #include "job.h"
 
-static int epollfd;
-
-static void job_wipe(job_t *job)
-{
-    if (job->fd >= 0) {
-        close(job->fd);
-        job->fd = -1;
-    }
-}
-DO_DELETE(job_t, job);
+static int epollfd = -1;
+static bool sigint = false;
 
-void job_release(job_t **job)
+void job_delete(job_t **job)
 {
     if (*job) {
         if ((*job)->stop) {
             (*job)->stop(*job);
         }
-        job_delete(job);
+        if ((*job)->fd >= 0) {
+            close((*job)->fd);
+        }
+        p_delete(job);
     }
 }
 
@@ -77,59 +76,138 @@ static job_t *job_register_fd(job_t *job)
 {
     struct epoll_event event = { .data.ptr = job, .events = EPOLLRDHUP };
 
-    if (job->state & JOB_READ || job->state & JOB_LISTEN) {
+    if (job->mode & (JOB_READ | JOB_LISTEN)) {
         event.events |= EPOLLIN;
     }
 
-    if (job->state & JOB_WRITE || job->state & JOB_CONN) {
-        event.events |= EPOLLIN;
+    if (job->mode & (JOB_WRITE | JOB_CONN)) {
+        event.events |= EPOLLOUT;
     }
 
     if (epoll_ctl(epollfd, EPOLL_CTL_ADD, job->fd, &event) < 0) {
+        syslog(LOG_ERR, "epoll_ctl error: %m");
         job->error = true;
-        job_release(&job);
+        job_delete(&job);
     }
 
     return job;
 }
 
-void job_update_state(job_t *job, int state)
+void job_update_mode(job_t *job, int mode)
 {
     struct epoll_event event = { .data.ptr = job, .events = EPOLLRDHUP };
 
-    if (job->state == state)
+    if (job->mode == mode)
         return;
 
-    job->state = state;
-
-    if (job->state & JOB_READ || job->state & JOB_LISTEN) {
+    job->mode = mode;
+    if (job->mode & (JOB_READ | JOB_LISTEN)) {
         event.events |= EPOLLIN;
     }
 
-    if (job->state & JOB_WRITE || job->state & JOB_CONN) {
-        event.events |= EPOLLIN;
+    if (job->mode & (JOB_WRITE | JOB_CONN)) {
+        event.events |= EPOLLOUT;
     }
 
-    epoll_ctl(epollfd, EPOLL_CTL_MOD, job->fd, &event);
+    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, job->fd, &event) < 0) {
+        syslog(LOG_ERR, "epoll_ctl error: %m");
+        job->error = true;
+    }
 }
 
-job_t *job_accept(job_t *listener, int state)
+job_t *job_accept(job_t *listener, int mode)
 {
     int sock;
     job_t *res;
 
     if ((sock = accept(listener->fd, NULL, 0)) < 0) {
+        syslog(LOG_ERR, "accept error: %m");
         return NULL;
     }
 
     if (fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK)) {
+        syslog(LOG_ERR, "fcntl error: %m");
         return NULL;
     }
 
     res          = job_new();
     res->fd      = sock;
-    res->state   = state;
+    res->mode    = mode;
     res->process = listener->process;
     res->stop    = listener->stop;
     return job_register_fd(res);
 }
+
+static void job_sighandler(int sig)
+{
+    static time_t lastintr = 0;
+    time_t now = time(NULL);
+
+    switch (sig) {
+      case SIGINT:
+        if (sigint) {
+            if (now - lastintr >= 1)
+                break;
+        } else {
+            lastintr = now;
+            sigint   = true;
+        }
+        return;
+
+      case SIGTERM:
+        break;
+
+      default:
+        return;
+    }
+
+    syslog(LOG_ERR, "Killed...");
+    exit(-1);
+}
+
+void job_initialize(void)
+{
+    signal(SIGPIPE, SIG_IGN);
+    signal(SIGINT,  &job_sighandler);
+    signal(SIGTERM, &job_sighandler);
+
+    epollfd = epoll_create(128);
+    if (epollfd < 0) {
+        syslog(LOG_ERR, "epoll_create error: %m");
+        exit(EX_OSERR);
+    }
+}
+
+void job_loop(void)
+{
+    while (!sigint) {
+        struct epoll_event events[FD_SETSIZE];
+        int todo = epoll_wait(epollfd, events, countof(events), -1);
+
+        if (todo < 0) {
+            if (errno == EAGAIN || errno == EINTR)
+                continue;
+            syslog(LOG_ERR, "epoll_wait error: %m");
+            exit(EX_OSERR);
+        }
+
+        while (todo) {
+            job_t *job = events[--todo].data.ptr;
+
+            assert (job->process);
+            job->process(job);
+
+            if (job->error || job->done) {
+                job_delete(&job);
+            }
+        }
+    }
+}
+
+void job_shutdown(void)
+{
+    if (epollfd >= 0) {
+        close(epollfd);
+        epollfd = -1;
+    }
+}