From: Florent Bruneau Date: Fri, 10 Oct 2008 22:07:20 +0000 (+0200) Subject: Basic support for async filters. X-Git-Url: http://git.madism.org/?a=commitdiff_plain;h=ae0c2eb5d2ea501fd9e458fc138696c268a14569;p=apps%2Fpfixtools.git Basic support for async filters. Signed-off-by: Florent Bruneau --- diff --git a/common/epoll.c b/common/epoll.c index e3df03e..896fc2c 100644 --- a/common/epoll.c +++ b/common/epoll.c @@ -69,6 +69,14 @@ void epoll_modify(int fd, uint32_t events, void *ptr) } } +void epoll_unregister(int fd) +{ + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL)) { + UNIXERR("epoll_ctl"); + abort(); + } +} + int epoll_select(struct epoll_event *events, int maxevents, int timeout) { return epoll_wait(epollfd, events, maxevents, timeout); diff --git a/common/epoll.h b/common/epoll.h index 57179d2..fdc2665 100644 --- a/common/epoll.h +++ b/common/epoll.h @@ -41,6 +41,7 @@ void epoll_register(int fd, uint32_t events, void *ptr); void epoll_modify(int fd, uint32_t events, void *ptr); +void epoll_unregister(int fd); int epoll_select(struct epoll_event *events, int maxevents, int timeout); #endif diff --git a/common/server.c b/common/server.c index 6053408..a085c40 100644 --- a/common/server.c +++ b/common/server.c @@ -84,7 +84,7 @@ static server_t* server_acquire(void) } } -static void server_release(server_t *server) +void server_release(server_t *server) { server_wipe(server); array_add(server_pool, server); @@ -92,6 +92,7 @@ static void server_release(server_t *server) static void server_shutdown(void) { + printf("Server shutdown"); array_deep_wipe(listeners, server_delete); array_deep_wipe(server_pool, server_delete); } @@ -125,6 +126,7 @@ int start_server(int port, start_listener_t starter, delete_client_t deleter) tmp = server_acquire(); tmp->fd = sock; tmp->listener = true; + tmp->event = false; tmp->data = data; tmp->clear_data = deleter; epoll_register(sock, EPOLLIN, tmp); @@ -154,6 +156,8 @@ static int start_client(server_t *server, start_client_t starter, } tmp = server_acquire(); + tmp->listener = false; + tmp->event = false; tmp->fd = sock; tmp->data = data; tmp->clear_data = deleter; @@ -161,7 +165,7 @@ static int start_client(server_t *server, start_client_t starter, return 0; } -event_t event_register(int fd, void *data) +server_t * event_register(int fd, void *data) { int fds[2]; if (fd == -1) { @@ -177,6 +181,7 @@ event_t event_register(int fd, void *data) } server_t *tmp = server_acquire(); + tmp->listener = false; tmp->event = true; tmp->fd = fd == -1 ? fds[0] : fd; tmp->fd2 = fd == -1 ? fds[1] : -1; @@ -185,7 +190,7 @@ event_t event_register(int fd, void *data) return tmp; } -bool event_fire(event_t event) +bool event_fire(server_t *event) { static const char *data = ""; if (event->fd2 == -1) { @@ -194,7 +199,7 @@ bool event_fire(event_t event) return write(event->fd2, data, 1) == 0; } -static bool event_cancel(event_t event) +bool event_cancel(server_t *event) { char buff[32]; while (true) { @@ -210,6 +215,12 @@ static bool event_cancel(event_t event) } } +void event_release(server_t *event) +{ + epoll_unregister(event->fd); + server_release(event); +} + int server_loop(start_client_t starter, delete_client_t deleter, run_client_t runner, event_handler_t handler, refresh_t refresh, void* config) @@ -245,14 +256,12 @@ int server_loop(start_client_t starter, delete_client_t deleter, (void)start_client(d, starter, deleter); continue; } else if (d->event) { - if (!event_cancel(d)) { - server_release(d); - continue; - } if (handler) { if (!handler(d, config)) { - server_release(d); + event_release(d); } + } else { + event_release(d); } continue; } diff --git a/common/server.h b/common/server.h index a552dbe..85cbae4 100644 --- a/common/server.h +++ b/common/server.h @@ -39,7 +39,6 @@ #include "buffer.h" typedef struct server_t server_t; -typedef server_t *event_t; #define INVALID_EVENT (NULL) @@ -48,7 +47,7 @@ typedef void (*delete_client_t)(void*); typedef void *(*start_client_t)(server_t*); typedef int (*run_client_t)(server_t*, void*); typedef bool (*refresh_t)(void*); -typedef bool (*event_handler_t)(event_t, void*); +typedef bool (*event_handler_t)(server_t *, void*); struct server_t { unsigned listener : 1; @@ -67,8 +66,12 @@ ARRAY(server_t); int start_server(int port, start_listener_t starter, delete_client_t deleter); -event_t event_register(int fd, void *data); -bool event_fire(event_t event); +void server_release(server_t *server); + +server_t *event_register(int fd, void *data); +bool event_fire(server_t *event); +bool event_cancel(server_t *event); +void event_release(server_t *event); #define event_data(event) ((event)->data) int server_loop(start_client_t starter, delete_client_t deleter, diff --git a/postlicyd/filter.c b/postlicyd/filter.c index 7aba638..a150d6d 100644 --- a/postlicyd/filter.c +++ b/postlicyd/filter.c @@ -43,15 +43,29 @@ static filter_destructor_t destructors[FTK_count]; static bool hooks[FTK_count][HTK_count]; static bool params[FTK_count][ATK_count]; +static filter_context_constructor_t ctx_constructors[FTK_count]; +static filter_context_destructor_t ctx_destructors[FTK_count]; + static const filter_hook_t default_hook = { .type = 0, .value = (char*)"DUNNO", .postfix = true, + .async = false, + .filter_id = 0 +}; + +static const filter_hook_t async_hook = { + .type = 0, + .value = NULL, + .postfix = false, + .async = true, .filter_id = 0 }; filter_type_t filter_register(const char *type, filter_constructor_t constructor, - filter_destructor_t destructor, filter_runner_t runner) + filter_destructor_t destructor, filter_runner_t runner, + filter_context_constructor_t context_constructor, + filter_context_destructor_t context_destructor) { filter_token tok = filter_tokenize(type, m_strlen(type)); CHECK_FILTER(tok); @@ -59,6 +73,9 @@ filter_type_t filter_register(const char *type, filter_constructor_t constructor runners[tok] = runner; constructors[tok] = constructor; destructors[tok] = destructor; + + ctx_constructors[tok] = context_constructor; + ctx_destructors[tok] = context_destructor; return tok; } @@ -163,17 +180,24 @@ void filter_wipe(filter_t *filter) p_delete(&filter->name); } -const filter_hook_t *filter_run(const filter_t *filter, const query_t *query) +const filter_hook_t *filter_run(const filter_t *filter, const query_t *query, + filter_context_t *context) { int start = 0; int end = filter->hooks.len; debug("running filter %s (%s)", filter->name, ftokens[filter->type]); - filter_result_t res = runners[filter->type](filter, query); + filter_result_t res = runners[filter->type](filter, query, context); + + context->current_filter = NULL; + debug("filter run, result is %s", htokens[res]); if (res == HTK_ABORT) { return NULL; } - debug("filter run, result is %s", htokens[res]); + if (res == HTK_ASYNC) { + context->current_filter = filter; + return &async_hook; + } while (start < end) { int mid = (start + end) / 2; @@ -192,9 +216,10 @@ const filter_hook_t *filter_run(const filter_t *filter, const query_t *query) return &default_hook; } -bool filter_test(const filter_t *filter, const query_t *query, filter_result_t result) +bool filter_test(const filter_t *filter, const query_t *query, + filter_context_t *context, filter_result_t result) { - return !!(runners[filter->type](filter, query) == result); + return !!(runners[filter->type](filter, query, context) == result); } void filter_set_name(filter_t *filter, const char *name, int len) @@ -242,9 +267,30 @@ bool filter_add_hook(filter_t *filter, const char *name, int name_len, htokens[hook.type], ftokens[filter->type]); return false; } + hook.async = false; hook.postfix = (strncmp(value, "postfix:", 8) == 0); hook.value = m_strdup(hook.postfix ? value + 8 : value); hook.filter_id = -1; array_add(filter->hooks, hook); return true; } + +void filter_context_prepare(filter_context_t *context, void *qctx) +{ + for (int i = 0 ; i < FTK_count ; ++i) { + if (ctx_constructors[i] != NULL) { + context->contexts[i] = ctx_constructors[i](); + } + } + context->current_filter = NULL; + context->data = qctx; +} + +void filter_context_wipe(filter_context_t *context) +{ + for (int i = 0 ; i < FTK_count ; ++i) { + if (ctx_destructors[i] != NULL) { + ctx_destructors[i](context->contexts[i]); + } + } +} diff --git a/postlicyd/filter.h b/postlicyd/filter.h index 3047803..f551e58 100644 --- a/postlicyd/filter.h +++ b/postlicyd/filter.h @@ -51,7 +51,8 @@ typedef struct filter_hook_t { filter_result_t type; char *value; - bool postfix; + unsigned postfix:1; + unsigned async:1; int filter_id; } filter_hook_t; ARRAY(filter_hook_t) @@ -63,6 +64,8 @@ typedef struct filter_param_t { } filter_param_t; ARRAY(filter_param_t) +/** Description of a filter. + */ typedef struct filter_t { char *name; filter_type_t type; @@ -78,6 +81,17 @@ typedef struct filter_t { } filter_t; ARRAY(filter_t) +/** Context of the query. To be filled with data to use when + * performing asynchronous filtering. + */ +typedef struct filter_context_t { + const filter_t *current_filter; + void *contexts[FTK_count]; + + void *data; +} filter_context_t; + + #define FILTER_INIT { NULL, FTK_UNKNOWN, ARRAY_INIT, NULL, ARRAY_INIT, -1 } #define CHECK_FILTER(Filter) \ assert(Filter != FTK_UNKNOWN && Filter != FTK_count \ @@ -89,14 +103,28 @@ ARRAY(filter_t) assert(Param != ATK_UNKNOWN && Param != ATK_count \ && "Unknown param") + +/* Callback to be implemented by a filter. + */ + typedef filter_result_t (*filter_runner_t)(const filter_t *filter, - const query_t *query); + const query_t *query, + filter_context_t *context); typedef bool (*filter_constructor_t)(filter_t *filter); typedef void (*filter_destructor_t)(filter_t *filter); +typedef void *(*filter_context_constructor_t)(void); +typedef void (*filter_context_destructor_t)(void*); + + +/* Registration. + */ + __attribute__((nonnull(1,4))) filter_type_t filter_register(const char *type, filter_constructor_t constructor, - filter_destructor_t destructor, filter_runner_t runner); + filter_destructor_t destructor, filter_runner_t runner, + filter_context_constructor_t context_constructor, + filter_context_destructor_t context_destructor); __attribute__((nonnull(2))) filter_result_t filter_hook_register(filter_type_t filter, const char *name); @@ -104,6 +132,10 @@ filter_result_t filter_hook_register(filter_type_t filter, const char *name); __attribute__((nonnull(2))) filter_param_id_t filter_param_register(filter_type_t filter, const char *name); + +/* Filter builder. + */ + __attribute__((nonnull(1))) static inline void filter_init(filter_t *filter) { @@ -170,14 +202,20 @@ static inline void filter_params_wipe(filter_param_t *param) __attribute__((nonnull(1))) void filter_wipe(filter_t *filter); + +/* Runner. + */ + __attribute__((nonnull(1,2))) -const filter_hook_t *filter_run(const filter_t *filter, const query_t *query); +const filter_hook_t *filter_run(const filter_t *filter, const query_t *query, + filter_context_t *context); __attribute__((nonnull(1,2))) -bool filter_test(const filter_t *filter, const query_t *query, filter_result_t expt); +bool filter_test(const filter_t *filter, const query_t *query, + filter_context_t *context, filter_result_t expt); -/* Helpers +/* Parsing Helpers */ #define FILTER_PARAM_PARSE_STRING(Param, Dest) \ @@ -216,4 +254,14 @@ bool filter_test(const filter_t *filter, const query_t *query, filter_result_t e } \ } break + +/* Filter context + */ + +__attribute__((nonnull)) +void filter_context_prepare(filter_context_t *context, void* qctx); + +__attribute__((nonnull)) +void filter_context_wipe(filter_context_t *context); + #endif diff --git a/postlicyd/greylist.c b/postlicyd/greylist.c index a2b3346..4cbe8b9 100644 --- a/postlicyd/greylist.c +++ b/postlicyd/greylist.c @@ -458,7 +458,8 @@ static void greylist_filter_destructor(filter_t *filter) } static filter_result_t greylist_filter(const filter_t *filter, - const query_t *query) + const query_t *query, + filter_context_t *context) { const greylist_config_t *config = filter->data; if (query->state != SMTP_RCPT) { @@ -475,7 +476,7 @@ static int greylist_init(void) { filter_type_t type = filter_register("greylist", greylist_filter_constructor, greylist_filter_destructor, - greylist_filter); + greylist_filter, NULL, NULL); /* Hooks. */ (void)filter_hook_register(type, "abort"); diff --git a/postlicyd/iplist.c b/postlicyd/iplist.c index ebd29f8..867f88b 100644 --- a/postlicyd/iplist.c +++ b/postlicyd/iplist.c @@ -391,7 +391,8 @@ static void rbl_filter_destructor(filter_t *filter) filter->data = data; } -static filter_result_t rbl_filter(const filter_t *filter, const query_t *query) +static filter_result_t rbl_filter(const filter_t *filter, const query_t *query, + filter_context_t *context) { uint32_t ip; int32_t sum = 0; @@ -450,7 +451,8 @@ static filter_result_t rbl_filter(const filter_t *filter, const query_t *query) static int rbl_init(void) { filter_type_t type = filter_register("iplist", rbl_filter_constructor, - rbl_filter_destructor, rbl_filter); + rbl_filter_destructor, rbl_filter, + NULL, NULL); /* Hooks. */ (void)filter_hook_register(type, "abort"); diff --git a/postlicyd/main-postlicyd.c b/postlicyd/main-postlicyd.c index 57e1e25..6773070 100644 --- a/postlicyd/main-postlicyd.c +++ b/postlicyd/main-postlicyd.c @@ -41,8 +41,8 @@ #include "epoll.h" #include "policy_tokens.h" #include "server.h" -#include "query.h" #include "config.h" +#include "postlicyd.h" #define DAEMON_NAME "postlicyd" #define DAEMON_VERSION "0.2" @@ -54,7 +54,18 @@ DECLARE_MAIN static void *query_starter(server_t* server) { - return query_new(); + query_context_t *context = p_new(query_context_t, 1); + filter_context_prepare(&context->context, context); + return context; +} + +static void query_stopper(void *data) +{ + query_context_t **context = data; + if (*context) { + filter_context_wipe(&(*context)->context); + p_delete(context); + } } static bool config_refresh(void *config) @@ -66,7 +77,8 @@ __attribute__((format(printf,2,0))) static void policy_answer(server_t *pcy, const char *fmt, ...) { va_list args; - const query_t* query = pcy->data; + query_context_t *context = pcy->data; + const query_t* query = &context->query; buffer_addstr(&pcy->obuf, "action="); va_start(args, fmt); @@ -79,21 +91,34 @@ static void policy_answer(server_t *pcy, const char *fmt, ...) static bool policy_process(server_t *pcy, const config_t *config) { - const query_t* query = pcy->data; + query_context_t *context = pcy->data; + const query_t* query = &context->query; const filter_t *filter; if (config->entry_points[query->state] == -1) { warn("no filter defined for current protocol_state (%d)", query->state); return false; } - filter = array_ptr(config->filters, config->entry_points[query->state]); + if (context->context.current_filter != NULL) { + filter = context->context.current_filter; + } else { + filter = array_ptr(config->filters, config->entry_points[query->state]); + } while (true) { - const filter_hook_t *hook = filter_run(filter, query); + const filter_hook_t *hook = filter_run(filter, query, &context->context); if (hook == NULL) { warn("request client=%s, from=<%s>, to=<%s>: aborted", query->client_name, query->sender == NULL ? "undefined" : query->sender, query->recipient == NULL ? "undefined" : query->recipient); return false; + } else if (hook->async) { + debug("request client=%s, from=<%s>, to=<%s>: " + "asynchronous filter from filter %s", + query->client_name, + query->sender == NULL ? "undefined" : query->sender, + query->recipient == NULL ? "undefined" : query->recipient, + filter->name); + return true; } else if (hook->postfix) { info("request client=%s, from=<%s>, to=<%s>: " "awswer %s from filter %s: \"%s\"", @@ -121,7 +146,8 @@ static int policy_run(server_t *pcy, void* vconfig) int search_offs = MAX(0, (int)(pcy->ibuf.len - 1)); int nb = buffer_read(&pcy->ibuf, pcy->fd, -1); const char *eoq; - query_t *query = pcy->data; + query_context_t *context = pcy->data; + query_t *query = &context->query; const config_t *config = vconfig; if (nb < 0) { @@ -146,6 +172,15 @@ static int policy_run(server_t *pcy, void* vconfig) return policy_process(pcy, config) ? 0 : -1; } +static bool policy_event(server_t *event, void *config) +{ + if (!policy_process(event, config)) { + server_release(event); + return true; + } + return true; +} + int start_listener(int port) { return start_server(port, NULL, NULL); @@ -239,7 +274,7 @@ int main(int argc, char *argv[]) if (start_listener(config->port) < 0) { return EXIT_FAILURE; } else { - return server_loop(query_starter, (delete_client_t)query_delete, - policy_run, NULL, config_refresh, config); + return server_loop(query_starter, query_stopper, + policy_run, policy_event, config_refresh, config); } } diff --git a/postlicyd/match.c b/postlicyd/match.c index d50b12a..ebee763 100644 --- a/postlicyd/match.c +++ b/postlicyd/match.c @@ -262,7 +262,8 @@ static inline bool match_condition(const match_condition_t *cond, const query_t return true; } -static filter_result_t match_filter(const filter_t *filter, const query_t *query) +static filter_result_t match_filter(const filter_t *filter, const query_t *query, + filter_context_t *context) { const match_config_t *config = filter->data; foreach (const match_condition_t *condition, config->conditions) { @@ -287,7 +288,8 @@ static filter_result_t match_filter(const filter_t *filter, const query_t *query static int match_init(void) { filter_type_t type = filter_register("match", match_filter_constructor, - match_filter_destructor, match_filter); + match_filter_destructor, match_filter, + NULL, NULL); /* Hooks. */ (void)filter_hook_register(type, "abort"); diff --git a/postlicyd/postlicyd.h b/postlicyd/postlicyd.h new file mode 100644 index 0000000..d6af4b2 --- /dev/null +++ b/postlicyd/postlicyd.h @@ -0,0 +1,47 @@ +/******************************************************************************/ +/* pfixtools: a collection of postfix related tools */ +/* ~~~~~~~~~ */ +/* ________________________________________________________________________ */ +/* */ +/* Redistribution and use in source and binary forms, with or without */ +/* modification, are permitted provided that the following conditions */ +/* are met: */ +/* */ +/* 1. Redistributions of source code must retain the above copyright */ +/* notice, this list of conditions and the following disclaimer. */ +/* 2. Redistributions in binary form must reproduce the above copyright */ +/* notice, this list of conditions and the following disclaimer in the */ +/* documentation and/or other materials provided with the distribution. */ +/* 3. The names of its contributors may not be used to endorse or promote */ +/* products derived from this software without specific prior written */ +/* permission. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND */ +/* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE */ +/* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR */ +/* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS */ +/* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR */ +/* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF */ +/* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS */ +/* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN */ +/* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) */ +/* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF */ +/* THE POSSIBILITY OF SUCH DAMAGE. */ +/******************************************************************************/ + +/* + * Copyright © 2008 Florent Bruneau + */ + +#ifndef PFIXTOOLS_POSTLICYD_H +#define PFIXTOOLS_POSTLICYD_H + +#include "query.h" +#include "filter.h" + +typedef struct query_context_t { + query_t query; + filter_context_t context; +} query_context_t; + +#endif diff --git a/postlicyd/query.h b/postlicyd/query.h index 87ee01e..86e41ba 100644 --- a/postlicyd/query.h +++ b/postlicyd/query.h @@ -94,18 +94,6 @@ typedef struct query_t { const char *eoq; } query_t; -static inline query_t *query_new(void) -{ - return p_new(query_t, 1); -} - -static inline void query_delete(query_t **query) -{ - if (*query) { - p_delete(query); - } -} - /** Parse the content of the text to fill the query. * The text pointed by \p p is segmented (and modified to add * a \0 at the end of each segment) and used to fill the query diff --git a/postlicyd/strlist.c b/postlicyd/strlist.c index 0c790cb..4b5eeb5 100644 --- a/postlicyd/strlist.c +++ b/postlicyd/strlist.c @@ -500,7 +500,8 @@ static void strlist_filter_destructor(filter_t *filter) filter->data = config; } -static filter_result_t strlist_filter(const filter_t *filter, const query_t *query) +static filter_result_t strlist_filter(const filter_t *filter, const query_t *query, + filter_context_t *context) { char reverse[BUFSIZ]; char normal[BUFSIZ]; @@ -600,7 +601,8 @@ static filter_result_t strlist_filter(const filter_t *filter, const query_t *que static int strlist_init(void) { filter_type_t type = filter_register("strlist", strlist_filter_constructor, - strlist_filter_destructor, strlist_filter); + strlist_filter_destructor, strlist_filter, + NULL, NULL); /* Hooks. */ (void)filter_hook_register(type, "abort");