X-Git-Url: http://git.madism.org/?p=apps%2Fpfixtools.git;a=blobdiff_plain;f=postlicyd%2Fmain-postlicyd.c;h=ced07502029e04b5845d967279f6c570ad14cfe9;hp=7d6970de1e37c482f2cb14cb511e6fcf613f3e84;hb=12f38a249e2cc8160a8d2c150d2243b9067b5908;hpb=9b993811fe1fd208afd954e86c0bfc81a78af04f diff --git a/postlicyd/main-postlicyd.c b/postlicyd/main-postlicyd.c index 7d6970d..ced0750 100644 --- a/postlicyd/main-postlicyd.c +++ b/postlicyd/main-postlicyd.c @@ -38,152 +38,170 @@ #include "buffer.h" #include "common.h" -#include "epoll.h" #include "policy_tokens.h" #include "server.h" +#include "config.h" #include "query.h" #define DAEMON_NAME "postlicyd" +#define DAEMON_VERSION "0.5" #define DEFAULT_PORT 10000 #define RUNAS_USER "nobody" #define RUNAS_GROUP "nogroup" DECLARE_MAIN -static void *query_starter(server_t* server) +typedef struct query_context_t { + query_t query; + filter_context_t context; + client_t *client; +} query_context_t; + +static config_t *config = NULL; +static bool refresh = false; +static PA(client_t) busy = ARRAY_INIT; + +static void *query_starter(listener_t* server) { - return query_new(); + query_context_t *context = p_new(query_context_t, 1); + filter_context_prepare(&context->context, context); + return context; } -static int postfix_parsejob(query_t *query, char *p) +static void query_stopper(void *data) { -#define PARSE_CHECK(expr, error, ...) \ - do { \ - if (!(expr)) { \ - syslog(LOG_ERR, error, ##__VA_ARGS__); \ - return -1; \ - } \ - } while (0) - - p_clear(query, 1); - while (*p != '\n') { - char *k, *v; - int klen, vlen, vtk; - - while (isblank(*p)) - p++; - p = strchr(k = p, '='); - PARSE_CHECK(p, "could not find '=' in line"); - for (klen = p - k; klen && isblank(k[klen]); klen--); - p += 1; /* skip = */ - - while (isblank(*p)) - p++; - p = strchr(v = p, '\n'); - PARSE_CHECK(p, "could not find final \\n in line"); - for (vlen = p - v; vlen && isblank(v[vlen]); vlen--); - p += 1; /* skip \n */ - - vtk = policy_tokenize(v, vlen); - switch (policy_tokenize(k, klen)) { -#define CASE(up, low) case PTK_##up: query->low = v; v[vlen] = '\0'; break; - CASE(HELO_NAME, helo_name); - CASE(QUEUE_ID, queue_id); - CASE(SENDER, sender); - CASE(RECIPIENT, recipient); - CASE(RECIPIENT_COUNT, recipient_count); - CASE(CLIENT_ADDRESS, client_address); - CASE(CLIENT_NAME, client_name); - CASE(REVERSE_CLIENT_NAME, reverse_client_name); - CASE(INSTANCE, instance); - CASE(SASL_METHOD, sasl_method); - CASE(SASL_USERNAME, sasl_username); - CASE(SASL_SENDER, sasl_sender); - CASE(SIZE, size); - CASE(CCERT_SUBJECT, ccert_subject); - CASE(CCERT_ISSUER, ccert_issuer); - CASE(CCERT_FINGERPRINT, ccert_fingerprint); - CASE(ENCRYPTION_PROTOCOL, encryption_protocol); - CASE(ENCRYPTION_CIPHER, encryption_cipher); - CASE(ENCRYPTION_KEYSIZE, encryption_keysize); - CASE(ETRN_DOMAIN, etrn_domain); - CASE(STRESS, stress); -#undef CASE - - case PTK_REQUEST: - PARSE_CHECK(vtk == PTK_SMTPD_ACCESS_POLICY, - "unexpected `request' value: %.*s", vlen, v); - break; - - case PTK_PROTOCOL_NAME: - PARSE_CHECK(vtk == PTK_SMTP || vtk == PTK_ESMTP, - "unexpected `protocol_name' value: %.*s", vlen, v); - query->esmtp = vtk == PTK_ESMTP; - break; - - case PTK_PROTOCOL_STATE: - switch (vtk) { -#define CASE(name) case PTK_##name: query->state = SMTP_##name; break; - CASE(CONNECT); - CASE(EHLO); - CASE(HELO); - CASE(MAIL); - CASE(RCPT); - CASE(DATA); - CASE(END_OF_MESSAGE); - CASE(VRFY); - CASE(ETRN); - default: - PARSE_CHECK(false, "unexpected `protocol_state` value: %.*s", - vlen, v); -#undef CASE - } - break; - - default: - syslog(LOG_WARNING, "unexpected key, skipped: %.*s", klen, k); - continue; - } + query_context_t **context = data; + if (*context) { + filter_context_wipe(&(*context)->context); + p_delete(context); } - - return query->state == SMTP_UNKNOWN ? -1 : 0; -#undef PARSE_CHECK } -__attribute__((format(printf,2,0))) -static void policy_answer(server_t *pcy, const char *fmt, ...) +static bool config_refresh(void *mconfig) { - va_list args; - const query_t* query = pcy->data; - - buffer_addstr(&pcy->obuf, "action="); - va_start(args, fmt); - buffer_addvf(&pcy->obuf, fmt, args); - va_end(args); - buffer_addstr(&pcy->obuf, "\n\n"); - buffer_consume(&pcy->ibuf, query->eoq - pcy->ibuf.data); - epoll_modify(pcy->fd, EPOLLIN | EPOLLOUT, pcy); + refresh = true; + if (filter_running > 0) { + return true; + } + log_state = "refreshing "; + info("reloading configuration"); + bool ret = config_reload(mconfig); + log_state = ""; + foreach (client_t **server, busy) { + client_io_ro(*server); + }} + array_len(busy) = 0; + refresh = false; + return ret; } -static bool policy_run_filter(const query_t* query, void* filter, void* conf) +static void policy_answer(client_t *pcy, const char *message) { - return false; + query_context_t *context = client_data(pcy); + const query_t* query = &context->query; + buffer_t *buf = client_output_buffer(pcy); + + /* Write reply "action=ACTION [text]" */ + buffer_addstr(buf, "action="); + if (!query_format_buffer(buf, message, query)) { + buffer_addstr(buf, message); + } + buffer_addstr(buf, "\n\n"); + + /* Finalize query. */ + buf = client_input_buffer(pcy); + buffer_consume(buf, query->eoq - buf->data); + client_io_rw(pcy); } -static void policy_process(server_t *pcy) +static const filter_t *next_filter(client_t *pcy, const filter_t *filter, + const query_t *query, const filter_hook_t *hook, bool *ok) { + char log_prefix[BUFSIZ]; + log_prefix[0] = '\0'; + +#define log_reply(Level, Msg, ...) \ + if (log_level >= LOG_ ## Level) { \ + if (log_prefix[0] == '\0') { \ + query_format(log_prefix, BUFSIZ, \ + config->log_format && config->log_format[0] ? \ + config->log_format : DEFAULT_LOG_FORMAT, query); \ + } \ + __log(LOG_ ## Level, "%s: " Msg, log_prefix, ##__VA_ARGS__); \ + } + + if (hook != NULL) { + query_context_t *context = client_data(pcy); + if (hook->counter >= 0 && hook->counter < MAX_COUNTERS && hook->cost > 0) { + context->context.counters[hook->counter] += hook->cost; + log_reply(DEBUG, "added %d to counter %d (now %u)", + hook->cost, hook->counter, + context->context.counters[hook->counter]); + } + } + if (hook == NULL) { + log_reply(WARNING, "aborted"); + *ok = false; + return NULL; + } else if (hook->async) { + log_reply(WARNING, "asynchronous filter from filter %s", filter->name); + *ok = true; + return NULL; + } else if (hook->postfix) { + log_reply(INFO, "answer %s from filter %s: \"%s\"", + htokens[hook->type], filter->name, hook->value); + policy_answer(pcy, hook->value); + *ok = true; + return NULL; + } else { + log_reply(DEBUG, "answer %s from filter %s: next filter %s", + htokens[hook->type], filter->name, + (array_ptr(config->filters, hook->filter_id))->name); + return array_ptr(config->filters, hook->filter_id); + } +#undef log_reply +} + +static bool policy_process(client_t *pcy, const config_t *mconfig) { - const query_t* query = pcy->data; - if (!policy_run_filter(query, NULL, NULL)) { - policy_answer(pcy, "DUNNO"); + query_context_t *context = client_data(pcy); + const query_t* query = &context->query; + const filter_t *filter; + if (mconfig->entry_points[query->state] == -1) { + warn("no filter defined for current protocol_state (%s)", smtp_state_names[query->state].str); + return false; + } + if (context->context.current_filter != NULL) { + filter = context->context.current_filter; + } else { + filter = array_ptr(mconfig->filters, mconfig->entry_points[query->state]); + } + context->context.current_filter = NULL; + while (true) { + bool ok = false; + const filter_hook_t *hook = filter_run(filter, query, &context->context); + filter = next_filter(pcy, filter, query, hook, &ok); + if (filter == NULL) { + return ok; + } } } -static int policy_run(server_t *pcy, void* config) +static int policy_run(client_t *pcy, void* vconfig) { - ssize_t search_offs = MAX(0, pcy->ibuf.len - 1); - int nb = buffer_read(&pcy->ibuf, pcy->fd, -1); + const config_t *mconfig = vconfig; + if (refresh) { + array_add(busy, pcy); + return 0; + } + + query_context_t *context = client_data(pcy); + query_t *query = &context->query; + context->client = pcy; + + buffer_t *buf = client_input_buffer(pcy); + int search_offs = MAX(0, (int)(buf->len - 1)); + int nb = client_read(pcy); const char *eoq; - query_t* query = pcy->data; if (nb < 0) { if (errno == EAGAIN || errno == EINTR) @@ -192,26 +210,63 @@ static int policy_run(server_t *pcy, void* config) return -1; } if (nb == 0) { - if (pcy->ibuf.len) - syslog(LOG_ERR, "unexpected end of data"); + if (buf->len) + err("unexpected end of data"); return -1; } - if (!(eoq = strstr(pcy->ibuf.data + search_offs, "\n\n"))) + if (!(eoq = strstr(buf->data + search_offs, "\n\n"))) { return 0; + } - if (postfix_parsejob(pcy->data, pcy->ibuf.data) < 0) + if (!query_parse(query, buf->data)) { return -1; + } query->eoq = eoq + strlen("\n\n"); - epoll_modify(pcy->fd, 0, pcy); - policy_process(pcy); + + /* The instance changed => reset the static context */ + if (query->instance.str == NULL || query->instance.len == 0 + || strcmp(context->context.instance, query->instance.str) != 0) { + filter_context_clean(&context->context); + m_strcat(context->context.instance, 64, query->instance.str); + } + client_io_none(pcy); + return policy_process(pcy, mconfig) ? 0 : -1; +} + +static void policy_async_handler(filter_context_t *context, + const filter_hook_t *hook) +{ + bool ok = false; + const filter_t *filter = context->current_filter; + query_context_t *qctx = context->data; + query_t *query = &qctx->query; + client_t *server = qctx->client; + + context->current_filter = next_filter(server, filter, query, hook, &ok); + if (context->current_filter != NULL) { + ok = policy_process(server, config); + } + if (!ok) { + client_release(server); + } + if (refresh && filter_running == 0) { + config_refresh(config); + } +} + +static int postlicyd_init(void) +{ + filter_async_handler_register(policy_async_handler); return 0; } -int start_listener(int port) +static void postlicyd_shutdown(void) { - return start_server(port, NULL, NULL); + array_deep_wipe(busy, client_delete); } +module_init(postlicyd_init); +module_exit(postlicyd_shutdown); /* administrivia {{{ */ @@ -223,6 +278,9 @@ void usage(void) " -l port to listen to\n" " -p file to write our pid to\n" " -f stay in foreground\n" + " -d grow logging level\n" + " -u unsafe mode (don't drop privileges)\n" + " -c check-conf\n" , stderr); } @@ -234,8 +292,10 @@ int main(int argc, char *argv[]) const char *pidfile = NULL; bool daemonize = true; int port = DEFAULT_PORT; + bool port_from_cli = false; + bool check_conf = false; - for (int c = 0; (c = getopt(argc, argv, "hf" "l:p:")) >= 0; ) { + for (int c = 0; (c = getopt(argc, argv, "ufdc" "l:p:")) >= 0; ) { switch (c) { case 'p': pidfile = optarg; @@ -245,26 +305,68 @@ int main(int argc, char *argv[]) break; case 'l': port = atoi(optarg); + port_from_cli = true; break; case 'f': daemonize = false; break; + case 'd': + ++log_level; + break; + case 'c': + check_conf = true; + daemonize = false; + unsafe = true; + break; default: usage(); return EXIT_FAILURE; } } + if (!daemonize) { + log_syslog = false; + } + if (argc - optind != 1) { usage(); return EXIT_FAILURE; } - if (common_setup(pidfile, false, RUNAS_USER, RUNAS_GROUP, - daemonize) != EXIT_SUCCESS - || start_listener(port) < 0) { + if (check_conf) { + return config_check(argv[optind]) ? EXIT_SUCCESS : EXIT_FAILURE; + } + info("%s v%s...", DAEMON_NAME, DAEMON_VERSION); + + if (pidfile_open(pidfile) < 0) { + crit("unable to write pidfile %s", pidfile); + return EXIT_FAILURE; + } + + if (drop_privileges(RUNAS_USER, RUNAS_GROUP) < 0) { + crit("unable to drop privileges"); + return EXIT_FAILURE; + } + + config = config_read(argv[optind]); + if (config == NULL) { + return EXIT_FAILURE; + } + if (port_from_cli || config->port == 0) { + config->port = port; + } + + if (daemonize && daemon_detach() < 0) { + crit("unable to fork"); + return EXIT_FAILURE; + } + + pidfile_refresh(); + + if (start_listener(config->port) == NULL) { return EXIT_FAILURE; + } else { + return server_loop(query_starter, query_stopper, + policy_run, config_refresh, config); } - return server_loop(query_starter, (delete_client_t)query_delete, - policy_run, NULL); }