continue to rework our jobs
authorPierre Habouzit <madcoder@debian.org>
Sun, 4 Feb 2007 21:54:05 +0000 (22:54 +0100)
committerPierre Habouzit <madcoder@debian.org>
Sun, 4 Feb 2007 21:54:05 +0000 (22:54 +0100)
Signed-off-by: Pierre Habouzit <madcoder@debian.org>
job.c
job.h
postfix.c
postfix.h

diff --git a/job.c b/job.c
index fc0c1b8..12c450a 100644 (file)
--- a/job.c
+++ b/job.c
  * Copyright © 2007 Pierre Habouzit
  */
 
+#include <fcntl.h>
+#include <stdbool.h>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#ifndef EPOLLRDHUP
+#  include <linux/poll.h>
+#  ifdef POLLRDHUP
+#    define EPOLLRDHUP POLLRDHUP
+#  else
+#    define EPOLLRDHUP 0
+#  endif
+#endif
+
+
 #include "job.h"
 
+static int epollfd;
+
+static void job_wipe(job_t *job)
+{
+    if (job->fd >= 0) {
+        close(job->fd);
+        job->fd = -1;
+    }
+}
+DO_DELETE(job_t, job);
+
+void job_release(job_t **job)
+{
+    if (*job) {
+        if ((*job)->task && (*job)->task->stop) {
+            (*job)->task->stop(*job);
+        }
+        job_delete(job);
+    }
+}
+
+static job_t *job_register_fd(job_t *job)
+{
+    struct epoll_event event = { .data.ptr = job, .events = EPOLLRDHUP };
+
+    if (job->state & JOB_READ || job->state & JOB_LISTEN) {
+        event.events |= EPOLLIN;
+    }
+
+    if (job->state & JOB_WRITE || job->state & JOB_CONN) {
+        event.events |= EPOLLIN;
+    }
+
+    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, job->fd, &event) < 0) {
+        job->error = true;
+        job_release(&job);
+    }
+
+    return job;
+}
+
+void job_update_events(job_t *job)
+{
+    struct epoll_event event = { .data.ptr = job, .events = EPOLLRDHUP };
+
+    if (job->state & JOB_READ || job->state & JOB_LISTEN) {
+        event.events |= EPOLLIN;
+    }
+
+    if (job->state & JOB_WRITE || job->state & JOB_CONN) {
+        event.events |= EPOLLIN;
+    }
+
+    epoll_ctl(epollfd, EPOLL_CTL_MOD, job->fd, &event);
+}
+
+job_t *job_accept(job_t *listener, int state)
+{
+    int sock;
+    job_t *res;
+
+    if ((sock = accept(listener->fd, NULL, 0)) < 0) {
+        return NULL;
+    }
+
+    if (fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK)) {
+        return NULL;
+    }
+
+    res        = job_new();
+    res->fd    = sock;
+    res->state = state;
+    res->task  = listener->task;
+    return job_register_fd(res);
+}
diff --git a/job.h b/job.h
index ad7d649..388d8f3 100644 (file)
--- a/job.h
+++ b/job.h
 #include "buffer.h"
 
 enum job_state {
-    JOB_FREE   = 0x00,
+    JOB_IDLE   = 0x00,
     JOB_READ   = 0x01,
     JOB_WRITE  = 0x02,
     JOB_RDWR   = 0x03,
     JOB_CONN   = 0x04,
     JOB_LISTEN = 0x08,
-    JOB_IDLE   = 0x10,
 };
 
 enum smtp_state {
@@ -62,16 +61,16 @@ enum smtp_state {
 typedef struct job_t   job_t;
 typedef struct jpriv_t jpriv_t;
 typedef struct task_t  task_t;
+typedef struct tpriv_t tpriv_t;
 typedef struct query_t query_t;
 
 struct task_t {
-    task_t *(*create)(void);
-    void (*release)(task_t **);
-
-    void (*run)(job_t *, query_t *);
-    void (*done)(job_t *);
+    void (*start)(job_t *, query_t *);
+    void (*stop)(job_t *);
     void (*cancel)(job_t *);
     void (*process)(job_t *);
+
+    tpriv_t *tdata;
 };
 
 struct job_t {
@@ -81,10 +80,19 @@ struct job_t {
 
     int fd;
 
-    task_t *task;
+    task_t  *task;
     jpriv_t *jdata;
 };
 
+static inline job_t *job_init(job_t *job) {
+    p_clear(job, 1);
+    job->fd = -1;
+    return job;
+}
+DO_NEW(job_t, job);
+void job_release(job_t **job);
+void job_update_events(job_t *job);
+
 struct query_t {
     unsigned state : 4;
     unsigned esmtp : 1;
@@ -131,4 +139,7 @@ static inline void query_wipe(query_t *rq) {
 DO_NEW(query_t, query);
 DO_DELETE(query_t, query);
 
+
+job_t *job_accept(job_t *listener, int state);
+
 #endif
index d9b4930..5155471 100644 (file)
--- a/postfix.c
+++ b/postfix.c
@@ -37,6 +37,7 @@
 #include <stdbool.h>
 #include <unistd.h>
 
+#include "job.h"
 #include "postfix.h"
 
 struct jpriv_t {
@@ -44,30 +45,23 @@ struct jpriv_t {
     buffer_t obuf;
 };
 
-task_t *postfix_create(void)
-{
-    return NULL;
-}
-
-void postfix_release(task_t **task)
-{
-    if (task) {
-        *task = NULL;
-    }
-}
+struct tpriv_t {
+    job_t *jobs;
+};
 
-void postfix_run(job_t *job, query_t *query)
+void postfix_start(job_t *job, query_t *query)
 {
 }
 
-void postfix_done(job_t *job)
+void postfix_stop(job_t *job)
 {
 }
 
 void postfix_process(job_t *job)
 {
     if (job->state & JOB_LISTEN) {
-        // TODO
+        /* TODO check return code */
+        job_accept(job, JOB_READ);
     }
 
     if (job->state & JOB_WRITE) {
@@ -101,12 +95,3 @@ void postfix_process(job_t *job)
         /* TODO: do the parse */
     }
 }
-
-task_t task_postfix = {
-    postfix_create,
-    postfix_release,
-    postfix_run,
-    postfix_done,
-    NULL,
-    postfix_process
-};
index 029991c..ea8fffb 100644 (file)
--- a/postfix.h
+++ b/postfix.h
@@ -36,8 +36,4 @@
 #ifndef POSTLICYD_POSTFIX_H
 #define POSTLICYD_POSTFIX_H
 
-#include "job.h"
-
-extern task_t task_postfix;
-
 #endif