X-Git-Url: http://git.madism.org/?a=blobdiff_plain;f=postlicyd%2Fmain-postlicyd.c;h=08c6a2b114bdeacb5bc8dd88210bf8cd0487a1b3;hb=d622febc1fb702194f7ac3f95bfa6cba448dcc5b;hp=70389bfb41d9b013b600914663635fa55d828adb;hpb=c091e1809626636627a2824d6a2dfedeab3a4d24;p=apps%2Fpfixtools.git diff --git a/postlicyd/main-postlicyd.c b/postlicyd/main-postlicyd.c index 70389bf..08c6a2b 100644 --- a/postlicyd/main-postlicyd.c +++ b/postlicyd/main-postlicyd.c @@ -38,24 +38,30 @@ #include "buffer.h" #include "common.h" -#include "epoll.h" #include "policy_tokens.h" #include "server.h" #include "config.h" -#include "postlicyd.h" +#include "query.h" #define DAEMON_NAME "postlicyd" -#define DAEMON_VERSION "0.2" +#define DAEMON_VERSION "0.4" #define DEFAULT_PORT 10000 #define RUNAS_USER "nobody" #define RUNAS_GROUP "nogroup" DECLARE_MAIN -static config_t *config = NULL; +typedef struct query_context_t { + query_t query; + filter_context_t context; + client_t *client; +} query_context_t; +static config_t *config = NULL; +static bool refresh = false; +static PA(client_t) busy = ARRAY_INIT; -static void *query_starter(server_t* server) +static void *query_starter(listener_t* server) { query_context_t *context = p_new(query_context_t, 1); filter_context_prepare(&context->context, context); @@ -73,88 +79,99 @@ static void query_stopper(void *data) static bool config_refresh(void *mconfig) { + refresh = true; if (filter_running > 0) { - sighup = true; - sleep(1); return true; } - return config_reload(mconfig); + bool ret = config_reload(mconfig); + foreach (client_t **server, busy) { + client_io_ro(*server); + }} + array_len(busy) = 0; + refresh = false; + return ret; } -static void policy_answer(server_t *pcy, const char *message) +static void policy_answer(client_t *pcy, const char *message) { - query_context_t *context = pcy->data; + query_context_t *context = client_data(pcy); const query_t* query = &context->query; + buffer_t *buf = client_output_buffer(pcy); - buffer_addstr(&pcy->obuf, "action="); - buffer_ensure(&pcy->obuf, m_strlen(message) + 64); + /* Write reply "action=ACTION [text]" */ + buffer_addstr(buf, "action="); + buffer_ensure(buf, m_strlen(message) + 64); - ssize_t size = array_size(pcy->obuf) - array_len(pcy->obuf); - ssize_t format_size = query_format(array_ptr(pcy->obuf, array_len(pcy->obuf)), + ssize_t size = array_size(*buf) - array_len(*buf); + ssize_t format_size = query_format(array_ptr(*buf, array_len(*buf)), size, message, query); if (format_size == -1) { - buffer_addstr(&pcy->obuf, message); + buffer_addstr(buf, message); } else if (format_size > size) { - buffer_ensure(&pcy->obuf, format_size + 1); - query_format(array_ptr(pcy->obuf, array_len(pcy->obuf)), - array_size(pcy->obuf) - array_len(pcy->obuf), + buffer_ensure(buf, format_size + 1); + query_format(array_ptr(*buf, array_len(*buf)), + array_size(*buf) - array_len(*buf), message, query); - array_len(pcy->obuf) += format_size; + array_len(*buf) += format_size; } else { - array_len(pcy->obuf) += format_size; + array_len(*buf) += format_size; } - buffer_addstr(&pcy->obuf, "\n\n"); - buffer_consume(&pcy->ibuf, query->eoq - pcy->ibuf.data); - epoll_modify(pcy->fd, EPOLLIN | EPOLLOUT, pcy); + buffer_addstr(buf, "\n\n"); + + /* Finalize query. */ + buf = client_input_buffer(pcy); + buffer_consume(buf, query->eoq - buf->data); + client_io_rw(pcy); } -static const filter_t *next_filter(server_t *pcy, const filter_t *filter, +static const filter_t *next_filter(client_t *pcy, const filter_t *filter, const query_t *query, const filter_hook_t *hook, bool *ok) { +#define MESSAGE_FORMAT "request client=%s from=<%s> to=<%s> at %s: " +#define MESSAGE_PARAMS query->client_name, \ + query->sender == NULL ? "undefined" : query->sender, \ + query->recipient == NULL ? "undefined" : query->recipient, \ + smtp_state_names[query->state] + + if (hook != NULL) { + query_context_t *context = client_data(pcy); + if (hook->counter >= 0 && hook->counter < MAX_COUNTERS && hook->cost > 0) { + context->context.counters[hook->counter] += hook->cost; + debug(MESSAGE_FORMAT "added %d to counter %d (now %u)", MESSAGE_PARAMS, + hook->cost, hook->counter, context->context.counters[hook->counter]); + } + } if (hook == NULL) { - warn("request client=%s, from=<%s>, to=<%s>: aborted", - query->client_name, - query->sender == NULL ? "undefined" : query->sender, - query->recipient == NULL ? "undefined" : query->recipient); + warn(MESSAGE_FORMAT "aborted", MESSAGE_PARAMS); *ok = false; return NULL; } else if (hook->async) { - debug("request client=%s, from=<%s>, to=<%s>: " - "asynchronous filter from filter %s", - query->client_name, - query->sender == NULL ? "undefined" : query->sender, - query->recipient == NULL ? "undefined" : query->recipient, - filter->name); + debug(MESSAGE_FORMAT "asynchronous filter from filter %s", + MESSAGE_PARAMS, filter->name); *ok = true; return NULL; } else if (hook->postfix) { - info("request client=%s, from=<%s>, to=<%s>: " - "awswer %s from filter %s: \"%s\"", - query->client_name, - query->sender == NULL ? "undefined" : query->sender, - query->recipient == NULL ? "undefined" : query->recipient, + info(MESSAGE_FORMAT "awswer %s from filter %s: \"%s\"", MESSAGE_PARAMS, htokens[hook->type], filter->name, hook->value); policy_answer(pcy, hook->value); *ok = true; return NULL; } else { - debug("request client=%s, from=<%s>, to=<%s>: " - "awswer %s from filter %s: next filter %s", - query->client_name, - query->sender == NULL ? "undefined" : query->sender, - query->recipient == NULL ? "undefined" : query->recipient, - htokens[hook->type], filter->name, - (array_ptr(config->filters, hook->filter_id))->name); + debug(MESSAGE_FORMAT "awswer %s from filter %s: next filter %s", + MESSAGE_PARAMS, htokens[hook->type], filter->name, + (array_ptr(config->filters, hook->filter_id))->name); return array_ptr(config->filters, hook->filter_id); } +#undef MESSAGE_PARAMS +#undef MESSAGE_FORMAT } -static bool policy_process(server_t *pcy, const config_t *mconfig) +static bool policy_process(client_t *pcy, const config_t *mconfig) { - query_context_t *context = pcy->data; + query_context_t *context = client_data(pcy); const query_t* query = &context->query; const filter_t *filter; if (mconfig->entry_points[query->state] == -1) { - warn("no filter defined for current protocol_state (%d)", query->state); + warn("no filter defined for current protocol_state (%s)", smtp_state_names[query->state]); return false; } if (context->context.current_filter != NULL) { @@ -173,19 +190,22 @@ static bool policy_process(server_t *pcy, const config_t *mconfig) } } -static int policy_run(server_t *pcy, void* vconfig) +static int policy_run(client_t *pcy, void* vconfig) { - if (sighup) { + const config_t *mconfig = vconfig; + if (refresh) { + array_add(busy, pcy); return 0; } - int search_offs = MAX(0, (int)(pcy->ibuf.len - 1)); - int nb = buffer_read(&pcy->ibuf, pcy->fd, -1); + query_context_t *context = client_data(pcy); + query_t *query = &context->query; + context->client = pcy; + + buffer_t *buf = client_input_buffer(pcy); + int search_offs = MAX(0, (int)(buf->len - 1)); + int nb = client_read(pcy); const char *eoq; - query_context_t *context = pcy->data; - query_t *query = &context->query; - context->server = pcy; - const config_t *mconfig = vconfig; if (nb < 0) { if (errno == EAGAIN || errno == EINTR) @@ -194,18 +214,26 @@ static int policy_run(server_t *pcy, void* vconfig) return -1; } if (nb == 0) { - if (pcy->ibuf.len) + if (buf->len) err("unexpected end of data"); return -1; } - if (!(eoq = strstr(pcy->ibuf.data + search_offs, "\n\n"))) + if (!(eoq = strstr(buf->data + search_offs, "\n\n"))) { return 0; + } - if (!query_parse(pcy->data, pcy->ibuf.data)) + if (!query_parse(query, buf->data)) { return -1; + } query->eoq = eoq + strlen("\n\n"); - epoll_modify(pcy->fd, 0, pcy); + + /* The instance changed => reset the static context */ + if (query->instance == NULL || strcmp(context->context.instance, query->instance) != 0) { + filter_context_clean(&context->context); + m_strcat(context->context.instance, 64, query->instance); + } + client_io_none(pcy); return policy_process(pcy, mconfig) ? 0 : -1; } @@ -216,14 +244,17 @@ static void policy_async_handler(filter_context_t *context, const filter_t *filter = context->current_filter; query_context_t *qctx = context->data; query_t *query = &qctx->query; - server_t *server = qctx->server; + client_t *server = qctx->client; context->current_filter = next_filter(server, filter, query, hook, &ok); if (context->current_filter != NULL) { ok = policy_process(server, config); } if (!ok) { - server_release(server); + client_release(server); + } + if (refresh && filter_running == 0) { + config_refresh(config); } } @@ -232,12 +263,13 @@ static int postlicyd_init(void) filter_async_handler_register(policy_async_handler); return 0; } -module_init(postlicyd_init); -int start_listener(int port) +static void postlicyd_shutdown(void) { - return start_server(port, NULL, NULL); + array_deep_wipe(busy, client_delete); } +module_init(postlicyd_init); +module_exit(postlicyd_shutdown); /* administrivia {{{ */ @@ -297,7 +329,7 @@ int main(int argc, char *argv[]) return EXIT_FAILURE; } - info("starting %s v%s...", DAEMON_NAME, DAEMON_VERSION); + info("%s v%s...", DAEMON_NAME, DAEMON_VERSION); if (pidfile_open(pidfile) < 0) { crit("unable to write pidfile %s", pidfile); @@ -324,7 +356,7 @@ int main(int argc, char *argv[]) pidfile_refresh(); - if (start_listener(config->port) < 0) { + if (start_listener(config->port) == NULL) { return EXIT_FAILURE; } else { return server_loop(query_starter, query_stopper,