X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=lib%2Flibpwqr.c;h=538813700d6a0ca9df27dc65455ca313cf98b2d5;hp=85cccace4ee4b83a3460d866eb953240b338fab7;hb=a5f7e5aaf5bb2168aca37066eb6b49d126689aa6;hpb=866e56c8f10d718c24e812335b107a05218dc339 diff --git a/lib/libpwqr.c b/lib/libpwqr.c index 85cccac..5388137 100644 --- a/lib/libpwqr.c +++ b/lib/libpwqr.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -60,6 +61,7 @@ #define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n) #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n) #define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n) +#define atomic_xchg(ptr, v) ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v)) #define ALWAYS_INLINE __attribute__((always_inline)) inline @@ -68,7 +70,7 @@ static int pwqr_create(int flags) { - if (flags & ~(O_NONBLOCK | O_CLOEXEC)) { + if (flags & ~PWQR_FL__SET) { errno = -EINVAL; return -1; } @@ -80,16 +82,16 @@ static int pwqr_ctl(int fd, int op, int val, void *uaddr) struct pwqr_ioc_wait wait; switch (op) { - case PWQR_GET_CONC: - case PWQR_REGISTER: - case PWQR_UNREGISTER: - case PWQR_PARK: + case PWQR_CTL_GET_CONC: + case PWQR_CTL_REGISTER: + case PWQR_CTL_UNREGISTER: + case PWQR_CTL_PARK: return ioctl(fd, op); - case PWQR_SET_CONC: - case PWQR_WAKE: - case PWQR_WAKE_OC: + case PWQR_CTL_SET_CONC: + case PWQR_CTL_WAKE: + case PWQR_CTL_WAKE_OC: return ioctl(fd, op, val); - case PWQR_WAIT: + case PWQR_CTL_WAIT: wait.pwqr_ticket = val; wait.pwqr_uaddr = uaddr; return ioctl(fd, op, &wait); @@ -107,6 +109,7 @@ static struct { /* atomic */ int nthreads; /* atomic */ int waiters; /* atomic */ int parked; + /* atomic */ int overcommit_count; union { struct { @@ -166,7 +169,7 @@ static ALWAYS_INLINE void pwqr_signal_n(int n) atomic_add(&pwqr_g.ticket, 1); #endif if (atomic_fetch_and_add(&pwqr_g.waiters, 0)) - pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL); + pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL); } static ALWAYS_INLINE void pwqr_signal(void) { @@ -183,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void) pwqr_signal_relaxed_n(1); } +static int pwqr_fd_read_overcommit(int fd) +{ + int buf; + + if (read(fd, &buf, sizeof(buf)) == sizeof(buf)) + return buf; + return 0; +} + +__attribute__((cold)) +static int pwqr_fd_overcommit_check(void) +{ + if (atomic_xchg(&pwqr_g.overcommit_count, 0)) { + int oc = pwqr_fd_read_overcommit(pwqr_g.fd); + + if (oc) { + access_once(pwqr_g.overcommit_count) = (oc - 1); + return 1; + } + } + return 0; +} + +__attribute__((unused)) +static void pwqr_overcommit_poll_loop(void) +{ + struct pollfd pfd = { + .fd = pwqr_g.fd, + .events = POLLIN, + }; + + for (;;) { + pfd.revents = 0; + if (poll(&pfd, 1, -1) >= 0) { + if (pfd.revents & POLLIN) { + access_once(pwqr_g.overcommit_count) = + pwqr_fd_read_overcommit(pwqr_g.fd); + } + if (pfd.revents & (POLLHUP | POLLERR)) + return; + } else if (errno != EINTR && errno != EAGAIN) { + return; + } + } +} + /* returns: - INPOOL if we were woken to run jobs. @@ -209,7 +258,7 @@ static int pwqr_do_wait(uint64_t ticket) pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype); - rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo); + rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo); if (rc < 0) { if (errno == EINTR) { rc = INPOOL; @@ -250,7 +299,8 @@ static int pwqr_do_park(void) run_job(job); #endif - if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) { + if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) { + access_once(pwqr_g.overcommit_count) = 0; if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) { pwqr_spawn_thread(PARKED, 1); } @@ -284,18 +334,29 @@ static void *pwqr_main(void *arg) goto out; pthread_cleanup_push(&pwqr_main_cleanup, NULL); - if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0) + if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0) goto out; do { uint64_t ticket; + park: if (state == PARKED && (state = pwqr_do_park()) < 0) break; + for (;;) { + if (unlikely(pwqr_g.overcommit_count)) { + if (pwqr_fd_overcommit_check()) { + state = PARKED; + goto park; + } + } #ifdef PSEUDO_CODE - while ((job = find_some_job(ANY_JOB))) + job = find_some_job(ANY_JOB); + if (!job) + break; run_job(job); #endif + } ticket = pwqr_get_ticket(); #ifdef PSEUDO_CODE if ((job = find_some_job(ANY_JOB))) { @@ -339,7 +400,7 @@ static int _pthread_workqueue_init_np(void) } pwqr_g.fd = fd; - n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4; + n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4; pwqr_spawn_thread(INPOOL, n); pwqr_spawn_thread(PARKED, 4);