Update the lib to include polling on the pwqrfd.
authorPierre Habouzit <pierre.habouzit@intersec.com>
Sun, 15 Jan 2012 12:57:42 +0000 (13:57 +0100)
committerPierre Habouzit <pierre.habouzit@intersec.com>
Sun, 15 Jan 2012 13:11:58 +0000 (14:11 +0100)
This introduces an overcommit_count.  This counter is set by a poller
thread (or an event loop, see pwqr_overcommit_poll_loop.

On the other side, normal jobs consumers check this counter, when non 0
they XCHG it with 0 (to be sure they are alone evaluating the overcommit
ratio), ask the kernel for the current overcommit, substract one, put it
as the new counter and go to be parked.

Of course when going to be parked the threads may actually find overcommit
jobs or similar, then the polling thread will reset the overcommit_count
again in that case and this will start again. In the more common case, the
thread will be parked directly and we hope it'll be enough.

When a thread goes out of PARK mode without signaling an EDQUOT condition,
we forcefully set the overcommit_count to zero.

This should hopefully take care of the downsizing of the pool in case of
overcommit for too long. As a side note, the kernel only signals
overcommit when it's lasting for more than PWQR_OC_DELAY (which is 1/20 of
a second as of this commit), which lets plenty of time for the overcommit
to be reduced in other more "natural" ways.

Signed-off-by: Pierre Habouzit <pierre.habouzit@intersec.com>
lib/libpwqr.c

index 85cccac..1e1f681 100644 (file)
@@ -27,6 +27,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <sys/ioctl.h>
+#include <sys/poll.h>
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -60,6 +61,7 @@
 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
+#define atomic_xchg(ptr, v)          ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v))
 
 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
 
@@ -107,6 +109,7 @@ static struct {
     /* atomic */ int nthreads;
     /* atomic */ int waiters;
     /* atomic */ int parked;
+    /* atomic */ int overcommit_count;
 
     union {
         struct {
@@ -183,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void)
     pwqr_signal_relaxed_n(1);
 }
 
+static int pwqr_fd_read_overcommit(int fd)
+{
+    int buf;
+
+    if (read(fd, &buf, sizeof(buf)) == sizeof(buf))
+        return buf;
+    return 0;
+}
+
+__attribute__((cold))
+static int pwqr_fd_overcommit_check(void)
+{
+    if (atomic_xchg(&pwqr_g.overcommit_count, 0)) {
+        int oc = pwqr_fd_read_overcommit(pwqr_g.fd);
+
+        if (oc) {
+            access_once(pwqr_g.overcommit_count) = (oc - 1);
+            return 1;
+        }
+    }
+    return 0;
+}
+
+__attribute__((unused))
+static void pwqr_overcommit_poll_loop(void)
+{
+    struct pollfd pfd = {
+        .fd     = pwqr_g.fd,
+        .events = POLLIN,
+    };
+
+    for (;;) {
+        pfd.revents = 0;
+        if (poll(&pfd, 1, -1) >= 0) {
+            if (pfd.revents & POLLIN) {
+                access_once(pwqr_g.overcommit_count) =
+                    pwqr_fd_read_overcommit(pwqr_g.fd);
+            }
+            if (pfd.revents & (POLLHUP | POLLERR))
+                return;
+        } else if (errno != EINTR && errno != EAGAIN) {
+            return;
+        }
+    }
+}
+
 
 /* returns:
    - INPOOL if we were woken to run jobs.
@@ -251,6 +300,7 @@ static int pwqr_do_park(void)
 #endif
 
         if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
+            access_once(pwqr_g.overcommit_count) = 0;
             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
                 pwqr_spawn_thread(PARKED, 1);
             }
@@ -290,12 +340,23 @@ static void *pwqr_main(void *arg)
     do {
         uint64_t ticket;
 
+      park:
         if (state == PARKED && (state = pwqr_do_park()) < 0)
             break;
+        for (;;) {
+            if (unlikely(pwqr_g.overcommit_count)) {
+                if (pwqr_fd_overcommit_check()) {
+                    state = PARKED;
+                    goto park;
+                }
+            }
 #ifdef PSEUDO_CODE
-        while ((job = find_some_job(ANY_JOB)))
+            job = find_some_job(ANY_JOB);
+            if (!job)
+                break;
             run_job(job);
 #endif
+        }
         ticket = pwqr_get_ticket();
 #ifdef PSEUDO_CODE
         if ((job = find_some_job(ANY_JOB))) {