Basic support for async filters.
authorFlorent Bruneau <florent.bruneau@polytechnique.org>
Fri, 10 Oct 2008 22:07:20 +0000 (00:07 +0200)
committerFlorent Bruneau <florent.bruneau@polytechnique.org>
Fri, 10 Oct 2008 22:07:20 +0000 (00:07 +0200)
Signed-off-by: Florent Bruneau <florent.bruneau@polytechnique.org>
13 files changed:
common/epoll.c
common/epoll.h
common/server.c
common/server.h
postlicyd/filter.c
postlicyd/filter.h
postlicyd/greylist.c
postlicyd/iplist.c
postlicyd/main-postlicyd.c
postlicyd/match.c
postlicyd/postlicyd.h [new file with mode: 0644]
postlicyd/query.h
postlicyd/strlist.c

index e3df03e..896fc2c 100644 (file)
@@ -69,6 +69,14 @@ void epoll_modify(int fd, uint32_t events, void *ptr)
     }
 }
 
+void epoll_unregister(int fd)
+{
+    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) {
+        UNIXERR("epoll_ctl");
+        abort();
+    }
+}
+
 int epoll_select(struct epoll_event *events, int maxevents, int timeout)
 {
     return epoll_wait(epollfd, events, maxevents, timeout);
index 57179d2..fdc2665 100644 (file)
@@ -41,6 +41,7 @@
 
 void epoll_register(int fd, uint32_t events, void *ptr);
 void epoll_modify(int fd, uint32_t events, void *ptr);
+void epoll_unregister(int fd);
 int epoll_select(struct epoll_event *events, int maxevents, int timeout);
 
 #endif
index 6053408..a085c40 100644 (file)
@@ -84,7 +84,7 @@ static server_t* server_acquire(void)
     }
 }
 
