Asynchronous DNS queries on iplist.
authorFlorent Bruneau <florent.bruneau@polytechnique.org>
Sat, 11 Oct 2008 15:20:54 +0000 (17:20 +0200)
committerFlorent Bruneau <florent.bruneau@polytechnique.org>
Sat, 11 Oct 2008 15:20:54 +0000 (17:20 +0200)
Signed-off-by: Florent Bruneau <florent.bruneau@polytechnique.org>
common/rbl.c
common/rbl.h
common/server.c
common/server.h
pfix-srsd/main-srsd.c
postlicyd/Makefile
postlicyd/filter.c
postlicyd/filter.h
postlicyd/iplist.c
postlicyd/main-postlicyd.c
postlicyd/postlicyd.h

index 1f2f442..f0b5d55 100644 (file)
  * Copyright © 2008 Florent Bruneau
  */
 
+#include <unbound.h>
 #include <netdb.h>
+#include "array.h"
+#include "epoll.h"
+#include "server.h"
 #include "rbl.h"
 
-static inline rbl_result_t rbl_dns_check(const char *hostname)
+
+typedef struct rbl_context_t {
+    rbl_result_t *result;
+    rbl_result_callback_t call;
+    void *data;
+} rbl_context_t;
+ARRAY(rbl_context_t);
+
+static struct ub_ctx *ctx = NULL;
+static PA(rbl_context_t) ctx_pool = ARRAY_INIT;
+
+static rbl_context_t *rbl_context_new(void)
+{
+    return p_new(rbl_context_t, 1);
+}
+
+static void rbl_context_delete(rbl_context_t **context)
+{
+    if (*context) {
+        p_delete(context);
+    }
+}
+
+static void rbl_context_wipe(rbl_context_t *context)
+{
+    p_clear(context, 1);
+}
+
+static rbl_context_t *rbl_context_acquire(void)
+{
+    if (array_len(ctx_pool) > 0) {
+        return array_pop_last(ctx_pool);
+    } else {
+        return rbl_context_new();
+    }
+}
+
+static void rbl_context_release(rbl_context_t *context)
+{
+    rbl_context_wipe(context);
+    array_add(ctx_pool, context);
+}
+
+static void rbl_exit(void)
+{
+    if (ctx != NULL) {
+        ub_ctx_delete(ctx);
+        ctx = NULL;
+    }
+    array_deep_wipe(ctx_pool, rbl_context_delete);
+}
+module_exit(rbl_exit);
+
+static void rbl_callback(void *arg, int err, struct ub_result *result)
 {
-    debug("looking up for %s", hostname);
-    struct hostent *host = gethostbyname(hostname);
-    if (host != NULL) {
-        debug("host found");
-        return RBL_FOUND;
+    rbl_context_t *context = arg;
+    if (err != 0) {
+        debug("asynchronous request led to an error");
+        *context->result = RBL_ERROR;
+    } else if (result->nxdomain) {
+        debug("asynchronous request done, %s NOT FOUND", result->qname);
+        *context->result = RBL_NOTFOUND;
+    } else {
+        debug("asynchronous request done, %s FOUND", result->qname);
+        *context->result = RBL_FOUND;
+    }
+    if (context->call != NULL) {
+        debug("calling callback");
+        context->call(context->result, context->data);
     } else {
-        if (h_errno == HOST_NOT_FOUND) {
-            debug("host not found: %s", hostname);
-            return RBL_NOTFOUND;
+        debug("no callback defined");
+    }
+    ub_resolve_free(result);
+    rbl_context_release(context);
+}
+
+static int rbl_handler(server_t *event, void *config)
+{
+    int retval = 0;
+    debug("rbl_handler called: ub_fd triggered");
+    epoll_modify(event->fd, 0, event);
+    if ((retval = ub_process(ctx)) != 0) {
+        err("error in DNS resolution: %s", ub_strerror(retval));
+    }
+    epoll_modify(event->fd, EPOLLIN, event);
+    return 0;
+}
+
+static inline bool rbl_dns_check(const char *hostname, rbl_result_t *result,
+                                 rbl_result_callback_t callback, void *data)
+{
+    if (ctx == NULL) {
+        ctx = ub_ctx_create();
+        ub_ctx_async(ctx, true);
+        if (server_register(ub_fd(ctx), rbl_handler, NULL) == NULL) {
+            crit("cannot register asynchronous DNS event handler");
+            abort();
         }
-        debug("dns error: %m");
-        return RBL_ERROR;
+    }
+    rbl_context_t *context = rbl_context_acquire();
+    context->result = result;
+    context->call   = callback;
+    context->data   = data;
+    if (ub_resolve_async(ctx, (char*)hostname, 1, 1, context, rbl_callback, NULL) == 0) {
+        *result = RBL_ASYNC;
+        return true;
+    } else {
+        *result = RBL_ERROR;
+        rbl_context_release(context);
+        return false;
     }
 }
 
-rbl_result_t rbl_check(const char *rbl, uint32_t ip)
+bool rbl_check(const char *rbl, uint32_t ip, rbl_result_t *result,
+               rbl_result_callback_t callback, void *data)
 {
     char host[257];
     int len;
@@ -65,10 +166,11 @@ rbl_result_t rbl_check(const char *rbl, uint32_t ip)
         return RBL_ERROR;
     if (host[len - 2] == '.')
         host[len - 1] = '\0';
-    return rbl_dns_check(host);
+    return rbl_dns_check(host, result, callback, data);
 }
 
-rbl_result_t rhbl_check(const char *rhbl, const char *hostname)
+bool rhbl_check(const char *rhbl, const char *hostname, rbl_result_t *result,
+                rbl_result_callback_t callback, void *data)
 {
     char host[257];
     int len;
@@ -78,5 +180,5 @@ rbl_result_t rhbl_check(const char *rhbl, const char *hostname)
         return RBL_ERROR;
     if (host[len - 2] == '.')
         host[len - 1] = '\0';
-    return rbl_dns_check(host);
+    return rbl_dns_check(host, result, callback, data);
 }
index 4a1e763..cefa5da 100644 (file)
 #include "common.h"
 
 typedef enum {
+  RBL_ASYNC,
   RBL_ERROR,
   RBL_FOUND,
   RBL_NOTFOUND,
 } rbl_result_t;
+ARRAY(rbl_result_t);
+
+typedef void (*rbl_result_callback_t)(rbl_result_t *result, void *data);
 
 /** Check the presence of the given IP in the given rbl.
  */
-__attribute__((nonnull(1)))
-rbl_result_t rbl_check(const char *rbl, uint32_t ip);
+__attribute__((nonnull(1,3)))
+bool rbl_check(const char *rbl, uint32_t ip, rbl_result_t *result,
+               rbl_result_callback_t callback, void *data);
 
 /** Check the presence of the given hostname in the given rhbl.
  */
-__attribute__((nonnull(1,2)))
-rbl_result_t rhbl_check(const char *rhbl, const char *hostname);
+__attribute__((nonnull(1,2,3)))
+bool rhbl_check(const char *rhbl, const char *hostname, rbl_result_t *result,
+                rbl_result_callback_t callback, void *data);
 
 #endif
index a085c40..13de783 100644 (file)
@@ -44,22 +44,17 @@ static server_t* server_new(void)
 {
     server_t* server = p_new(server_t, 1);
     server->fd  = -1;
-    server->fd2 = -1;
     return server;
 }
 
 static void server_wipe(server_t *server)
 {
-    server->listener = server->event = false;
-    if (server->fd > 0) {
-        epoll_modify(server->fd, 0, NULL);
+    server->listener = false;
+    if (server->fd >= 0) {
+        epoll_unregister(server->fd);
         close(server->fd);
         server->fd = -1;
     }
-    if (server->fd2 > 0) {
-        close(server->fd2);
-        server->fd2 = -1;
-    }
     if (server->data && server->clear_data) {
         server->clear_data(&server->data);
     }
@@ -126,8 +121,8 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
     tmp             = server_acquire();
     tmp->fd         = sock;
     tmp->listener   = true;
-    tmp->event      = false;
     tmp->data       = data;
+    tmp->run        = NULL;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
     array_add(listeners, tmp);
@@ -135,7 +130,7 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter)
 }
 
 static int start_client(server_t *server, start_client_t starter,
-                        delete_client_t deleter)
+                        run_client_t runner, delete_client_t deleter)
 {
     server_t *tmp;
     void* data = NULL;
@@ -157,73 +152,32 @@ static int start_client(server_t *server, start_client_t starter,
 
     tmp             = server_acquire();
     tmp->listener   = false;
-    tmp->event      = false;
     tmp->fd         = sock;
     tmp->data       = data;
+    tmp->run        = runner;
     tmp->clear_data = deleter;
     epoll_register(sock, EPOLLIN, tmp);
     return 0;
 }
 
-server_t * event_register(int fd, void *data)
+server_t *server_register(int fd, run_client_t runner, void *data)
 {
-    int fds[2];
-    if (fd == -1) {
-        if (pipe(fds) != 0) {
-            UNIXERR("pipe");
-            return NULL;
-        }
-        if (setnonblock(fds[0]) != 0) {
-            close(fds[0]);
-            close(fds[1]);
-            return NULL;
-        }
+    if (fd < 0) {
+        return NULL;
     }
 
-    server_t *tmp = server_acquire();
-    tmp->listener = false;
-    tmp->event = true;
-    tmp->fd    = fd == -1 ? fds[0] : fd;
-    tmp->fd2   = fd == -1 ? fds[1] : -1;
-    tmp->data  = data;
-    epoll_register(fds[0], EPOLLIN, tmp);
+    server_t *tmp   = server_acquire();
+    tmp->listener   = false;
+    tmp->fd         = fd;
+    tmp->data       = data;
+    tmp->run        = runner;
+    tmp->clear_data = NULL;
+    epoll_register(fd, EPOLLIN, tmp);
     return tmp;
 }
 
-bool event_fire(server_t *event)
-{
-    static const char *data = "";
-    if (event->fd2 == -1) {
-        return false;
-    }
-    return write(event->fd2, data, 1) == 0;
-}
-
-bool event_cancel(server_t *event)
-{
-    char buff[32];
-    while (true) {
-        ssize_t res = read(event->fd, buff, 32);
-        if (res == -1 && errno != EAGAIN && errno != EINTR) {
-            UNIXERR("read");
-            return false;
-        } else if (res == -1 && errno == EINTR) {
-            continue;
-        } else if (res != 32) {
-            return true;
-        }
-    }
-}
-
-void event_release(server_t *event)
-{
-    epoll_unregister(event->fd);
-    server_release(event);
-}
-
 int server_loop(start_client_t starter, delete_client_t deleter,
-                run_client_t runner, event_handler_t handler,
-                refresh_t refresh, void* config)
+                run_client_t runner, refresh_t refresh, void* config)
 {
     info("entering processing loop");
     while (!sigint) {
@@ -253,21 +207,12 @@ int server_loop(start_client_t starter, delete_client_t deleter,
             server_t *d = evts[n].data.ptr;
 
             if (d->listener) {
-                (void)start_client(d, starter, deleter);
-                continue;
-            } else if (d->event) {
-                if (handler) {
-                    if (!handler(d, config)) {
-                        event_release(d);
-                    }
-                } else {
-                    event_release(d);
-                }
+                (void)start_client(d, starter, runner, deleter);
                 continue;
             }
 
             if (evts[n].events & EPOLLIN) {
-                if (runner(d, config) < 0) {
+                if (d->run(d, config) < 0) {
                     server_release(d);
                     continue;
                 }
index 85cbae4..33d7157 100644 (file)
@@ -51,14 +51,13 @@ typedef bool  (*event_handler_t)(server_t *, void*);
 
 struct server_t {
     unsigned listener : 1;
-    unsigned event    : 1;
 
     int fd;
-    int fd2;
 
     buffer_t ibuf;
     buffer_t obuf;
 
+    run_client_t run;
     delete_client_t clear_data;
     void* data;
 };
@@ -66,16 +65,10 @@ ARRAY(server_t);
 
 int start_server(int port, start_listener_t starter, delete_client_t deleter);
 
+server_t *server_register(int fd, run_client_t runner, void *data);
 void server_release(server_t *server);
 
-server_t *event_register(int fd, void *data);
-bool event_fire(server_t *event);
-bool event_cancel(server_t *event);
-void event_release(server_t *event);
-#define event_data(event) ((event)->data)
-
 int server_loop(start_client_t starter, delete_client_t deleter,
-                run_client_t runner, event_handler_t handler,
-                refresh_t refresh, void *config);
+                run_client_t runner, refresh_t refresh, void *config);
 
 #endif
index 98633e6..037a458 100644 (file)
@@ -328,5 +328,5 @@ int main(int argc, char *argv[])
         || start_listener(port_dec, true) < 0) {
         return EXIT_FAILURE;
     }
-    return server_loop(srsd_starter, NULL, process_srs, NULL, NULL, &config);
+    return server_loop(srsd_starter, NULL, process_srs, NULL, &config);
 }
index 8ffe7cd..380b83a 100644 (file)
@@ -38,15 +38,17 @@ GENERATED = policy_tokens.h policy_tokens.c \
                                                param_tokens.h param_tokens.c
 TESTS     = tst-rbl tst-filters tst-greylist
 
-FILTERS                = iplist.c greylist.c strlist.c match.c
+UB_LIBS   = -L/usr/local/lib -lunbound
+
+FILTERS                = iplist.c greylist.c match.c
 
 postlicyd_SOURCES = main-postlicyd.c ../common/lib.a filter.c config.c query.c $(FILTERS) $(GENERATED)
-postlicyd_LIBADD  = $(TC_LIBS)
+postlicyd_LIBADD  = $(UB_LIBS) $(TC_LIBS)
 
 tst-rbl_SOURCES   = tst-rbl.c ../common/lib.a filter.c config.c query.c iplist.c $(GENERATED)
 
 tst-filters_SOURCES = tst-filters.c ../common/lib.a config.c filter.c query.c $(FILTERS) $(GENERATED)
-tst-filters_LIBADD  = $(TC_LIBS)
+tst-filters_LIBADD  = $(UB_LIBS) $(TC_LIBS)
 
 tst-greylist_SOURCES = tst-greylist.c ../common/lib.a
 tst-greylist_LIBADD  = $(TC_LIBS)
index a150d6d..7861af4 100644 (file)
@@ -45,6 +45,7 @@ static bool                 params[FTK_count][ATK_count];
 
 static filter_context_constructor_t ctx_constructors[FTK_count];
 static filter_context_destructor_t  ctx_destructors[FTK_count];
+static filter_async_handler_t       async_handler = NULL;
 
 static const filter_hook_t default_hook = {
     .type      = 0,
@@ -101,6 +102,11 @@ filter_param_id_t filter_param_register(filter_type_t filter,
     return tok;
 }
 
+void filter_async_handler_register(filter_async_handler_t handler)
+{
+    async_handler = handler;
+}
+
 bool filter_build(filter_t *filter)
 {
     bool ret = true;
@@ -180,22 +186,16 @@ void filter_wipe(filter_t *filter)
     p_delete(&filter->name);
 }
 
-const filter_hook_t *filter_run(const filter_t *filter, const query_t *query,
-                                filter_context_t *context)
+static inline const filter_hook_t *filter_hook_for_result(const filter_t *filter,
+                                                          filter_result_t res)
 {
     int start = 0;
     int end   = filter->hooks.len;
-    debug("running filter %s (%s)", filter->name, ftokens[filter->type]);
-    filter_result_t res = runners[filter->type](filter, query, context);
 
-    context->current_filter = NULL;
-
-    debug("filter run, result is %s", htokens[res]);
     if (res == HTK_ABORT) {
         return NULL;
     }
     if (res == HTK_ASYNC) {
-        context->current_filter = filter;
         return &async_hook;
     }
 
@@ -216,6 +216,22 @@ const filter_hook_t *filter_run(const filter_t *filter, const query_t *query,
     return &default_hook;
 }
 
+const filter_hook_t *filter_run(const filter_t *filter, const query_t *query,
+                                filter_context_t *context)
+{
+    debug("running filter %s (%s)", filter->name, ftokens[filter->type]);
+    filter_result_t res = runners[filter->type](filter, query, context);
+
+    if (res == HTK_ASYNC) {
+        context->current_filter = filter;
+    } else {
+        context->current_filter = NULL;
+    }
+
+    debug("filter run, result is %s", htokens[res]);
+    return filter_hook_for_result(filter, res);
+}
+
 bool filter_test(const filter_t *filter, const query_t *query,
                  filter_context_t *context, filter_result_t result)
 {
@@ -294,3 +310,15 @@ void filter_context_wipe(filter_context_t *context)
         }
     }
 }
+
+void filter_post_async_result(filter_context_t *context, filter_result_t result)
+{
+    const filter_t *filter = context->current_filter;
+    const filter_hook_t *hook = NULL;
+
+    if (result == HTK_ASYNC) {
+        return;
+    }
+    hook = filter_hook_for_result(filter, result);
+    async_handler(context, hook);
+}
index f551e58..f4511dd 100644 (file)
@@ -116,6 +116,8 @@ typedef void (*filter_destructor_t)(filter_t *filter);
 typedef void *(*filter_context_constructor_t)(void);
 typedef void (*filter_context_destructor_t)(void*);
 
+typedef void (*filter_async_handler_t)(filter_context_t *context,
+                                       const filter_hook_t *result);
 
 /* Registration.
  */
@@ -132,6 +134,8 @@ filter_result_t filter_hook_register(filter_type_t filter, const char *name);
 __attribute__((nonnull(2)))
 filter_param_id_t filter_param_register(filter_type_t filter, const char *name);
 
+__attribute__((nonnull))
+void filter_async_handler_register(filter_async_handler_t handler);
 
 /* Filter builder.
  */
@@ -264,4 +268,7 @@ void filter_context_prepare(filter_context_t *context, void* qctx);
 __attribute__((nonnull))
 void filter_context_wipe(filter_context_t *context);
 
+__attribute__((nonnull))
+void filter_post_async_result(filter_context_t *context, filter_result_t result);
+
 #endif
index 867f88b..c2b471b 100644 (file)
@@ -225,7 +225,7 @@ bool rbldb_ipv4_lookup(const rbldb_t *db, uint32_t ip)
 
 #include "filter.h"
 
-typedef struct rbl_filter_t {
+typedef struct iplist_filter_t {
     PA(rbldb_t) rbls;
     A(int)      weights;
     A(char)     hosts;
@@ -234,14 +234,23 @@ typedef struct rbl_filter_t {
 
     int32_t     hard_threshold;
     int32_t     soft_threshold;
-} rbl_filter_t;
+} iplist_filter_t;
 
-static rbl_filter_t *rbl_filter_new(void)
+typedef struct iplist_async_data_t {
+    A(rbl_result_t) results;
+    int awaited;
+    uint32_t sum;
+    bool error;
+} iplist_async_data_t;
+
+static filter_type_t filter_type = FTK_UNKNOWN;
+
+static iplist_filter_t *iplist_filter_new(void)
 {
-    return p_new(rbl_filter_t, 1);
+    return p_new(iplist_filter_t, 1);
 }
 
-static void rbl_filter_delete(rbl_filter_t **rbl)
+static void iplist_filter_delete(iplist_filter_t **rbl)
 {
     if (*rbl) {
         array_deep_wipe((*rbl)->rbls, rbldb_delete);
@@ -254,14 +263,14 @@ static void rbl_filter_delete(rbl_filter_t **rbl)
 }
 
 
-static bool rbl_filter_constructor(filter_t *filter)
+static bool iplist_filter_constructor(filter_t *filter)
 {
-    rbl_filter_t *data = rbl_filter_new();
+    iplist_filter_t *data = iplist_filter_new();
 
 #define PARSE_CHECK(Expr, Str, ...)                                            \
     if (!(Expr)) {                                                             \
         err(Str, ##__VA_ARGS__);                                               \
-        rbl_filter_delete(&data);                                              \
+        iplist_filter_delete(&data);                                              \
         return false;                                                          \
     }
 
@@ -384,20 +393,66 @@ static bool rbl_filter_constructor(filter_t *filter)
     return true;
 }
 
-static void rbl_filter_destructor(filter_t *filter)
+static void iplist_filter_destructor(filter_t *filter)
 {
-    rbl_filter_t *data = filter->data;
-    rbl_filter_delete(&data);
+    iplist_filter_t *data = filter->data;
+    iplist_filter_delete(&data);
     filter->data = data;
 }
 
-static filter_result_t rbl_filter(const filter_t *filter, const query_t *query,
-                                  filter_context_t *context)
+static void iplist_filter_async(rbl_result_t *result, void *arg)
+{
+    filter_context_t   *context = arg;
+    const filter_t      *filter = context->current_filter;
+    const iplist_filter_t *data = filter->data;
+    iplist_async_data_t  *async = context->contexts[filter_type];
+
+
+    if (*result != RBL_ERROR) {
+        async->error = false;
+    }
+    --async->awaited;
+
+    debug("got asynchronous request result for filter %s, rbl %d, still awaiting %d answers",
+          filter->name, result - array_ptr(async->results, 0), async->awaited);
+
+    if (async->awaited == 0) {
+        filter_result_t res = HTK_FAIL;
+        if (async->error) {
+            res = HTK_ERROR;
+        } else {
+            for (uint32_t i = 0 ; i < array_len(data->host_offsets) ; ++i) {
+                int weight = array_elt(data->host_weights, i);
+
+                switch (array_elt(async->results, i)) {
+                  case RBL_ASYNC:
+                    crit("no more awaited answer but result is ASYNC");
+                    abort();
+                  case RBL_FOUND:
+                    async->sum += weight;
+                    break;
+                  default:
+                    break;
+                }
+            }
+            if (async->sum >= (uint32_t)data->hard_threshold) {
+                res = HTK_HARD_MATCH;
+            } else if (async->sum >= (uint32_t)data->soft_threshold) {
+                res = HTK_SOFT_MATCH;
+            }
+        }
+        debug("answering to filter %s", filter->name);
+        filter_post_async_result(context, res);
+    }
+}
+
+static filter_result_t iplist_filter(const filter_t *filter, const query_t *query,
+                                     filter_context_t *context)
 {
     uint32_t ip;
     int32_t sum = 0;
     const char *end = NULL;
-    const rbl_filter_t *data = filter->data;
+    const iplist_filter_t *data = filter->data;
     bool  error = true;
 
     if (parse_ipv4(query->client_address, &end, &ip) != 0) {
@@ -416,24 +471,22 @@ static filter_result_t rbl_filter(const filter_t *filter, const query_t *query,
         }
         error = false;
     }
-    for (uint32_t i = 0 ; i < data->host_offsets.len ; ++i) {
-        const char *rbl = array_ptr(data->hosts, array_elt(data->host_offsets, i));
-        int weight      = array_elt(data->host_weights, i);
-        switch (rbl_check(rbl, ip)) {
-          case RBL_FOUND:
-            error = false;
-            sum += weight;
-            if (sum >= data->hard_threshold) {
-                return HTK_HARD_MATCH;
+    if (array_len(data->host_offsets) > 0) {
+        iplist_async_data_t* async = context->contexts[filter_type];
+        array_ensure_exact_capacity(async->results, array_len(data->host_offsets));
+        async->sum = sum;
+        async->awaited = 0;
+        for (uint32_t i = 0 ; i < data->host_offsets.len ; ++i) {
+            const char *rbl = array_ptr(data->hosts, array_elt(data->host_offsets, i));
+            if (rbl_check(rbl, ip, array_ptr(async->results, i),
+                          iplist_filter_async, context)) {
+                error = false;
+                ++async->awaited;
             }
-            break;
-          case RBL_NOTFOUND:
-            error = false;
-            break;
-          case RBL_ERROR:
-            warn("rbl %s unavailable", rbl);
-            break;
         }
+        debug("filter %s awaiting %d asynchronous queries", filter->name, async->awaited);
+        async->error = error;
+        return HTK_ASYNC;
     }
     if (error) {
         err("filter %s: all the rbl returned an error", filter->name);
@@ -448,27 +501,39 @@ static filter_result_t rbl_filter(const filter_t *filter, const query_t *query,
     }
 }
 
-static int rbl_init(void)
+static void *iplist_context_constructor(void)
+{
+    return p_new(iplist_async_data_t, 1);
+}
+
+static void iplist_context_destructor(void *data)
+{
+    iplist_async_data_t *ctx = data;
+    p_delete(&ctx);
+}
+
+static int iplist_init(void)
 {
-    filter_type_t type =  filter_register("iplist", rbl_filter_constructor,
-                                          rbl_filter_destructor, rbl_filter,
-                                          NULL, NULL);
+    filter_type =  filter_register("iplist", iplist_filter_constructor,
+                                   iplist_filter_destructor, iplist_filter,
+                                   iplist_context_constructor,
+                                   iplist_context_destructor);
     /* Hooks.
      */
-    (void)filter_hook_register(type, "abort");
-    (void)filter_hook_register(type, "error");
-    (void)filter_hook_register(type, "fail");
-    (void)filter_hook_register(type, "hard_match");
-    (void)filter_hook_register(type, "soft_match");
-    (void)filter_hook_register(type, "async");
+    (void)filter_hook_register(filter_type, "abort");
+    (void)filter_hook_register(filter_type, "error");
+    (void)filter_hook_register(filter_type, "fail");
+    (void)filter_hook_register(filter_type, "hard_match");
+    (void)filter_hook_register(filter_type, "soft_match");
+    (void)filter_hook_register(filter_type, "async");
 
     /* Parameters.
      */
-    (void)filter_param_register(type, "file");
-    (void)filter_param_register(type, "rbldns");
-    (void)filter_param_register(type, "dns");
-    (void)filter_param_register(type, "hard_threshold");
-    (void)filter_param_register(type, "soft_threshold");
+    (void)filter_param_register(filter_type, "file");
+    (void)filter_param_register(filter_type, "rbldns");
+    (void)filter_param_register(filter_type, "dns");
+    (void)filter_param_register(filter_type, "hard_threshold");
+    (void)filter_param_register(filter_type, "soft_threshold");
     return 0;
 }
-module_init(rbl_init);
+module_init(iplist_init);
index 6773070..48233e0 100644 (file)
@@ -52,6 +52,9 @@
 
 DECLARE_MAIN
 
+static config_t *config = NULL;
+
+
 static void *query_starter(server_t* server)
 {
     query_context_t *context = p_new(query_context_t, 1);
@@ -68,9 +71,9 @@ static void query_stopper(void *data)
     }
 }
 
-static bool config_refresh(void *config)
+static bool config_refresh(void *mconfig)
 {
-    return config_reload(config);
+    return config_reload(mconfig);
 }
 
 __attribute__((format(printf,2,0)))
@@ -89,20 +92,21 @@ static void policy_answer(server_t *pcy, const char *fmt, ...)
     epoll_modify(pcy->fd, EPOLLIN | EPOLLOUT, pcy);
 }
 
-static bool policy_process(server_t *pcy, const config_t *config)
+static bool policy_process(server_t *pcy, const config_t *mconfig)
 {
     query_context_t *context = pcy->data;
     const query_t* query = &context->query;
     const filter_t *filter;
-    if (config->entry_points[query->state] == -1) {
+    if (mconfig->entry_points[query->state] == -1) {
         warn("no filter defined for current protocol_state (%d)", query->state);
         return false;
     }
     if (context->context.current_filter != NULL) {
         filter = context->context.current_filter;
     } else {
-        filter = array_ptr(config->filters, config->entry_points[query->state]);
+        filter = array_ptr(mconfig->filters, mconfig->entry_points[query->state]);
     }
+    context->context.current_filter = NULL;
     while (true) {
         const filter_hook_t *hook = filter_run(filter, query, &context->context);
         if (hook == NULL) {
@@ -135,8 +139,8 @@ static bool policy_process(server_t *pcy, const config_t *config)
                    query->sender == NULL ? "undefined" : query->sender,
                    query->recipient == NULL ? "undefined" : query->recipient,
                    htokens[hook->type], filter->name,
-                   (array_ptr(config->filters, hook->filter_id))->name);
-            filter = array_ptr(config->filters, hook->filter_id);
+                   (array_ptr(mconfig->filters, hook->filter_id))->name);
+            filter = array_ptr(mconfig->filters, hook->filter_id);
         }
     }
 }
@@ -148,7 +152,8 @@ static int policy_run(server_t *pcy, void* vconfig)
     const char *eoq;
     query_context_t *context = pcy->data;
     query_t  *query  = &context->query;
-    const config_t *config = vconfig;
+    context->server = pcy;
+    const config_t *mconfig = vconfig;
 
     if (nb < 0) {
         if (errno == EAGAIN || errno == EINTR)
@@ -169,18 +174,38 @@ static int policy_run(server_t *pcy, void* vconfig)
         return -1;
     query->eoq = eoq + strlen("\n\n");
     epoll_modify(pcy->fd, 0, pcy);
-    return policy_process(pcy, config) ? 0 : -1;
+    return policy_process(pcy, mconfig) ? 0 : -1;
 }
 
-static bool policy_event(server_t *event, void *config)
+static void policy_async_handler(filter_context_t *context,
+                                 const filter_hook_t *hook)
 {
-    if (!policy_process(event, config)) {
-        server_release(event);
-        return true;
+    const filter_t *filter = context->current_filter;
+    query_context_t *qctx  = context->data;
+    query_t         *query = &qctx->query;
+    server_t        *server = qctx->server;
+
+    debug("request client=%s, from=<%s>, to=<%s>: "
+           "awswer %s from filter %s: next filter %s",
+           query->client_name,
+           query->sender == NULL ? "undefined" : query->sender,
+           query->recipient == NULL ? "undefined" : query->recipient,
+           htokens[hook->type], filter->name,
+           (array_ptr(config->filters, hook->filter_id))->name);
+    context->current_filter = array_ptr(config->filters, hook->filter_id);
+
+    if (!policy_process(server, config)) {
+        server_release(server);
     }
-    return true;
 }
 
+static int postlicyd_init(void)
+{
+    filter_async_handler_register(policy_async_handler);
+    return 0;
+}
+module_init(postlicyd_init);
+
 int start_listener(int port)
 {
     return start_server(port, NULL, NULL);
@@ -256,7 +281,7 @@ int main(int argc, char *argv[])
         return EXIT_FAILURE;
     }
 
-    config_t *config = config_read(argv[optind]);
+    config = config_read(argv[optind]);
     if (config == NULL) {
         return EXIT_FAILURE;
     }
@@ -275,6 +300,6 @@ int main(int argc, char *argv[])
         return EXIT_FAILURE;
     } else {
         return server_loop(query_starter, query_stopper,
-                           policy_run, policy_event, config_refresh, config);
+                           policy_run, config_refresh, config);
     }
 }
index d6af4b2..f189fb0 100644 (file)
@@ -42,6 +42,7 @@
 typedef struct query_context_t {
     query_t query;
     filter_context_t context;
+    server_t *server;
 } query_context_t;
 
 #endif