+
It sounds very easy, but it has one major drawback: it means the pwqfd must be
somehow registered into the eventloop, and it's not very suitable for a
-pthread_workqueue implementation.
-
-in kernel (hack-ish solution)::
- The kernel could voluntarily unpark/unblock a thread with another
- errno that would signal overcommiting. Unlike the pollable proposal,
- this doesn't require hooking in the event loop. Though it requires
- having one such thread, which may not be the case when userland has
- reached the peak number of threads it would ever want to use.
+pthread_workqueue implementation. In other words, if you can plug into the
+event-loop because it's a custom one or one that provides thread regulation
+then it's fine, if you can't (glib, libdispatch, ...) then you need a thread
+that will basically just poll() on this file-descriptor, it's really wasteful.
+
-Is this really a problem? I'm not sure. Especially since when that happens
-userland could pick a victim thread that would call `PWQR_PARK` after each
-processed job, which would allow some kind of poor man's poll.
-+
-The drawback I see in that solution is that we wake up YET ANOTHER thread at a
-moment when we're already overcommiting, which sounds counter productive.
-That's why I didn't implement that.
+NOTE: this has been implemented now, but still it looks "expensive" to hook
+for some users. So if some alternative way to be signalled could exist, it'd
+be really awesome.
in userspace::
Userspace knows how many "running" threads there are, it's easy to
already accounted for. When "waiting" is zero, if "registered - parked"
is "High" userspace could choose to randomly try to park one thread.
+
-I think `PWQR_PARK` could use `val` to have some "probing" mode, that would
-return `0` if it wouldn't block and `-1/EWOULDBLOCK` if it would in the non
-probing mode. Userspace could maintain some global probing_mode flag, that
-would be a tristate: NONE, SLOW, AGGRESSVE.
+userspace can use non blocking read() to probe if it's overcommiting.
+
It's in NONE when userspace believes it's not necessary to probe (e.g. when the
amount of running + waiting threads isn't that large, say less than 110% of
the concurrency or any kind of similar rule).
+
It's in SLOW mode else. In slow mode each thread does a probe every 32 or 64
-jobs to mitigate the cost of the syscall. If the probe returns EWOULDBLOCK
-then the thread goes to PARK mode, and the probing_mode goes to AGGRESSVE.
+jobs to mitigate the cost of the syscall. If the probe returns '1' then ask
+for down-commiting and stay in SLOW mode, if it returns EAGAIN all is fine, if
+it returns more than '1' ask for down-commiting and go to AGGRESSIVE.
+
When AGGRESSIVE, threads check if they must park more often and in a more
controlled fashion (every 32 or 64 jobs isn't nice because jobs can be very
long), for example based on some poor man's timer (clock_gettime(MONOTONIC)
-sounds fine). As soon as a probe returns 0 or we're in the NONE conditions,
-then the probing_mode goes back to NONE/SLOW.
+sounds fine). State transition works as for SLOW.
+
The issue I have with this is that it sounds like it adds quite some code in the
fastpath code, hence I dislike it a lot.
the call, as would be returned by `sysconf(_SC_NPROCESSORS_ONLN)`.
`flags`::
- a mask of flags, currently only O_CLOEXEC.
+ a mask of flags among `O_CLOEXEC`, and `O_NONBLOCK`.
+
+Available operations on the pwqr file descriptor are:
+
+`poll`, `epoll` and friends::
+ the PWQR file descriptor can be watched for POLLIN events (not POLLOUT
+ ones as it can not be written to).
+
+`read`::
+ The file returned can be read upon. The read blocks (or fails setting
+ `EAGAIN` if in non blocking mode) until the regulator believes the
+ pool is overcommitting. The buffer passed to read should be able to
+ hold an integer. When `read(3)` is successful, it writes the amount of
+	overcommitting threads (understand: the number of threads to park so
+	that the pool isn't overcommitting anymore).
RETURN VALUE
~~~~~~~~~~~~
#include <linux/init.h>
#include <linux/kref.h>
#include <linux/module.h>
+#include <linux/poll.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
struct rcu_head rcu;
struct timer_list timer;
wait_queue_head_t wqh;
+ wait_queue_head_t wqh_poll;
unsigned concurrency;
unsigned registered;
wake_up_locked(&sb->wqh);
}
if (sb->running > sb->concurrency) {
- /* See ../Documentation/pwqr.adoc */
+ wake_up_poll(&sb->wqh_poll, POLLIN);
}
pwqr_sb_unlock_irqrestore(sb, flags);
}
struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
del_timer_sync(&sb->timer);
+ wake_up_poll(&sb->wqh_poll, POLLHUP);
call_rcu(&sb->rcu, pwqr_sb_finalize);
}
static inline void pwqr_sb_put(struct pwqr_sb *sb)
return 0;
}
+/*
+ * poll()/epoll() handler for the pwqr file descriptor.
+ *
+ * Waiters are queued on sb->wqh_poll (the dedicated poll wait queue,
+ * distinct from sb->wqh used by PWQR_WAIT/PWQR_PARK).  Reports:
+ *   POLLIN  - the pool is overcommitting (running > concurrency),
+ *             i.e. a read() would not block;
+ *   POLLHUP - the sb is being torn down (state went negative).
+ */
+static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
+{
+	struct pwqr_sb *sb = filp->private_data;
+	unsigned int events = 0;
+	unsigned long flags;
+
+	poll_wait(filp, &sb->wqh_poll, wait);
+
+	/* snapshot the readiness conditions under the sb lock */
+	pwqr_sb_lock_irqsave(sb, flags);
+	if (sb->running > sb->concurrency)
+		events |= POLLIN;
+	if (sb->state < 0)
+		events |= POLLHUP;
+	pwqr_sb_unlock_irqrestore(sb, flags);
+
+	return events;
+}
+
+/*
+ * Core of read(): block (unless @no_wait) until the pool is
+ * overcommitting, then report through *cnt how many threads are in
+ * excess (running - concurrency), i.e. how many should be parked.
+ *
+ * Returns 0 on success, -EAGAIN when @no_wait is set and there is
+ * nothing to report, -ERESTARTSYS when interrupted by a signal.
+ *
+ * NOTE(review): sb->wqh.lock appears to double as the sb spinlock
+ * (cf. pwqr_sb_lock_irqsave) while the sleep itself is on the separate
+ * sb->wqh_poll queue, so wake_up_poll(&sb->wqh_poll, ...) callers do
+ * not need wqh's exclusive-waiter semantics — confirm this aliasing.
+ */
+static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
+{
+	DECLARE_WAITQUEUE(wait, current);
+	ssize_t rc = -EAGAIN;
+
+	spin_lock_irq(&sb->wqh.lock);
+	if (sb->running > sb->concurrency) {
+		/* already overcommitting: report without sleeping */
+		rc = 0;
+	} else if (!no_wait) {
+		/* classic open-coded interruptible wait loop */
+		add_wait_queue(&sb->wqh_poll, &wait);
+		for (;;) {
+			set_current_state(TASK_INTERRUPTIBLE);
+			if (sb->running > sb->concurrency) {
+				rc = 0;
+				break;
+			}
+			if (signal_pending(current)) {
+				rc = -ERESTARTSYS;
+				break;
+			}
+			/* drop the lock across the actual sleep */
+			spin_unlock_irq(&sb->wqh.lock);
+			schedule();
+			spin_lock_irq(&sb->wqh.lock);
+		}
+		remove_wait_queue(&sb->wqh_poll, &wait);
+		__set_current_state(TASK_RUNNING);
+	}
+	/* still under the lock: running/concurrency are consistent here */
+	if (likely(rc == 0))
+		*cnt = sb->running - sb->concurrency;
+	spin_unlock_irq(&sb->wqh.lock);
+
+	return rc;
+}
+
+/*
+ * read() handler: copies to userspace a u32 holding the number of
+ * overcommitting threads (see pwqr_sb_read).  The buffer must be at
+ * least sizeof(u32) bytes; O_NONBLOCK turns the wait into -EAGAIN.
+ * On success, returns sizeof(u32).
+ */
+static ssize_t
+pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+{
+	struct pwqr_sb *sb = filp->private_data;
+	u32 cnt = 0;
+	ssize_t rc;
+
+	if (count < sizeof(cnt))
+		return -EINVAL;
+	rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
+	if (rc < 0)
+		return rc;
+	return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
+}
+
static long
do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
int is_wait, struct pwqr_ioc_wait __user *arg)
/* @ see <wait_event_interruptible_exclusive_locked_irq> */
if (likely(sb->state >= 0)) {
DEFINE_WAIT(__wait);
-
__wait.flags |= WQ_FLAG_EXCLUSIVE;
if (is_wait) {
__add_wait_queue_tail(&sb->wqh, &__wait);
}
__pwqr_sb_update_state(sb, -1);
- set_current_state(TASK_INTERRUPTIBLE);
do {
+ set_current_state(TASK_INTERRUPTIBLE);
if (sb->overcommit_wakes)
break;
if (signal_pending(current)) {
__remove_wait_queue(&sb->wqh, &__wait);
__set_current_state(TASK_RUNNING);
-
if (is_wait) {
sb->waiting--;
} else {
sb->parked--;
}
__pwqr_sb_update_state(sb, 1);
+
if (sb->overcommit_wakes)
sb->overcommit_wakes--;
if (sb->waiting + sb->running > sb->concurrency)
.owner = THIS_MODULE,
.open = pwqr_open,
.release = pwqr_release,
+ .poll = pwqr_poll,
+ .read = pwqr_read,
+ .llseek = noop_llseek,
.unlocked_ioctl = pwqr_ioctl,
#ifdef CONFIG_COMPAT
.compat_ioctl = pwqr_ioctl,