From: Pierre Habouzit Date: Sun, 15 Jan 2012 12:57:42 +0000 (+0100) Subject: Update the lib to include polling on the pwqrfd. X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=commitdiff_plain;h=537958e9f81a6338fa2986ec56938d5a9fbc5166 Update the lib to include polling on the pwqrfd. This introduces an overcommit_count. This counter is set by a poller thread (or an event loop, see pwqr_overcommit_poll_loop). On the other side, normal job consumers check this counter; when it is non-zero they XCHG it with 0 (to be sure they are alone evaluating the overcommit ratio), ask the kernel for the current overcommit, subtract one, put it as the new counter and go to be parked. Of course when going to be parked the threads may actually find overcommit jobs or similar, then the polling thread will reset the overcommit_count again in that case and this will start again. In the more common case, the thread will be parked directly and we hope it'll be enough. When a thread goes out of PARK mode without signaling an EDQUOT condition, we forcefully set the overcommit_count to zero. This should hopefully take care of the downsizing of the pool in case of overcommit for too long. As a side note, the kernel only signals overcommit when it has lasted for more than PWQR_OC_DELAY (which is 1/20 of a second as of this commit), which leaves plenty of time for the overcommit to be reduced in other more "natural" ways. 
Signed-off-by: Pierre Habouzit --- diff --git a/lib/libpwqr.c b/lib/libpwqr.c index 85cccac..1e1f681 100644 --- a/lib/libpwqr.c +++ b/lib/libpwqr.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -60,6 +61,7 @@ #define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n) #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n) #define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n) +#define atomic_xchg(ptr, v) ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v)) #define ALWAYS_INLINE __attribute__((always_inline)) inline @@ -107,6 +109,7 @@ static struct { /* atomic */ int nthreads; /* atomic */ int waiters; /* atomic */ int parked; + /* atomic */ int overcommit_count; union { struct { @@ -183,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void) pwqr_signal_relaxed_n(1); } +static int pwqr_fd_read_overcommit(int fd) +{ + int buf; + + if (read(fd, &buf, sizeof(buf)) == sizeof(buf)) + return buf; + return 0; +} + +__attribute__((cold)) +static int pwqr_fd_overcommit_check(void) +{ + if (atomic_xchg(&pwqr_g.overcommit_count, 0)) { + int oc = pwqr_fd_read_overcommit(pwqr_g.fd); + + if (oc) { + access_once(pwqr_g.overcommit_count) = (oc - 1); + return 1; + } + } + return 0; +} + +__attribute__((unused)) +static void pwqr_overcommit_poll_loop(void) +{ + struct pollfd pfd = { + .fd = pwqr_g.fd, + .events = POLLIN, + }; + + for (;;) { + pfd.revents = 0; + if (poll(&pfd, 1, -1) >= 0) { + if (pfd.revents & POLLIN) { + access_once(pwqr_g.overcommit_count) = + pwqr_fd_read_overcommit(pwqr_g.fd); + } + if (pfd.revents & (POLLHUP | POLLERR)) + return; + } else if (errno != EINTR && errno != EAGAIN) { + return; + } + } +} + /* returns: - INPOOL if we were woken to run jobs. 
@@ -251,6 +300,7 @@ static int pwqr_do_park(void) #endif if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) { + access_once(pwqr_g.overcommit_count) = 0; if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) { pwqr_spawn_thread(PARKED, 1); } @@ -290,12 +340,23 @@ static void *pwqr_main(void *arg) do { uint64_t ticket; + park: if (state == PARKED && (state = pwqr_do_park()) < 0) break; + for (;;) { + if (unlikely(pwqr_g.overcommit_count)) { + if (pwqr_fd_overcommit_check()) { + state = PARKED; + goto park; + } + } #ifdef PSEUDO_CODE - while ((job = find_some_job(ANY_JOB))) + job = find_some_job(ANY_JOB); + if (!job) + break; run_job(job); #endif + } ticket = pwqr_get_ticket(); #ifdef PSEUDO_CODE if ((job = find_some_job(ANY_JOB))) {