#include <stddef.h>
#include <stdint.h>
#include <sys/ioctl.h>
+#include <sys/poll.h>
#include <sys/types.h>
#include <unistd.h>
#define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n)
#define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
#define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n)
+#define atomic_xchg(ptr, v) ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v))
#define ALWAYS_INLINE __attribute__((always_inline)) inline
/* }}} */
/* pwqr wrapping {{{ */
-static int pwqr_create(void)
+/* Open the pwqr control device.
+ *
+ * @flags  optional open flags; must be a subset of PWQR_FL__SET,
+ *         anything else is rejected with EINVAL.
+ * Returns the open file descriptor, or -1 with errno set on error.
+ */
+static int pwqr_create(int flags)
 {
- return open("/dev/"PWQR_DEVICE_NAME, O_RDWR);
+ if (flags & ~PWQR_FL__SET) {
+ /* errno takes positive error codes in userspace; the negated
+ * form (-EINVAL) is a kernel-internal convention only. */
+ errno = EINVAL;
+ return -1;
+ }
+ return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
 }
static int pwqr_ctl(int fd, int op, int val, void *uaddr)
struct pwqr_ioc_wait wait;
switch (op) {
- case PWQR_GET_CONC:
- case PWQR_REGISTER:
- case PWQR_UNREGISTER:
- case PWQR_PARK:
+ case PWQR_CTL_GET_CONC:
+ case PWQR_CTL_REGISTER:
+ case PWQR_CTL_UNREGISTER:
+ case PWQR_CTL_PARK:
return ioctl(fd, op);
- case PWQR_SET_CONC:
- case PWQR_WAKE:
- case PWQR_WAKE_OC:
+ case PWQR_CTL_SET_CONC:
+ case PWQR_CTL_WAKE:
+ case PWQR_CTL_WAKE_OC:
return ioctl(fd, op, val);
- case PWQR_WAIT:
+ case PWQR_CTL_WAIT:
wait.pwqr_ticket = val;
wait.pwqr_uaddr = uaddr;
return ioctl(fd, op, &wait);
/* atomic */ int nthreads;
/* atomic */ int waiters;
/* atomic */ int parked;
+ /* atomic */ int overcommit_count;
union {
struct {
atomic_add(&pwqr_g.ticket, 1);
#endif
if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
- pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
+ pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL);
}
/* Wake a single pool thread: convenience wrapper around
 * pwqr_signal_relaxed_n() for the common one-wakeup case. */
static ALWAYS_INLINE void pwqr_signal(void)
{
 pwqr_signal_relaxed_n(1);
}
+/* Best-effort read of the overcommit count published by the pwqr fd.
+ * Returns the value read, or 0 on error or short read -- callers treat
+ * "no data available" and "read failure" identically, so errno is
+ * deliberately not inspected here. */
+static int pwqr_fd_read_overcommit(int fd)
+{
+ int buf;
+
+ if (read(fd, &buf, sizeof(buf)) == sizeof(buf))
+ return buf;
+ return 0;
+}
+
+/* Check whether this thread should park because the pool is overcommitted.
+ *
+ * Atomically claims the pending overcommit_count (xchg with 0 so only one
+ * thread performs the fd read), re-reads the authoritative count from the
+ * kernel, and if still overcommitted re-publishes (oc - 1) so that
+ * additional threads park one at a time.
+ *
+ * Returns 1 if the caller should park, 0 otherwise.
+ * NOTE(review): assumes overcommit_count is only ever written via
+ * atomic_xchg/access_once by pool threads -- confirm against the
+ * poll-loop writer. Marked cold: expected to be off the fast path. */
+__attribute__((cold))
+static int pwqr_fd_overcommit_check(void)
+{
+ if (atomic_xchg(&pwqr_g.overcommit_count, 0)) {
+ int oc = pwqr_fd_read_overcommit(pwqr_g.fd);
+
+ if (oc) {
+ access_once(pwqr_g.overcommit_count) = (oc - 1);
+ return 1;
+ }
+ }
+ return 0;
+}
+
+/* Dedicated watcher loop: blocks in poll(2) on the pwqr fd and publishes
+ * the kernel's overcommit count into pwqr_g.overcommit_count whenever the
+ * fd becomes readable (pool threads pick it up via
+ * pwqr_fd_overcommit_check()).
+ *
+ * Exits when the fd reports POLLHUP/POLLERR (device closed or broken) or
+ * when poll() fails with anything other than EINTR/EAGAIN. */
+__attribute__((unused))
+static void pwqr_overcommit_poll_loop(void)
+{
+ struct pollfd pfd = {
+ .fd = pwqr_g.fd,
+ .events = POLLIN,
+ };
+
+ for (;;) {
+ pfd.revents = 0;
+ if (poll(&pfd, 1, -1) >= 0) {
+ if (pfd.revents & POLLIN) {
+ access_once(pwqr_g.overcommit_count) =
+ pwqr_fd_read_overcommit(pwqr_g.fd);
+ }
+ /* check HUP/ERR even after a successful read: the
+ * device may have been torn down in the same wakeup */
+ if (pfd.revents & (POLLHUP | POLLERR))
+ return;
+ } else if (errno != EINTR && errno != EAGAIN) {
+ return;
+ }
+ }
+}
+
/* returns:
- INPOOL if we were woken to run jobs.
pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
- rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
+ rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo);
if (rc < 0) {
if (errno == EINTR) {
rc = INPOOL;
run_job(job);
#endif
- if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
+ if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) {
+ access_once(pwqr_g.overcommit_count) = 0;
if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
pwqr_spawn_thread(PARKED, 1);
}
goto out;
pthread_cleanup_push(&pwqr_main_cleanup, NULL);
- if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
+ if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0)
goto out;
do {
uint64_t ticket;
+ park:
if (state == PARKED && (state = pwqr_do_park()) < 0)
break;
+ for (;;) {
+ if (unlikely(pwqr_g.overcommit_count)) {
+ if (pwqr_fd_overcommit_check()) {
+ state = PARKED;
+ goto park;
+ }
+ }
#ifdef PSEUDO_CODE
- while ((job = find_some_job(ANY_JOB)))
+ job = find_some_job(ANY_JOB);
+ if (!job)
+ break;
run_job(job);
#endif
+ }
ticket = pwqr_get_ticket();
#ifdef PSEUDO_CODE
if ((job = find_some_job(ANY_JOB))) {
if (pwqr_g.fd >= 0)
goto out;
- fd = pwqr_create();
+ fd = pwqr_create(0);
if (fd < 0) {
rc = -1;
goto out;
}
pwqr_g.fd = fd;
- n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
+ n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4;
pwqr_spawn_thread(INPOOL, n);
pwqr_spawn_thread(PARKED, 4);