X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=lib%2Flibpwqr.c;h=1e1f681a76c8df3b994b33cae192f732efc55270;hp=85cccace4ee4b83a3460d866eb953240b338fab7;hb=537958e9f81a6338fa2986ec56938d5a9fbc5166;hpb=866e56c8f10d718c24e812335b107a05218dc339;ds=inline diff --git a/lib/libpwqr.c b/lib/libpwqr.c index 85cccac..1e1f681 100644 --- a/lib/libpwqr.c +++ b/lib/libpwqr.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -60,6 +61,7 @@ #define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n) #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n) #define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n) +#define atomic_xchg(ptr, v) ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v)) #define ALWAYS_INLINE __attribute__((always_inline)) inline @@ -107,6 +109,7 @@ static struct { /* atomic */ int nthreads; /* atomic */ int waiters; /* atomic */ int parked; + /* atomic */ int overcommit_count; union { struct { @@ -183,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void) pwqr_signal_relaxed_n(1); } +static int pwqr_fd_read_overcommit(int fd) +{ + int buf; + + if (read(fd, &buf, sizeof(buf)) == sizeof(buf)) + return buf; + return 0; +} + +__attribute__((cold)) +static int pwqr_fd_overcommit_check(void) +{ + if (atomic_xchg(&pwqr_g.overcommit_count, 0)) { + int oc = pwqr_fd_read_overcommit(pwqr_g.fd); + + if (oc) { + access_once(pwqr_g.overcommit_count) = (oc - 1); + return 1; + } + } + return 0; +} + +__attribute__((unused)) +static void pwqr_overcommit_poll_loop(void) +{ + struct pollfd pfd = { + .fd = pwqr_g.fd, + .events = POLLIN, + }; + + for (;;) { + pfd.revents = 0; + if (poll(&pfd, 1, -1) >= 0) { + if (pfd.revents & POLLIN) { + access_once(pwqr_g.overcommit_count) = + pwqr_fd_read_overcommit(pwqr_g.fd); + } + if (pfd.revents & (POLLHUP | POLLERR)) + return; + } else if (errno != EINTR && errno != EAGAIN) { + return; + } + } +} + /* returns: - INPOOL if we were woken to run jobs. @@ -251,6 +300,7 @@ static int pwqr_do_park(void) #endif if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) { + access_once(pwqr_g.overcommit_count) = 0; if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) { pwqr_spawn_thread(PARKED, 1); } @@ -290,12 +340,23 @@ static void *pwqr_main(void *arg) do { uint64_t ticket; + park: if (state == PARKED && (state = pwqr_do_park()) < 0) break; + for (;;) { + if (unlikely(pwqr_g.overcommit_count)) { + if (pwqr_fd_overcommit_check()) { + state = PARKED; + goto park; + } + } #ifdef PSEUDO_CODE - while ((job = find_some_job(ANY_JOB))) + job = find_some_job(ANY_JOB); + if (!job) + break; run_job(job); #endif + } ticket = pwqr_get_ticket(); #ifdef PSEUDO_CODE if ((job = find_some_job(ANY_JOB))) {