streamlining.
[~madcoder/pwqr.git] / lib / libpwqr.c
index 85cccac..5388137 100644 (file)
@@ -27,6 +27,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <sys/ioctl.h>
 #include <stddef.h>
 #include <stdint.h>
 #include <sys/ioctl.h>
+#include <sys/poll.h>
 #include <sys/types.h>
 #include <unistd.h>
 
 #include <sys/types.h>
 #include <unistd.h>
 
@@ -60,6 +61,7 @@
 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
+#define atomic_xchg(ptr, v)          ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v))
 
 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
 
 
 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
 
@@ -68,7 +70,7 @@
 
 static int pwqr_create(int flags)
 {
 
 static int pwqr_create(int flags)
 {
-    if (flags & ~(O_NONBLOCK | O_CLOEXEC)) {
+    if (flags & ~PWQR_FL__SET) {
         errno = -EINVAL;
         return -1;
     }
         errno = -EINVAL;
         return -1;
     }
@@ -80,16 +82,16 @@ static int pwqr_ctl(int fd, int op, int val, void *uaddr)
     struct pwqr_ioc_wait wait;
 
     switch (op) {
     struct pwqr_ioc_wait wait;
 
     switch (op) {
-      case PWQR_GET_CONC:
-      case PWQR_REGISTER:
-      case PWQR_UNREGISTER:
-      case PWQR_PARK:
+      case PWQR_CTL_GET_CONC:
+      case PWQR_CTL_REGISTER:
+      case PWQR_CTL_UNREGISTER:
+      case PWQR_CTL_PARK:
         return ioctl(fd, op);
         return ioctl(fd, op);
-      case PWQR_SET_CONC:
-      case PWQR_WAKE:
-      case PWQR_WAKE_OC:
+      case PWQR_CTL_SET_CONC:
+      case PWQR_CTL_WAKE:
+      case PWQR_CTL_WAKE_OC:
         return ioctl(fd, op, val);
         return ioctl(fd, op, val);
-      case PWQR_WAIT:
+      case PWQR_CTL_WAIT:
         wait.pwqr_ticket = val;
         wait.pwqr_uaddr  = uaddr;
         return ioctl(fd, op, &wait);
         wait.pwqr_ticket = val;
         wait.pwqr_uaddr  = uaddr;
         return ioctl(fd, op, &wait);
@@ -107,6 +109,7 @@ static struct {
     /* atomic */ int nthreads;
     /* atomic */ int waiters;
     /* atomic */ int parked;
     /* atomic */ int nthreads;
     /* atomic */ int waiters;
     /* atomic */ int parked;
+    /* atomic */ int overcommit_count;
 
     union {
         struct {
 
     union {
         struct {
@@ -166,7 +169,7 @@ static ALWAYS_INLINE void pwqr_signal_n(int n)
     atomic_add(&pwqr_g.ticket, 1);
 #endif
     if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
     atomic_add(&pwqr_g.ticket, 1);
 #endif
     if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
-        pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
+        pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL);
 }
 static ALWAYS_INLINE void pwqr_signal(void)
 {
 }
 static ALWAYS_INLINE void pwqr_signal(void)
 {
@@ -183,6 +186,52 @@ static ALWAYS_INLINE void pwqr_signal_relaxed(void)
     pwqr_signal_relaxed_n(1);
 }
 
     pwqr_signal_relaxed_n(1);
 }
 
+static int pwqr_fd_read_overcommit(int fd)
+{
+    int buf;
+
+    if (read(fd, &buf, sizeof(buf)) == sizeof(buf))
+        return buf;
+    return 0;
+}
+
+__attribute__((cold))
+static int pwqr_fd_overcommit_check(void)
+{
+    if (atomic_xchg(&pwqr_g.overcommit_count, 0)) {
+        int oc = pwqr_fd_read_overcommit(pwqr_g.fd);
+
+        if (oc) {
+            access_once(pwqr_g.overcommit_count) = (oc - 1);
+            return 1;
+        }
+    }
+    return 0;
+}
+
+__attribute__((unused))
+static void pwqr_overcommit_poll_loop(void)
+{
+    struct pollfd pfd = {
+        .fd     = pwqr_g.fd,
+        .events = POLLIN,
+    };
+
+    for (;;) {
+        pfd.revents = 0;
+        if (poll(&pfd, 1, -1) >= 0) {
+            if (pfd.revents & POLLIN) {
+                access_once(pwqr_g.overcommit_count) =
+                    pwqr_fd_read_overcommit(pwqr_g.fd);
+            }
+            if (pfd.revents & (POLLHUP | POLLERR))
+                return;
+        } else if (errno != EINTR && errno != EAGAIN) {
+            return;
+        }
+    }
+}
+
 
 /* returns:
    - INPOOL if we were woken to run jobs.
 
 /* returns:
    - INPOOL if we were woken to run jobs.
@@ -209,7 +258,7 @@ static int pwqr_do_wait(uint64_t ticket)
     pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
 
     pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
 
-    rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
+    rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo);
     if (rc < 0) {
         if (errno == EINTR) {
             rc = INPOOL;
     if (rc < 0) {
         if (errno == EINTR) {
             rc = INPOOL;
@@ -250,7 +299,8 @@ static int pwqr_do_park(void)
             run_job(job);
 #endif
 
             run_job(job);
 #endif
 
-        if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
+        if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) {
+            access_once(pwqr_g.overcommit_count) = 0;
             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
                 pwqr_spawn_thread(PARKED, 1);
             }
             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
                 pwqr_spawn_thread(PARKED, 1);
             }
@@ -284,18 +334,29 @@ static void *pwqr_main(void *arg)
         goto out;
 
     pthread_cleanup_push(&pwqr_main_cleanup, NULL);
         goto out;
 
     pthread_cleanup_push(&pwqr_main_cleanup, NULL);
-    if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
+    if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0)
         goto out;
 
     do {
         uint64_t ticket;
 
         goto out;
 
     do {
         uint64_t ticket;
 
+      park:
         if (state == PARKED && (state = pwqr_do_park()) < 0)
             break;
         if (state == PARKED && (state = pwqr_do_park()) < 0)
             break;
+        for (;;) {
+            if (unlikely(pwqr_g.overcommit_count)) {
+                if (pwqr_fd_overcommit_check()) {
+                    state = PARKED;
+                    goto park;
+                }
+            }
 #ifdef PSEUDO_CODE
 #ifdef PSEUDO_CODE
-        while ((job = find_some_job(ANY_JOB)))
+            job = find_some_job(ANY_JOB);
+            if (!job)
+                break;
             run_job(job);
 #endif
             run_job(job);
 #endif
+        }
         ticket = pwqr_get_ticket();
 #ifdef PSEUDO_CODE
         if ((job = find_some_job(ANY_JOB))) {
         ticket = pwqr_get_ticket();
 #ifdef PSEUDO_CODE
         if ((job = find_some_job(ANY_JOB))) {
@@ -339,7 +400,7 @@ static int _pthread_workqueue_init_np(void)
     }
 
     pwqr_g.fd = fd;
     }
 
     pwqr_g.fd = fd;
-    n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
+    n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4;
     pwqr_spawn_thread(INPOOL, n);
     pwqr_spawn_thread(PARKED, 4);
 
     pwqr_spawn_thread(INPOOL, n);
     pwqr_spawn_thread(PARKED, 4);