X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=lib%2Flibpwqr.c;h=538813700d6a0ca9df27dc65455ca313cf98b2d5;hp=f5d53a2444d269e870f952320676e1223ac4011c;hb=a5f7e5aaf5bb2168aca37066eb6b49d126689aa6;hpb=c78f2216a947e712fff07f6e79f743d9009e60c8 diff --git a/lib/libpwqr.c b/lib/libpwqr.c index f5d53a2..5388137 100644 --- a/lib/libpwqr.c +++ b/lib/libpwqr.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -60,15 +61,20 @@ #define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n) #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n) #define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n) +#define atomic_xchg(ptr, v) ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v)) #define ALWAYS_INLINE __attribute__((always_inline)) inline /* }}} */ /* pwqr wrapping {{{ */ -static int pwqr_create(void) +static int pwqr_create(int flags) { - return open("/dev/"PWQR_DEVICE_NAME, O_RDWR); + if (flags & ~PWQR_FL__SET) { + errno = -EINVAL; + return -1; + } + return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags); } static int pwqr_ctl(int fd, int op, int val, void *uaddr) @@ -76,16 +82,16 @@ static int pwqr_ctl(int fd, int op, int val, void *uaddr) struct pwqr_ioc_wait wait; switch (op) { - case PWQR_GET_CONC: - case PWQR_REGISTER: - case PWQR_UNREGISTER: - case PWQR_PARK: + case PWQR_CTL_GET_CONC: + case PWQR_CTL_REGISTER: + case PWQR_CTL_UNREGISTER: + case PWQR_CTL_PARK: return ioctl(fd, op); - case PWQR_SET_CONC: - case PWQR_WAKE: - case PWQR_WAKE_OC: + case PWQR_CTL_SET_CONC: + case PWQR_CTL_WAKE: + case PWQR_CTL_WAKE_OC: return ioctl(fd, op, val); - case PWQR_WAIT: + case PWQR_CTL_WAIT: wait.pwqr_ticket = val; wait.pwqr_uaddr = uaddr; return ioctl(fd, op, &wait); @@ -103,6 +109,7 @@ static struct { /* atomic */ int nthreads; /* atomic */ int waiters; /* atomic */ int parked; + /* atomic */ int overcommit_count; union { struct { @@ -162,7 +169,7 @@ static ALWAYS_INLINE void pwqr_signal_n(int n) atomic_add(&pwqr_g.ticket, 1); #endif if (atomic_fetch_and_add(&pwqr_g.waiters, 0)) - pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL); + pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL); } static ALWAYS_INLINE void pwqr_signal(void) { @@ -179,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void) pwqr_signal_relaxed_n(1); } +static int pwqr_fd_read_overcommit(int fd) +{ + int buf; + + if (read(fd, &buf, sizeof(buf)) == sizeof(buf)) + return buf; + return 0; +} + +__attribute__((cold)) +static int pwqr_fd_overcommit_check(void) +{ + if (atomic_xchg(&pwqr_g.overcommit_count, 0)) { + int oc = pwqr_fd_read_overcommit(pwqr_g.fd); + + if (oc) { + access_once(pwqr_g.overcommit_count) = (oc - 1); + return 1; + } + } + return 0; +} + +__attribute__((unused)) +static void pwqr_overcommit_poll_loop(void) +{ + struct pollfd pfd = { + .fd = pwqr_g.fd, + .events = POLLIN, + }; + + for (;;) { + pfd.revents = 0; + if (poll(&pfd, 1, -1) >= 0) { + if (pfd.revents & POLLIN) { + access_once(pwqr_g.overcommit_count) = + pwqr_fd_read_overcommit(pwqr_g.fd); + } + if (pfd.revents & (POLLHUP | POLLERR)) + return; + } else if (errno != EINTR && errno != EAGAIN) { + return; + } + } +} + /* returns: - INPOOL if we were woken to run jobs. @@ -205,7 +258,7 @@ static int pwqr_do_wait(uint64_t ticket) pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype); - rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo); + rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo); if (rc < 0) { if (errno == EINTR) { rc = INPOOL; @@ -246,7 +299,8 @@ static int pwqr_do_park(void) run_job(job); #endif - if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) { + if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) { + access_once(pwqr_g.overcommit_count) = 0; if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) { pwqr_spawn_thread(PARKED, 1); } @@ -280,18 +334,29 @@ static void *pwqr_main(void *arg) goto out; pthread_cleanup_push(&pwqr_main_cleanup, NULL); - if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0) + if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0) goto out; do { uint64_t ticket; + park: if (state == PARKED && (state = pwqr_do_park()) < 0) break; + for (;;) { + if (unlikely(pwqr_g.overcommit_count)) { + if (pwqr_fd_overcommit_check()) { + state = PARKED; + goto park; + } + } #ifdef PSEUDO_CODE - while ((job = find_some_job(ANY_JOB))) + job = find_some_job(ANY_JOB); + if (!job) + break; run_job(job); #endif + } ticket = pwqr_get_ticket(); #ifdef PSEUDO_CODE if ((job = find_some_job(ANY_JOB))) { @@ -328,14 +393,14 @@ static int _pthread_workqueue_init_np(void) if (pwqr_g.fd >= 0) goto out; - fd = pwqr_create(); + fd = pwqr_create(0); if (fd < 0) { rc = -1; goto out; } pwqr_g.fd = fd; - n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4; + n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4; pwqr_spawn_thread(INPOOL, n); pwqr_spawn_thread(PARKED, 4);