-static void server_release(server_t *server)
+void server_release(server_t *server)
 {
     server_wipe(server);
     array_add(server_pool, server);
@@ -92,6 +92,7 @@ static void server_release(server_t *server)
 
 static void server_shutdown(void)
 {
+    printf("Server shutdown");
     array_deep_wipe(listeners, server_delete);
     array_deep_wipe(server_pool, server_delete);
 }
@@ -125,6 +126,7 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
     tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->listener   = true;
+    tmp->event      = false;
     tmp->data       = data;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
@@ -154,6 +156,8 @@ static int start_client(server_t *server, start_client_t starter,
     }
 
     tmp             = server_acquire();
+    tmp->listener   = false;
+    tmp->event      = false;
     tmp->fd         = sock;
     tmp->data       = data;
     tmp->clear_data = deleter;
@@ -161,7 +165,7 @@ static int start_client(server_t *server, start_client_t starter,
     return 0;
 }
 
-event_t event_register(int fd, void *data)
+server_t * event_register(int fd, void *data)
 {
     int fds[2];
     if (fd == -1) {
@@ -177,6 +181,7 @@ event_t event_register(int fd, void *data)
     }
 
     server_t *tmp = server_acquire();
+    tmp->listener = false;
     tmp->event = true;
     tmp->fd    = fd == -1 ? fds[0] : fd;
     tmp->fd2   = fd == -1 ? fds[1] : -1;
@@ -185,7 +190,7 @@ event_t event_register(int fd, void *data)
     return tmp;
 }
 
-bool event_fire(event_t event)
+bool event_fire(server_t *event)
 {
     static const char *data = "";
     if (event->fd2 == -1) {
@@ -194,7 +199,7 @@ bool event_fire(event_t event)
     return write(event->fd2, data, 1) == 0;
 }
 
-static bool event_cancel(event_t event)
+bool event_cancel(server_t *event)
 {
     char buff[32];
     while (true) {
@@ -210,6 +215,12 @@ static bool event_cancel(event_t event)
     }
 }
 
+void event_release(server_t *event)
+{
+    epoll_unregister(event->fd);
+    server_release(event);
+}
+
 int server_loop(start_client_t starter, delete_client_t deleter,
                 run_client_t runner, event_handler_t handler,
                 refresh_t refresh, void* config)
@@ -245,14 +256,12 @@ int server_loop(start_client_t starter, delete_client_t deleter,
                 (void)start_client(d, starter, deleter);
                 continue;
             } else if (d->event) {
-                if (!event_cancel(d)) {
-                    server_release(d);
-                    continue;
-                }
                 if (handler) {
                     if (!handler(d, config)) {
-                        server_release(d);
+                        event_release(d);
                     }
+                } else {
+                    event_release(d);
                 }
                 continue;
             }
index a552dbe..85cbae4 100644 (file)
@@ -39,7 +39,6 @@
 #include "buffer.h"
 
 typedef struct server_t server_t;
-typedef server_t *event_t;
 
 #define INVALID_EVENT (NULL)
 
@@ -48,7 +47,7 @@ typedef void  (*delete_client_t)(void*);
 typedef void *(*start_client_t)(server_t*);
 typedef int   (*run_client_t)(server_t*, void*);
 typedef bool   (*refresh_t)(void*);
-typedef bool  (*event_handler_t)(event_t, void*);
+typedef bool  (*event_handler_t)(server_t *, void*);
 
 struct server_t {
     unsigned listener : 1;
@@ -67,8 +66,12 @@ ARRAY(server_t);
 
 int start_server(int port, start_listener_t starter, delete_client_t deleter);
 
-event_t event_register(int fd, void *data);
-bool event_fire(event_t event);
+void server_release(server_t *server);
+
+server_t *event_register(int fd, void *data);
+bool event_fire(server_t *event);
+bool event_cancel(server_t *event);
+void event_release(server_t *event);
 #define event_data(event) ((event)->data)
 
 int server_loop(start_client_t starter, delete_client_t deleter,
index 7aba638..a150d6d 100644 (file)
@@ -43,15 +43,29 @@ static filter_destructor_t  destructors[FTK_count];
 static bool                 hooks[FTK_count][HTK_count];
 static bool                 params[FTK_count][ATK_count];
 
+static filter_context_constructor_t ctx_constructors[FTK_count];
+static filter_context_destructor_t  ctx_destructors[FTK_count];
+
 static const filter_hook_t default_hook = {
     .type      = 0,
     .value     = (char*)"DUNNO",
     .postfix   = true,
+    .async     = false,
+    .filter_id = 0
+};
+
+static const filter_hook_t async_hook = {
+    .type      = 0,
+    .value     = NULL,
+    .postfix   = false,
+    .async     = true,
     .filter_id = 0
 };
 
 filter_type_t filter_register(const char *type, filter_constructor_t constructor,
-                              filter_destructor_t destructor, filter_runner_t runner)
+                              filter_destructor_t destructor, filter_runner_t runner,
+                              filter_context_constructor_t context_constructor,
+                              filter_context_destructor_t context_destructor)
 {
     filter_token tok = filter_tokenize(type, m_strlen(type));
     CHECK_FILTER(tok);
@@ -59,6 +73,9 @@ filter_type_t filter_register(const char *type, filter_constructor_t constructor
     runners[tok] = runner;
     constructors[tok] = constructor;
     destructors[tok] = destructor;
+
+    ctx_constructors[tok] = context_constructor;
+    ctx_destructors[tok]  = context_destructor;
     return tok;
 }
 
@@ -163,17 +180,24 @@ void filter_wipe(filter_t *filter)
     p_delete(&filter->name);
 }
 
-const filter_hook_t *filter_run(const filter_t *filter, const query_t *query)
+const filter_hook_t *filter_run(const filter_t *filter, const query_t *query,
+                                filter_context_t *context)
 {
     int start = 0;
     int end   = filter->hooks.len;
     debug("running filter %s (%s)", filter->name, ftokens[filter->type]);
-    filter_result_t res = runners[filter->type](filter, query);
+    filter_result_t res = runners[filter->type](filter, query, context);
+
+    context->current_filter = NULL;
 
+    debug("filter run, result is %s", htokens[res]);
     if (res == HTK_ABORT) {
         return NULL;
     }
-    debug("filter run, result is %s", htokens[res]);
+    if (res == HTK_ASYNC) {
+        context->current_filter = filter;
+        return &async_hook;
+    }
 
     while (start < end) {
         int mid = (start + end) / 2;
@@ -192,9 +216,10 @@ const filter_hook_t *filter_run(const filter_t *filter, const query_t *query)
     return &default_hook;
 }
 
-bool filter_test(const filter_t *filter, const query_t *query, filter_result_t result)
+bool filter_test(const filter_t *filter, const query_t *query,
+                 filter_context_t *context, filter_result_t result)
 {
-    return !!(runners[filter->type](filter, query) == result);
+    return !!(runners[filter->type](filter, query, context) == result);
 }
 
 void filter_set_name(filter_t *filter, const char *name, int len)
@@ -242,9 +267,30 @@ bool filter_add_hook(filter_t *filter, const char *name, int name_len,
             htokens[hook.type], ftokens[filter->type]);
         return false;
     }
+    hook.async   = false;
     hook.postfix = (strncmp(value, "postfix:", 8) == 0);
     hook.value = m_strdup(hook.postfix ? value + 8 : value);
     hook.filter_id = -1;
     array_add(filter->hooks, hook);
     return true;
 }
+
+void filter_context_prepare(filter_context_t *context, void *qctx)
+{
+    for (int i = 0 ; i < FTK_count ; ++i) {
+        if (ctx_constructors[i] != NULL) {
+            context->contexts[i] = ctx_constructors[i]();
+        }
+    }
+    context->current_filter = NULL;
+    context->data = qctx;
+}
+
+void filter_context_wipe(filter_context_t *context)
+{
+    for (int i = 0 ; i < FTK_count ; ++i) {
+        if (ctx_destructors[i] != NULL) {
+            ctx_destructors[i](context->contexts[i]);
+        }
+    }
+}
index 3047803..f551e58 100644 (file)
@@ -51,7 +51,8 @@ typedef struct filter_hook_t {
     filter_result_t type;
     char *value;
 
-    bool postfix;
+    unsigned postfix:1;
+    unsigned async:1;
     int filter_id;
 } filter_hook_t;
 ARRAY(filter_hook_t)
@@ -63,6 +64,8 @@ typedef struct filter_param_t {
 } filter_param_t;
 ARRAY(filter_param_t)
 
+/** Description of a filter.
+ */
 typedef struct filter_t {
     char *name;
     filter_type_t type;
@@ -78,6 +81,17 @@ typedef struct filter_t {
 } filter_t;
 ARRAY(filter_t)
 
+/** Context of the query. To be filled with data to use when
+ * performing asynchronous filtering.
+ */
+typedef struct filter_context_t {
+    const filter_t *current_filter;
+    void *contexts[FTK_count];
+
+    void *data;
+} filter_context_t;
+
+
 #define FILTER_INIT { NULL, FTK_UNKNOWN, ARRAY_INIT, NULL, ARRAY_INIT, -1 }
 #define CHECK_FILTER(Filter)                                                   \
     assert(Filter != FTK_UNKNOWN && Filter != FTK_count                        \
@@ -89,14 +103,28 @@ ARRAY(filter_t)
     assert(Param != ATK_UNKNOWN && Param != ATK_count                          \
            && "Unknown param")
 
+
+/* Callback to be implemented by a filter.
+ */
+
 typedef filter_result_t (*filter_runner_t)(const filter_t *filter,
-                                           const query_t *query);
+                                           const query_t *query,
+                                           filter_context_t *context);
 typedef bool (*filter_constructor_t)(filter_t *filter);
 typedef void (*filter_destructor_t)(filter_t *filter);
 
+typedef void *(*filter_context_constructor_t)(void);
+typedef void (*filter_context_destructor_t)(void*);
+
+
+/* Registration.
+ */
+
 __attribute__((nonnull(1,4)))
 filter_type_t filter_register(const char *type, filter_constructor_t constructor,
-                              filter_destructor_t destructor, filter_runner_t runner);
+                              filter_destructor_t destructor, filter_runner_t runner,
+                              filter_context_constructor_t context_constructor,
+                              filter_context_destructor_t context_destructor);
 
 __attribute__((nonnull(2)))
 filter_result_t filter_hook_register(filter_type_t filter, const char *name);
@@ -104,6 +132,10 @@ filter_result_t filter_hook_register(filter_type_t filter, const char *name);
 __attribute__((nonnull(2)))
 filter_param_id_t filter_param_register(filter_type_t filter, const char *name);
 
+
+/* Filter builder.
+ */
+
 __attribute__((nonnull(1)))
 static inline void filter_init(filter_t *filter)
 {
@@ -170,14 +202,20 @@ static inline void filter_params_wipe(filter_param_t *param)
 __attribute__((nonnull(1)))
 void filter_wipe(filter_t *filter);
 
+
+/* Runner.
+ */
+
 __attribute__((nonnull(1,2)))
-const filter_hook_t *filter_run(const filter_t *filter, const query_t *query);
+const filter_hook_t *filter_run(const filter_t *filter, const query_t *query,
+                                filter_context_t *context);
 
 __attribute__((nonnull(1,2)))
-bool filter_test(const filter_t *filter, const query_t *query, filter_result_t expt);
+bool filter_test(const filter_t *filter, const query_t *query,
+                 filter_context_t *context, filter_result_t expt);
 
 
-/* Helpers
+/* Parsing Helpers
  */
 
 #define FILTER_PARAM_PARSE_STRING(Param, Dest)                                 \
@@ -216,4 +254,14 @@ bool filter_test(const filter_t *filter, const query_t *query, filter_result_t e
         }                                                                      \
     } break
 
+
+/* Filter context
+ */
+
+__attribute__((nonnull))
+void filter_context_prepare(filter_context_t *context, void* qctx);
+
+__attribute__((nonnull))
+void filter_context_wipe(filter_context_t *context);
+
 #endif
index a2b3346..4cbe8b9 100644 (file)
@@ -458,7 +458,8 @@ static void greylist_filter_destructor(filter_t *filter)
 }
 
 static filter_result_t greylist_filter(const filter_t *filter,
-                                       const query_t *query)
+                                       const query_t *query,
+                                       filter_context_t *context)
 {
     const greylist_config_t *config = filter->data;
     if (query->state != SMTP_RCPT) {
@@ -475,7 +476,7 @@ static int greylist_init(void)
 {
     filter_type_t type =  filter_register("greylist", greylist_filter_constructor,
                                           greylist_filter_destructor,
-                                          greylist_filter);
+                                          greylist_filter, NULL, NULL);
     /* Hooks.
      */
     (void)filter_hook_register(type, "abort");
index ebd29f8..867f88b 100644 (file)
@@ -391,7 +391,8 @@ static void rbl_filter_destructor(filter_t *filter)
     filter->data = data;
 }
 
-static filter_result_t rbl_filter(const filter_t *filter, const query_t *query)
+static filter_result_t rbl_filter(const filter_t *filter, const query_t *query,
+                                  filter_context_t *context)
 {
     uint32_t ip;
     int32_t sum = 0;
@@ -450,7 +451,8 @@ static filter_result_t rbl_filter(const filter_t *filter, const query_t *query)
 static int rbl_init(void)
 {
     filter_type_t type =  filter_register("iplist", rbl_filter_constructor,
-                                          rbl_filter_destructor, rbl_filter);
+                                          rbl_filter_destructor, rbl_filter,
+                                          NULL, NULL);
     /* Hooks.
      */
     (void)filter_hook_register(type, "abort");
index 57e1e25..6773070 100644 (file)
@@ -41,8 +41,8 @@
 #include "epoll.h"
 #include "policy_tokens.h"
 #include "server.h"
-#include "query.h"
 #include "config.h"
+#include "postlicyd.h"
 
 #define DAEMON_NAME             "postlicyd"
 #define DAEMON_VERSION          "0.2"
@@ -54,7 +54,18 @@ DECLARE_MAIN
 
 static void *query_starter(server_t* server)
 {
-    return query_new();
+    query_context_t *context = p_new(query_context_t, 1);
+    filter_context_prepare(&context->context, context);
+    return context;
+}
+
+static void query_stopper(void *data)
+{
+    query_context_t **context = data;
+    if (*context) {
+        filter_context_wipe(&(*context)->context);
+        p_delete(context);
+    }
 }
 
 static bool config_refresh(void *config)
@@ -66,7 +77,8 @@ __attribute__((format(printf,2,0)))
 static void policy_answer(server_t *pcy, const char *fmt, ...)
 {
     va_list args;
-    const query_t* query = pcy->data;
+    query_context_t *context = pcy->data;
+    const query_t* query = &context->query;
 
     buffer_addstr(&pcy->obuf, "action=");
     va_start(args, fmt);
@@ -79,21 +91,34 @@ static void policy_answer(server_t *pcy, const char *fmt, ...)
 
 static bool policy_process(server_t *pcy, const config_t *config)
 {
-    const query_t* query = pcy->data;
+    query_context_t *context = pcy->data;
+    const query_t* query = &context->query;
     const filter_t *filter;
     if (config->entry_points[query->state] == -1) {
         warn("no filter defined for current protocol_state (%d)", query->state);
         return false;
     }
-    filter = array_ptr(config->filters, config->entry_points[query->state]);
+    if (context->context.current_filter != NULL) {
+        filter = context->context.current_filter;
+    } else {
+        filter = array_ptr(config->filters, config->entry_points[query->state]);
+    }
     while (true) {
-        const filter_hook_t *hook = filter_run(filter, query);
+        const filter_hook_t *hook = filter_run(filter, query, &context->context);
         if (hook == NULL) {
             warn("request client=%s, from=<%s>, to=<%s>: aborted",
                  query->client_name,
                  query->sender == NULL ? "undefined" : query->sender,
                  query->recipient == NULL ? "undefined" : query->recipient);
             return false;
+        } else if (hook->async) {
+            debug("request client=%s, from=<%s>, to=<%s>: "
+                  "asynchronous filter from filter %s",
+                   query->client_name,
+                   query->sender == NULL ? "undefined" : query->sender,
+                   query->recipient == NULL ? "undefined" : query->recipient,
+                   filter->name);
+            return true;
         } else if (hook->postfix) {
             info("request client=%s, from=<%s>, to=<%s>: "
                  "awswer %s from filter %s: \"%s\"",
@@ -121,7 +146,8 @@ static int policy_run(server_t *pcy, void* vconfig)
     int search_offs = MAX(0, (int)(pcy->ibuf.len - 1));
     int nb = buffer_read(&pcy->ibuf, pcy->fd, -1);
     const char *eoq;
-    query_t  *query  = pcy->data;
+    query_context_t *context = pcy->data;
+    query_t  *query  = &context->query;
     const config_t *config = vconfig;
 
     if (nb < 0) {
@@ -146,6 +172,15 @@ static int policy_run(server_t *pcy, void* vconfig)
     return policy_process(pcy, config) ? 0 : -1;
 }
 
+static bool policy_event(server_t *event, void *config)
+{
+    if (!policy_process(event, config)) {
+        server_release(event);
+        return true;
+    }
+    return true;
+}
+
 int start_listener(int port)
 {
     return start_server(port, NULL, NULL);
@@ -239,7 +274,7 @@ int main(int argc, char *argv[])
     if (start_listener(config->port) < 0) {
         return EXIT_FAILURE;
     } else {
-        return server_loop(query_starter, (delete_client_t)query_delete,
-                           policy_run, NULL, config_refresh, config);
+        return server_loop(query_starter, query_stopper,
+                           policy_run, policy_event, config_refresh, config);
     }
 }
index d50b12a..ebee763 100644 (file)
@@ -262,7 +262,8 @@ static inline bool match_condition(const match_condition_t *cond, const query_t
     return true;
 }
 
-static filter_result_t match_filter(const filter_t *filter, const query_t *query)
+static filter_result_t match_filter(const filter_t *filter, const query_t *query,
+                                    filter_context_t *context)
 {
     const match_config_t *config = filter->data;
     foreach (const match_condition_t *condition, config->conditions) {
@@ -287,7 +288,8 @@ static filter_result_t match_filter(const filter_t *filter, const query_t *query
 static int match_init(void)
 {
     filter_type_t type =  filter_register("match", match_filter_constructor,
-                                          match_filter_destructor, match_filter);
+                                          match_filter_destructor, match_filter,
+                                          NULL, NULL);
     /* Hooks.
      */
     (void)filter_hook_register(type, "abort");
diff --git a/postlicyd/postlicyd.h b/postlicyd/postlicyd.h
new file mode 100644 (file)
index 0000000..d6af4b2
--- /dev/null
@@ -0,0 +1,47 @@
+/******************************************************************************/
+/*          pfixtools: a collection of postfix related tools                  */
+/*          ~~~~~~~~~                                                         */
+/*  ________________________________________________________________________  */
+/*                                                                            */
+/*  Redistribution and use in source and binary forms, with or without        */
+/*  modification, are permitted provided that the following conditions        */
+/*  are met:                                                                  */
+/*                                                                            */
+/*  1. Redistributions of source code must retain the above copyright         */
+/*     notice, this list of conditions and the following disclaimer.          */
+/*  2. Redistributions in binary form must reproduce the above copyright      */
+/*     notice, this list of conditions and the following disclaimer in the    */
+/*     documentation and/or other materials provided with the distribution.   */
+/*  3. The names of its contributors may not be used to endorse or promote    */
+/*     products derived from this software without specific prior written     */
+/*     permission.                                                            */
+/*                                                                            */
+/*  THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND   */
+/*  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE     */
+/*  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR        */
+/*  PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS    */
+/*  BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR    */
+/*  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF      */
+/*  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS  */
+/*  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN   */
+/*  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)   */
+/*  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF    */
+/*  THE POSSIBILITY OF SUCH DAMAGE.                                           */
+/******************************************************************************/
+
+/*
+ * Copyright © 2008 Florent Bruneau
+ */
+
+#ifndef PFIXTOOLS_POSTLICYD_H
+#define PFIXTOOLS_POSTLICYD_H
+
+#include "query.h"
+#include "filter.h"
+
+typedef struct query_context_t {
+    query_t query;
+    filter_context_t context;
+} query_context_t;
+
+#endif
index 87ee01e..86e41ba 100644 (file)
@@ -94,18 +94,6 @@ typedef struct query_t {
     const char *eoq;
 } query_t;
 
-static inline query_t *query_new(void)
-{
-    return p_new(query_t, 1);
-}
-
-static inline void query_delete(query_t **query)
-{
-    if (*query) {
-        p_delete(query);
-    }
-}
-
 /** Parse the content of the text to fill the query.
  * The text pointed by \p p is segmented (and modified to add
  * a \0 at the end of each segment) and used to fill the query
index 0c790cb..4b5eeb5 100644 (file)
@@ -500,7 +500,8 @@ static void strlist_filter_destructor(filter_t *filter)
     filter->data = config;
 }
 
-static filter_result_t strlist_filter(const filter_t *filter, const query_t *query)
+static filter_result_t strlist_filter(const filter_t *filter, const query_t *query,
+                                      filter_context_t *context)
 {
     char reverse[BUFSIZ];
     char normal[BUFSIZ];
@@ -600,7 +601,8 @@ static filter_result_t strlist_filter(const filter_t *filter, const query_t *que
 static int strlist_init(void)
 {
     filter_type_t type =  filter_register("strlist", strlist_filter_constructor,
-                                          strlist_filter_destructor, strlist_filter);
+                                          strlist_filter_destructor, strlist_filter,
+                                          NULL, NULL);
     /* Hooks.
      */
     (void)filter_hook_register(type, "abort");