From 866e56c8f10d718c24e812335b107a05218dc339 Mon Sep 17 00:00:00 2001
From: Pierre Habouzit
Date: Sun, 15 Jan 2012 10:31:44 +0100
Subject: [PATCH] implement poll and read

Whether or not we implement some other kind of dirty notification
mechanism later, it feels right to have the pwqr file descriptor
pollable for overcommit.

Documentation:
- drop the "in kernel unpark" method, it sucks.
- migrate to using a non-blocking "read" for the probing method.
- document the pollability and how read works in the pwqr_create
  "manpage".

lib:
- implement pwqr_create with flags. It requires a kernel supporting the
  O_NONBLOCK/O_CLOEXEC flags to open(); I've been too lazy to implement
  the emulation yet.

Signed-off-by: Pierre Habouzit
---
 Documentation/pwqr.adoc | 51 +++++++++++++-------------
 kernel/pwqr.c           | 80 ++++++++++++++++++++++++++++++++++++++---
 lib/libpwqr.c           | 10 ++++--
 3 files changed, 110 insertions(+), 31 deletions(-)

diff --git a/Documentation/pwqr.adoc b/Documentation/pwqr.adoc
index 1942097..d028a9c 100644
--- a/Documentation/pwqr.adoc
+++ b/Documentation/pwqr.adoc
@@ -100,22 +100,14 @@ in kernel (poll solution)::
 +
 It sounds very easy, but it has one major drawback: it means the pwqr fd
 must somehow be registered into the event loop, and it's not very suitable for a
-pthread_workqueue implementation.
-
-in kernel (hack-ish solution)::
-	The kernel could voluntarily unpark/unblock a thread with another
-	errno that would signal overcommiting. Unlike the pollable proposal,
-	this doesn't require hooking in the event loop. Though it requires
-	having one such thread, which may not be the case when userland has
-	reached the peak number of threads it would ever want to use.
+pthread_workqueue implementation. In other words, if you can plug into the
+event loop, because it's a custom one or one that provides thread regulation,
+then it's fine; if you can't (glib, libdispatch, ...) then you need a thread
+that basically just poll()s on this file descriptor, which is really wasteful.
 +
-Is this really a problem? I'm not sure. Especially since when that happens
-userland could pick a victim thread that would call `PWQR_PARK` after each
-processed job, which would allow some kind of poor man's poll.
-+
-The drawback I see in that solution is that we wake up YET ANOTHER thread at a
-moment when we're already overcommiting, which sounds counter productive.
-That's why I didn't implement that.
+NOTE: this has been implemented now, but it still looks "expensive" to hook
+into for some users. So if some alternative way to be signalled could exist,
+it'd be really awesome.

 in userspace::
 	Userspace knows how many "running" threads there are, it's easy to
@@ -123,24 +115,21 @@ in userspace::
 already accounted for. When "waiting" is zero, if "registered - parked"
 is "High" userspace could choose to randomly try to park one thread.
 +
-I think `PWQR_PARK` could use `val` to have some "probing" mode, that would
-return `0` if it wouldn't block and `-1/EWOULDBLOCK` if it would in the non
-probing mode. Userspace could maintain some global probing_mode flag, that
-would be a tristate: NONE, SLOW, AGGRESSVE.
+Userspace can use a non-blocking read() to probe whether it's overcommitting.
 +
 It's in NONE when userspace believes it's not necessary to probe (e.g. when
 the number of running + waiting threads isn't that large, say less than 110%
 of the concurrency or any kind of similar rule).
 +
 Otherwise it's in SLOW mode. In slow mode each thread does a probe every 32 or 64
-jobs to mitigate the cost of the syscall. If the probe returns EWOULDBLOCK
-then the thread goes to PARK mode, and the probing_mode goes to AGGRESSVE.
+jobs to mitigate the cost of the syscall. If the probe returns '1', ask
+for down-committing and stay in SLOW mode; if it returns EAGAIN all is fine;
+if it returns more than '1', ask for down-committing and go to AGGRESSIVE.
 +
 When AGGRESSIVE, threads check whether they must park more often and in a more
 controlled fashion (every 32 or 64 jobs isn't nice because jobs can be very
 long), for example based on some poor man's timer (clock_gettime(MONOTONIC)
-sounds fine). As soon as a probe returns 0 or we're in the NONE conditions,
-then the probing_mode goes back to NONE/SLOW.
+sounds fine). State transitions work as for SLOW.
 +
 The issue I have with this is that it seems to add quite some code to the
 fastpath, hence I dislike it a lot.
@@ -172,7 +161,21 @@ with a concurrency corresponding to the number of online CPUs at the time of
 the call, as would be returned by `sysconf(_SC_NPROCESSORS_ONLN)`.

 `flags`::
-	a mask of flags, currently only O_CLOEXEC.
+	a mask of flags among `O_CLOEXEC` and `O_NONBLOCK`.
+
+Available operations on the pwqr file descriptor are:
+
+`poll`, `epoll` and friends::
+	the PWQR file descriptor can be watched for POLLIN events (not POLLOUT
+	ones, as it cannot be written to).
+
+`read`::
+	The returned file descriptor can be read from. The read blocks (or
+	fails with `EAGAIN` in non-blocking mode) until the regulator believes
+	the pool is overcommitting. The buffer passed to read must be large
+	enough to hold a 32-bit integer. When `read(3)` is successful, it
+	writes the number of overcommitting threads (that is: the number of
+	threads to park so that the pool isn't overcommitting anymore).

 RETURN VALUE
 ~~~~~~~~~~~~
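To make the probing scheme concrete, here is a minimal sketch (not part of
the patch) of what a worker could run every N jobs. It assumes `pool_fd` was
opened with pwqr_create(O_NONBLOCK); park_one_thread() is a hypothetical hook
standing for whatever mechanism makes one worker issue PWQR_PARK, and error
handling beyond EAGAIN is elided:

#include <stdint.h>
#include <unistd.h>

extern void park_one_thread(void);	/* hypothetical: parks one worker */

/* Probe once; returns 1 if the pool should switch to AGGRESSIVE mode. */
static int pwqr_probe(int pool_fd)
{
	uint32_t cnt;

	if (read(pool_fd, &cnt, sizeof(cnt)) != sizeof(cnt))
		return 0;	/* EAGAIN: not overcommitting, stay in SLOW */

	/* the kernel reported `cnt` excess threads: ask to park that many */
	for (uint32_t i = 0; i < cnt; i++)
		park_one_thread();

	return cnt > 1;		/* more than one in excess: go AGGRESSIVE */
}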
diff --git a/kernel/pwqr.c b/kernel/pwqr.c
index a6ff8e5..0d6b42c 100644
--- a/kernel/pwqr.c
+++ b/kernel/pwqr.c
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <linux/poll.h>
 #include
 #include
 #include
@@ -60,6 +61,7 @@ struct pwqr_sb {
 	struct rcu_head		rcu;
 	struct timer_list	timer;
 	wait_queue_head_t	wqh;
+	wait_queue_head_t	wqh_poll;

 	unsigned		concurrency;
 	unsigned		registered;
@@ -133,7 +135,7 @@ static void pwqr_sb_timer_cb(unsigned long arg)
 		wake_up_locked(&sb->wqh);
 	}
 	if (sb->running > sb->concurrency) {
-		/* See ../Documentation/pwqr.adoc */
+		wake_up_poll(&sb->wqh_poll, POLLIN);
 	}
 	pwqr_sb_unlock_irqrestore(sb, flags);
 }
@@ -174,6 +176,7 @@ static void pwqr_sb_release(struct kref *kref)
 	struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);

 	del_timer_sync(&sb->timer);
+	wake_up_poll(&sb->wqh_poll, POLLHUP);
 	call_rcu(&sb->rcu, pwqr_sb_finalize);
 }

 static inline void pwqr_sb_put(struct pwqr_sb *sb)
@@ -370,6 +373,73 @@ static int pwqr_release(struct inode *inode, struct file *filp)
 	return 0;
 }

+static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
+{
+	struct pwqr_sb *sb = filp->private_data;
+	unsigned int events = 0;
+	unsigned long flags;
+
+	poll_wait(filp, &sb->wqh_poll, wait);
+
+	pwqr_sb_lock_irqsave(sb, flags);
+	if (sb->running > sb->concurrency)
+		events |= POLLIN;
+	if (sb->state < 0)
+		events |= POLLHUP;
+	pwqr_sb_unlock_irqrestore(sb, flags);
+
+	return events;
+}
+
+static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
+{
+	DECLARE_WAITQUEUE(wait, current);
+	ssize_t rc = -EAGAIN;
+
+	spin_lock_irq(&sb->wqh.lock);
+	if (sb->running > sb->concurrency) {
+		rc = 0;
+	} else if (!no_wait) {
+		add_wait_queue(&sb->wqh_poll, &wait);
+		for (;;) {
+			set_current_state(TASK_INTERRUPTIBLE);
+			if (sb->running > sb->concurrency) {
+				rc = 0;
+				break;
+			}
+			if (signal_pending(current)) {
+				rc = -ERESTARTSYS;
+				break;
+			}
+			spin_unlock_irq(&sb->wqh.lock);
+			schedule();
+			spin_lock_irq(&sb->wqh.lock);
+		}
+		remove_wait_queue(&sb->wqh_poll, &wait);
+		__set_current_state(TASK_RUNNING);
+	}
+	if (likely(rc == 0))
+		*cnt = sb->running - sb->concurrency;
+	spin_unlock_irq(&sb->wqh.lock);
+
+	return rc;
+}
+
+static ssize_t
+pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+{
+	struct pwqr_sb *sb = filp->private_data;
+	u32 cnt = 0;
+	ssize_t rc;
+
+	if (count < sizeof(cnt))
+		return -EINVAL;
+	rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
+	if (rc < 0)
+		return rc;
+	return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
+}
+
 static long
 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 	     int is_wait, struct pwqr_ioc_wait __user *arg)
@@ -415,7 +485,6 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 	/* @ see */
 	if (likely(sb->state >= 0)) {
 		DEFINE_WAIT(__wait);
-
 		__wait.flags |= WQ_FLAG_EXCLUSIVE;

 		if (is_wait) {
@@ -426,9 +495,9 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 			__add_wait_queue_tail(&sb->wqh, &__wait);
 		}
 		__pwqr_sb_update_state(sb, -1);
-		set_current_state(TASK_INTERRUPTIBLE);

 		do {
+			set_current_state(TASK_INTERRUPTIBLE);
 			if (sb->overcommit_wakes)
 				break;
 			if (signal_pending(current)) {
@@ -446,13 +515,13 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 		__remove_wait_queue(&sb->wqh, &__wait);
 		__set_current_state(TASK_RUNNING);
-
 		if (is_wait) {
 			sb->waiting--;
 		} else {
 			sb->parked--;
 		}
 		__pwqr_sb_update_state(sb, 1);
+
 		if (sb->overcommit_wakes)
 			sb->overcommit_wakes--;
 		if (sb->waiting + sb->running > sb->concurrency)
@@ -614,6 +683,9 @@ static const struct file_operations pwqr_dev_fops = {
 	.owner		= THIS_MODULE,
 	.open		= pwqr_open,
 	.release	= pwqr_release,
+	.poll		= pwqr_poll,
+	.read		= pwqr_read,
+	.llseek		= noop_llseek,
 	.unlocked_ioctl	= pwqr_ioctl,
 #ifdef CONFIG_COMPAT
 	.compat_ioctl	= pwqr_ioctl,
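For pools that can plug into an event loop, the ->poll hook added above is
what enables watching the regulator instead of dedicating a thread to it.
A minimal sketch of that usage, not part of the patch: `pool_fd` is assumed
to come from pwqr_create(0), park_one_thread() is the same hypothetical hook
as before, and error handling is elided:

#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>

extern void park_one_thread(void);	/* hypothetical: parks one worker */

/* Register pool_fd with an epoll instance; POLLIN maps to EPOLLIN. */
static int pwqr_register(int epfd, int pool_fd)
{
	struct epoll_event ev = {
		.events = EPOLLIN,	/* readable <=> pool overcommitting */
		.data   = { .fd = pool_fd },
	};
	return epoll_ctl(epfd, EPOLL_CTL_ADD, pool_fd, &ev);
}

/* Called when epoll_wait() reports EPOLLIN on pool_fd: read how many
 * threads are in excess and park that many. */
static void pwqr_on_readable(int pool_fd)
{
	uint32_t cnt;

	if (read(pool_fd, &cnt, sizeof(cnt)) == sizeof(cnt)) {
		while (cnt-- > 0)
			park_one_thread();
	}
}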
diff --git a/lib/libpwqr.c b/lib/libpwqr.c
index f5d53a2..85cccac 100644
--- a/lib/libpwqr.c
+++ b/lib/libpwqr.c
@@ -66,9 +66,13 @@
 /* }}} */
 /* pwqr wrapping {{{ */

-static int pwqr_create(void)
+static int pwqr_create(int flags)
 {
-	return open("/dev/"PWQR_DEVICE_NAME, O_RDWR);
+	if (flags & ~(O_NONBLOCK | O_CLOEXEC)) {
+		errno = EINVAL;
+		return -1;
+	}
+	return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
 }

 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
@@ -328,7 +332,7 @@ static int _pthread_workqueue_init_np(void)
 	if (pwqr_g.fd >= 0)
 		goto out;

-	fd = pwqr_create();
+	fd = pwqr_create(0);
 	if (fd < 0) {
 		rc = -1;
 		goto out;
--
2.20.1
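As a closing sketch, here is how a caller would use the new flags argument.
This assumes pwqr_create() is made visible to callers (in the library above
it is static), and a kernel honoring O_NONBLOCK/O_CLOEXEC at open() time:

#include <fcntl.h>
#include <stdio.h>

int pwqr_create(int flags);	/* assumed exported from the library */

int main(void)
{
	/* open the regulator close-on-exec and non-blocking, so that
	 * read() can serve as the probing primitive described above */
	int fd = pwqr_create(O_CLOEXEC | O_NONBLOCK);

	if (fd < 0) {
		perror("pwqr_create");
		return 1;
	}
	/* ... hand fd to the thread pool and/or the event loop ... */
	return 0;
}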