static int epollfd = -1;
+static int el_job_setemode(job_t *w, el_mode emode)
+{
+ static int const evtmode_to_epoll[] = {
+ [EL_NEW] = EPOLLRDHUP,
+ [EL_READING] = EPOLLIN,
+ [EL_WRITING] = EPOLLOUT,
+ [EL_RDWR] = EPOLLIN | EPOLLOUT,
+ [EL_IDLE] = EPOLLRDHUP,
+ };
+
+ assert (w->mode == emode || emode == EL_WRITING || emode == EL_READING);
+
+ if (emode != w->emode) {
+ struct epoll_event event = {
+ .data.ptr = w,
+ .events = evtmode_to_epoll[emode],
+ };
+ int action = w->emode == EL_NEW ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+ if (epoll_ctl(epollfd, action, w->fd, &event) < 0) {
+ return el_job_release(w, true);
+ }
+ }
+ w->emode = emode;
+ return 0;
+}
+
+int el_job_setmode(job_t *w, el_mode mode)
+{
+ if (w->mode == w->emode) {
+ w->mode = mode;
+ return el_job_setemode(w, mode);
+ } else {
+ w->mode = mode;
+ return 0;
+ }
+}
+
int el_job_release(job_t *w, el_status reason)
{
w->state = EL_LLP_FINI;
if (w->fd >= 0) {
close(w->fd);
}
- p_delete(&w);
+ job_delete(&w);
return -1;
}
+static int el_job_connecting(job_t *w)
+{
+ int err = 0;
+ socklen_t len = sizeof(err);
+
+ if (getsockopt(w->fd, SOL_SOCKET, SO_ERROR, (void *)&err, &len) || err)
+ return el_job_release(w, EL_ERROR);
+
+ w->state = EL_LLP_READY;
+ return w->m->on_event(w, EL_EVT_RUNNING);
+}
+
+int el_job_connect(job_t *w, struct sockaddr *addr, socklen_t len,
+ int type, int proto)
+{
+ int res, sock = socket(addr->sa_family, type, proto);
+
+ if (sock < 0)
+ goto error;
+
+ res = fcntl(sock, F_GETFL);
+ if (res < 0)
+ goto error;
+ if (fcntl(sock, F_SETFL, res | O_NONBLOCK) < 0)
+ goto error;
+ if (connect(sock, addr, len) < 0)
+ goto error;
+
+ w->fd = sock;
+ w->llp = &el_job_connecting;
+ return el_job_setmode(w, EL_WRITING);
+
+ error:
+ close(sock);
+ return el_job_release(w, EL_ERROR);
+}
+
+ssize_t el_job_read(job_t *w, buffer_t *buf)
+{
+ ssize_t nr;
+
+ buffer_ensure(buf, BUFSIZ);
+
+ if (w->session) {
+ nr = gnutls_record_recv(w->session, buf->data + buf->len, BUFSIZ);
+ if (nr < 0 && gnutls_error_is_fatal(nr))
+ return el_job_release(w, EL_RDHUP);
+ if (nr <= 0) {
+ int wr = gnutls_record_get_direction(w->session);
+ return el_job_setemode(w, wr ? EL_WRITING : EL_READING);
+ }
+ EL_JOB_CHECK(el_job_setemode(w, w->mode));
+ } else {
+ nr = read(w->fd, buf->data + buf->len, BUFSIZ);
+ if (nr < 0) {
+ if (errno == EINTR || errno == EAGAIN)
+ return 0;
+ }
+ if (nr <= 0)
+ return el_job_release(w, EL_RDHUP);
+ }
+ buffer_extend(buf, nr);
+ return nr;
+}
+
int el_dispatch(int timeout)
{
struct epoll_event events[FD_SETSIZE];