X-Git-Url: http://git.madism.org/?a=blobdiff_plain;f=common%2Fserver.c;h=d5298960aa42f69cbbcd1a82018c5b0e39e8b597;hb=0ad0ac6446aa4490800addafd8219f0f2a968c4d;hp=5a9e081c8ad3617b6aae817daa9988b119a8d80a;hpb=521bf8a6290d26b27c332639fe8cc4e47c3d5c4f;p=apps%2Fpfixtools.git diff --git a/common/server.c b/common/server.c index 5a9e081..d529896 100644 --- a/common/server.c +++ b/common/server.c @@ -40,31 +40,30 @@ static PA(server_t) listeners = ARRAY_INIT; static PA(server_t) server_pool = ARRAY_INIT; +struct ev_loop *global_loop = NULL; +static start_client_t client_start = NULL; +static delete_client_t client_delete = NULL; +static run_client_t client_run = NULL; +static refresh_t config_refresh = NULL; +static void *config_ptr = NULL; + static server_t* server_new(void) { server_t* server = p_new(server_t, 1); server->fd = -1; - server->fd2 = -1; return server; } static void server_wipe(server_t *server) { - server->listener = server->event = false; - if (server->fd > 0) { - epoll_modify(server->fd, 0, NULL); + if (server->fd >= 0) { + ev_io_stop(global_loop, &server->io); close(server->fd); server->fd = -1; } - if (server->fd2 > 0) { - close(server->fd2); - server->fd2 = -1; - } if (server->data && server->clear_data) { server->clear_data(&server->data); } - array_shrink(server->ibuf, 512); - array_shrink(server->obuf, 512); } static void server_delete(server_t **server) @@ -86,191 +85,176 @@ static server_t* server_acquire(void) } } -static void server_release(server_t *server) +void server_release(server_t *server) { server_wipe(server); array_add(server_pool, server); } +static int server_init(void) +{ + global_loop = ev_default_loop(0); + return 0; +} + static void server_shutdown(void) { array_deep_wipe(listeners, server_delete); array_deep_wipe(server_pool, server_delete); } - +module_init(server_init); module_exit(server_shutdown); -int start_server(int port, start_listener_t starter, delete_client_t deleter) +static void client_cb(EV_P_ struct ev_io *w, int events) +{ + server_t *server = (server_t*)w; + + debug("Entering client_cb for %p, %d (%d | %d)", w, events, EV_WRITE, EV_READ); + + if (events & EV_WRITE && server->obuf.len) { + if (buffer_write(&server->obuf, server->fd) < 0) { + server_release(server); + return; + } + if (!server->obuf.len) { + ev_io_set(&server->io, server->fd, EV_READ); + } + } + + if (events & EV_READ) { + if (server->run(server, config_ptr) < 0) { + server_release(server); + return; + } + } +} + +static int start_client(server_t *server, start_client_t starter, + run_client_t runner, delete_client_t deleter) { - struct sockaddr_in addr = { - .sin_family = AF_INET, - .sin_addr = { htonl(INADDR_LOOPBACK) }, - }; server_t *tmp; void* data = NULL; int sock; - addr.sin_port = htons(port); - sock = tcp_listen_nonblock((const struct sockaddr *)&addr, sizeof(addr)); + sock = accept_nonblock(server->fd); if (sock < 0) { + UNIXERR("accept"); return -1; } if (starter) { - data = starter(); - if (data == NULL) { - close(sock); - return -1; - } + data = starter(server); + if (data == NULL) { + close(sock); + return -1; + } } tmp = server_acquire(); tmp->fd = sock; - tmp->listener = true; tmp->data = data; + tmp->run = runner; tmp->clear_data = deleter; - epoll_register(sock, EPOLLIN, tmp); - array_add(listeners, tmp); + ev_io_init(&tmp->io, client_cb, tmp->fd, EV_READ); + ev_io_start(global_loop, &tmp->io); return 0; } -static int start_client(server_t *server, start_client_t starter, - delete_client_t deleter) +static void server_cb(EV_P_ struct ev_io *w, int events) +{ + server_t *server = (server_t*)w; + if (start_client(server, client_start, client_run, client_delete) != 0) { + ev_unloop(EV_A_ EVUNLOOP_ALL); + } +} + +int start_server(int port, start_listener_t starter, delete_client_t deleter) { + struct sockaddr_in addr = { + .sin_family = AF_INET, + .sin_addr = { htonl(INADDR_LOOPBACK) }, + }; server_t *tmp; void* data = NULL; int sock; - sock = accept_nonblock(server->fd); + addr.sin_port = htons(port); + sock = tcp_listen_nonblock((const struct sockaddr *)&addr, sizeof(addr)); if (sock < 0) { - UNIXERR("accept"); return -1; } if (starter) { - data = starter(server); - if (data == NULL) { - close(sock); - return -1; - } + data = starter(); + if (data == NULL) { + close(sock); + return -1; + } } tmp = server_acquire(); tmp->fd = sock; tmp->data = data; + tmp->run = NULL; tmp->clear_data = deleter; - epoll_register(sock, EPOLLIN, tmp); + ev_io_init(&tmp->io, server_cb, tmp->fd, EV_READ); + ev_io_start(global_loop, &tmp->io); + array_add(listeners, tmp); return 0; } -event_t event_register(void *data) +server_t *server_register(int fd, run_client_t runner, void *data) { - int fds[2]; - if (pipe(fds) != 0) { - UNIXERR("pipe"); - return NULL; - } - if (setnonblock(fds[0]) != 0) { - close(fds[0]); - close(fds[1]); + if (fd < 0) { return NULL; } - server_t *tmp = server_acquire(); - tmp->event = true; - tmp->fd = fds[0]; - tmp->fd2 = fds[1]; - tmp->data = data; - epoll_register(fds[0], EPOLLIN, tmp); + server_t *tmp = server_acquire(); + tmp->fd = fd; + tmp->data = data; + tmp->run = runner; + tmp->clear_data = NULL; + ev_io_init(&tmp->io, client_cb, tmp->fd, EV_READ); + ev_io_start(global_loop, &tmp->io); return tmp; } -bool event_fire(event_t event) +static void refresh_cb(EV_P_ struct ev_signal *w, int event) { - static const char *data = ""; - return write(event->fd2, data, 1) == 0; + if (!config_refresh(config_ptr)) { + ev_unloop(EV_A_ EVUNLOOP_ALL); + } } -static bool event_cancel(event_t event) +static void exit_cb(EV_P_ struct ev_signal *w, int event) { - char buff[32]; - while (true) { - ssize_t res = read(event->fd, buff, 32); - if (res == -1 && errno != EAGAIN && errno != EINTR) { - UNIXERR("read"); - return false; - } else if (res == -1 && errno == EINTR) { - continue; - } else if (res != 32) { - return true; - } - } + ev_unloop(EV_A_ EVUNLOOP_ALL); } int server_loop(start_client_t starter, delete_client_t deleter, - run_client_t runner, event_handler_t handler, - refresh_t refresh, void* config) { - info("entering processing loop"); - while (!sigint) { - struct epoll_event evts[1024]; - int n; - - if (sighup && refresh) { - sighup = false; - info("refreshing..."); - if (!refresh(config)) { - crit("error while refreshing configuration"); - return EXIT_FAILURE; - } - info("refresh done, processing loop restarts"); - } + run_client_t runner, refresh_t refresh, void* config) +{ + struct ev_signal ev_sighup; + struct ev_signal ev_sigint; + struct ev_signal ev_sigterm; - n = epoll_select(evts, countof(evts), -1); - if (n < 0) { - if (errno != EAGAIN && errno != EINTR) { - UNIXERR("epoll_wait"); - return EXIT_FAILURE; - } - continue; - } + client_start = starter; + client_delete = deleter; + client_run = runner; + config_refresh = refresh; + config_ptr = config; - while (--n >= 0) { - server_t *d = evts[n].data.ptr; - - if (d->listener) { - (void)start_client(d, starter, deleter); - continue; - } else if (d->event) { - if (!event_cancel(d)) { - server_release(d); - continue; - } - if (handler) { - if (!handler(d, config)) { - server_release(d); - } - } - continue; - } - - if (evts[n].events & EPOLLIN) { - if (runner(d, config) < 0) { - server_release(d); - continue; - } - } - - if ((evts[n].events & EPOLLOUT) && d->obuf.len) { - if (buffer_write(&d->obuf, d->fd) < 0) { - server_release(d); - continue; - } - if (!d->obuf.len) { - epoll_modify(d->fd, EPOLLIN, d); - } - } - } + if (refresh != NULL) { + ev_signal_init(&ev_sighup, refresh_cb, SIGHUP); + ev_signal_start(global_loop, &ev_sighup); } + ev_signal_init(&ev_sigint, exit_cb, SIGINT); + ev_signal_start(global_loop, &ev_sigint); + ev_signal_init(&ev_sigterm, exit_cb, SIGTERM); + ev_signal_start(global_loop, &ev_sigterm); + + info("entering processing loop"); + ev_loop(global_loop, 0); info("exit requested"); return EXIT_SUCCESS; }