#include <linux/init.h>
#include <linux/kref.h>
#include <linux/module.h>
+#include <linux/poll.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
+#include <linux/timer.h>
#include <linux/uaccess.h>
#include <linux/wait.h>
#define PWQR_HASH_BITS 5
#define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
+#define PWQR_UC_DELAY (HZ / 10)
+#define PWQR_OC_DELAY (HZ / 20)
+
+#define PWQR_STATE_NONE 0
+#define PWQR_STATE_UC 1
+#define PWQR_STATE_OC 2
+#define PWQR_STATE_DEAD (-1)
+
struct pwqr_task_bucket {
spinlock_t lock;
struct hlist_head tasks;
struct pwqr_sb {
struct kref kref;
struct rcu_head rcu;
+ /* Deferred reaction timer: armed for PWQR_UC_DELAY / PWQR_OC_DELAY
+  * before acting on an under-/overcommit condition (debounce). */
+ struct timer_list timer;
wait_queue_head_t wqh;
- pid_t tgid;
+ /* Waiters of poll()/read(): woken with POLLIN on overcommit and
+  * POLLHUP when the sb is torn down. */
+ wait_queue_head_t wqh_poll;
unsigned concurrency;
unsigned registered;
unsigned running;
unsigned waiting;
- unsigned quarantined;
unsigned parked;
unsigned overcommit_wakes;
- unsigned dead;
+ /* One of PWQR_STATE_{NONE,UC,OC}; PWQR_STATE_DEAD (< 0) replaces the
+  * old boolean `dead` — hence the `state < 0` liveness checks. */
+ int state;
};
struct pwqr_task {
#define pwqr_sb_unlock_irqrestore(sb, flags) \
spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
-static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
+/*
+ * Arm the sb debounce timer for state @how (PWQR_STATE_UC or PWQR_STATE_OC)
+ * to fire after @delay jiffies.  If the timer is already pending for the
+ * same state, keep the earlier deadline rather than pushing it back.
+ * Caller must hold sb->wqh.lock.
+ */
+static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay)
{
- int overcommit;
+ if (timer_pending(&sb->timer) && sb->state == how)
+ return;
+ mod_timer(&sb->timer, jiffies + delay);
+ sb->state = how;
+}
+/*
+ * Apply @running_delta to sb->running and (re)classify the pool state:
+ *  - undercommit: fewer runners than allowed concurrency, nobody waiting,
+ *    but parked threads exist that could be unparked -> arm UC timer;
+ *  - overcommit: more runners than concurrency -> arm OC timer;
+ *  - otherwise: balanced, drop back to PWQR_STATE_NONE.
+ * Reactions are deferred through the timer so transient flips don't cause
+ * wake-up churn.  Caller must hold sb->wqh.lock.
+ */
+static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
+{
sb->running += running_delta;
- overcommit = sb->running + sb->waiting - sb->concurrency;
- if (overcommit == 0)
- return;
- if (overcommit > 0) {
- if (overcommit > sb->waiting) {
- sb->quarantined += sb->waiting;
- sb->waiting = 0;
- } else {
- sb->quarantined += overcommit;
- sb->waiting -= overcommit;
- }
+ if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
+ pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY);
+ } else if (sb->running > sb->concurrency) {
+ pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY);
} else {
- unsigned undercommit = -overcommit;
-
- if (undercommit < sb->quarantined) {
- sb->waiting += undercommit;
- sb->quarantined -= undercommit;
- } else if (sb->quarantined) {
- sb->waiting += sb->quarantined;
- sb->quarantined = 0;
- } else if (sb->waiting == 0 && sb->parked) {
+ sb->state = PWQR_STATE_NONE;
+ /*
+ * NOTE(review): del_timer() only does work on a *pending* timer,
+ * so guarding it with !timer_pending() makes this a no-op.  The
+ * condition looks inverted — confirm whether a pending timer
+ * should be cancelled here (the callback does re-check state
+ * under the lock, so a spurious fire is benign either way).
+ */
+ if (!timer_pending(&sb->timer))
+ del_timer(&sb->timer);
+ }
+}
+
+/*
+ * Debounce timer callback: re-checks the condition that armed the timer
+ * under sb->wqh.lock (it may have resolved itself meanwhile).
+ *  - still undercommitted: wake one parked thread (unless overcommit
+ *    wakes are already queued, which would unpark someone anyway);
+ *  - still overcommitted: notify poll()/read() listeners with POLLIN so
+ *    userland can ask a thread to park.
+ */
+static void pwqr_sb_timer_cb(unsigned long arg)
+{
+ struct pwqr_sb *sb = (struct pwqr_sb *)arg;
+ unsigned long flags;
+
+ pwqr_sb_lock_irqsave(sb, flags);
+ if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
+ if (sb->overcommit_wakes == 0)
wake_up_locked(&sb->wqh);
- }
}
+ if (sb->running > sb->concurrency) {
+ wake_up_poll(&sb->wqh_poll, POLLIN);
+ }
+ pwqr_sb_unlock_irqrestore(sb, flags);
}
static struct pwqr_sb *pwqr_sb_create(void)
kref_init(&sb->kref);
init_waitqueue_head(&sb->wqh);
- sb->tgid = current->tgid;
- sb->concurrency = num_online_cpus();
+ sb->concurrency = num_online_cpus();
+ init_timer(&sb->timer);
+ sb->timer.function = pwqr_sb_timer_cb;
+ sb->timer.data = (unsigned long)sb;
__module_get(THIS_MODULE);
return sb;
{
struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
+ del_timer_sync(&sb->timer);
+ wake_up_poll(&sb->wqh_poll, POLLHUP);
call_rcu(&sb->rcu, pwqr_sb_finalize);
}
static inline void pwqr_sb_put(struct pwqr_sb *sb)
struct pwqr_sb *sb = pwqt->sb;
unsigned long flags;
- if (unlikely(sb->dead)) {
+ if (unlikely(sb->state < 0)) {
pwqr_task_detach(pwqt, sb);
pwqr_task_release(pwqt, true);
return;
static void pwqr_task_sched_out(struct preempt_notifier *notifier,
struct task_struct *next)
{
- struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
- struct pwqr_sb *sb = pwqt->sb;
+ struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
+ struct pwqr_sb *sb = pwqt->sb;
struct task_struct *p = pwqt->task;
- if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
+ if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) {
pwqr_task_detach(pwqt, sb);
pwqr_task_release(pwqt, true);
return;
unsigned long flags;
pwqr_sb_lock_irqsave(sb, flags);
- sb->dead = true;
+ sb->state = PWQR_STATE_DEAD;
pwqr_sb_unlock_irqrestore(sb, flags);
wake_up_all(&sb->wqh);
pwqr_sb_put(sb);
return 0;
}
+/*
+ * poll()/select() support: POLLIN when the pool is overcommitted
+ * (userland should park a thread), POLLHUP once the sb is dead.
+ *
+ * NOTE(review): init_waitqueue_head(&sb->wqh_poll) is not visible in the
+ * pwqr_sb_create() hunk of this patch — verify the queue is initialized
+ * before first use.
+ */
+static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
+{
+ struct pwqr_sb *sb = filp->private_data;
+ unsigned int events = 0;
+ unsigned long flags;
+
+ poll_wait(filp, &sb->wqh_poll, wait);
+
+ pwqr_sb_lock_irqsave(sb, flags);
+ if (sb->running > sb->concurrency)
+ events |= POLLIN;
+ if (sb->state < 0)
+ events |= POLLHUP;
+ pwqr_sb_unlock_irqrestore(sb, flags);
+
+ return events;
+}
+
+/*
+ * Core of read(): block (unless @no_wait) until the pool is overcommitted,
+ * then report the surplus (running - concurrency) through @cnt.
+ *
+ * Returns 0 on success, -EAGAIN when @no_wait and not overcommitted,
+ * -ERESTARTSYS if interrupted by a signal.
+ *
+ * Locking: the counters are protected by sb->wqh.lock, but the sleep is on
+ * the separate sb->wqh_poll queue (the one pwqr_sb_timer_cb() and teardown
+ * wake with wake_up_poll()).  Hence the open-coded wait loop instead of
+ * wait_event_*: the lock is dropped only around schedule().
+ */
+static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
+{
+ DECLARE_WAITQUEUE(wait, current);
+ ssize_t rc = -EAGAIN;
+
+ spin_lock_irq(&sb->wqh.lock);
+ if (sb->running > sb->concurrency) {
+ rc = 0;
+ } else if (!no_wait) {
+ add_wait_queue(&sb->wqh_poll, &wait);
+ for (;;) {
+ set_current_state(TASK_INTERRUPTIBLE);
+ if (sb->running > sb->concurrency) {
+ rc = 0;
+ break;
+ }
+ if (signal_pending(current)) {
+ rc = -ERESTARTSYS;
+ break;
+ }
+ spin_unlock_irq(&sb->wqh.lock);
+ schedule();
+ spin_lock_irq(&sb->wqh.lock);
+ }
+ remove_wait_queue(&sb->wqh_poll, &wait);
+ __set_current_state(TASK_RUNNING);
+ }
+ if (likely(rc == 0))
+ *cnt = sb->running - sb->concurrency;
+ spin_unlock_irq(&sb->wqh.lock);
+
+ return rc;
+}
+
+/*
+ * read() handler: copies the current overcommit count to userland as a
+ * u32 (eventfd-style fixed-size record; short buffers get -EINVAL).
+ * Honors O_NONBLOCK via pwqr_sb_read(); on success returns sizeof(u32).
+ */
+static ssize_t
+pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+{
+ struct pwqr_sb *sb = filp->private_data;
+ u32 cnt = 0;
+ ssize_t rc;
+
+ if (count < sizeof(cnt))
+ return -EINVAL;
+ rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
+ if (rc < 0)
+ return rc;
+ return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
+}
+
static long
do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
- int in_pool, struct pwqr_ioc_wait __user *arg)
+ int is_wait, struct pwqr_ioc_wait __user *arg)
{
unsigned long flags;
struct pwqr_ioc_wait wait;
preempt_notifier_unregister(&pwqt->notifier);
- if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) {
- rc = -EFAULT;
- goto out;
+ if (is_wait) {
+ if (copy_from_user(&wait, arg, sizeof(wait))) {
+ rc = -EFAULT;
+ goto out;
+ }
+ if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
+ rc = -EINVAL;
+ goto out;
+ }
}
pwqr_sb_lock_irqsave(sb, flags);
if (sb->running + sb->waiting <= sb->concurrency) {
- if (in_pool) {
+ if (is_wait) {
while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
pwqr_sb_unlock_irqrestore(sb, flags);
rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
goto out_unlock;
}
} else {
- BUG_ON(sb->quarantined != 0);
goto out_unlock;
}
}
/* @ see <wait_event_interruptible_exclusive_locked_irq> */
- if (likely(!sb->dead)) {
+ if (likely(sb->state >= 0)) {
DEFINE_WAIT(__wait);
-
__wait.flags |= WQ_FLAG_EXCLUSIVE;
- if (in_pool) {
+ if (is_wait) {
sb->waiting++;
__add_wait_queue(&sb->wqh, &__wait);
} else {
__add_wait_queue_tail(&sb->wqh, &__wait);
}
__pwqr_sb_update_state(sb, -1);
- set_current_state(TASK_INTERRUPTIBLE);
do {
+ set_current_state(TASK_INTERRUPTIBLE);
if (sb->overcommit_wakes)
break;
if (signal_pending(current)) {
spin_unlock_irq(&sb->wqh.lock);
schedule();
spin_lock_irq(&sb->wqh.lock);
- if (in_pool && sb->waiting)
+ if (is_wait)
break;
if (sb->running + sb->waiting < sb->concurrency)
break;
- } while (likely(!sb->dead));
+ } while (likely(sb->state >= 0));
__remove_wait_queue(&sb->wqh, &__wait);
__set_current_state(TASK_RUNNING);
-
- if (in_pool) {
- if (sb->waiting) {
- sb->waiting--;
- } else {
- BUG_ON(!sb->quarantined);
- sb->quarantined--;
- }
+ if (is_wait) {
+ sb->waiting--;
} else {
sb->parked--;
}
__pwqr_sb_update_state(sb, 1);
+
if (sb->overcommit_wakes)
sb->overcommit_wakes--;
if (sb->waiting + sb->running > sb->concurrency)
}
out_unlock:
- if (unlikely(sb->dead))
+ if (unlikely(sb->state < 0))
rc = -EBADFD;
pwqr_sb_unlock_irqrestore(sb, flags);
out:
pwqr_sb_lock_irqsave(sb, flags);
if (oc) {
- nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes;
+ nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
if (count > nwake) {
count = nwake;
} else {
sb->overcommit_wakes += count;
} else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
+ if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
+ nwake = sb->waiting + sb->parked -
+ sb->overcommit_wakes;
+ }
if (count > nwake) {
count = nwake;
} else {
}
} else {
/*
- * This codepath deserves an explanation: when the thread is
- * quarantined, for is, really, it's already "parked". Though
- * userland doesn't know about, so wake as many threads as
- * userlands would have liked to, and let the wakeup tell
- * userland those should be parked.
+ * This codepath deserves an explanation: waking the thread
+ * "for real" would overcommit, though userspace KNOWS there
+ * is at least one waiting thread. Such threads are threads
+ * that are "quarantined".
+ *
+ * Quarantined threads are woken up one by one, to allow a
+ * slow ramp down, trying to minimize "waiting" <-> "parked"
+ * flip-flops, no matter how many wakes have been asked.
*
- * That's why we lie about the number of woken threads,
- * really, userlandwise we woke up a thread so that it could
- * be parked for real and avoid spurious syscalls. So it's as
- * if we woke up 0 threads.
+ * Since releasing one quarantined thread will wake up a
+ * thread that will (almost) straight go to parked mode, lie
+ * to userland about the fact that we unblocked that thread,
+ * and return 0.
+ *
+ * Though if we're already waking all waiting threads for
+ * overcommitting jobs, well, we don't need that.
*/
- nwake = sb->quarantined;
- if (sb->waiting < sb->overcommit_wakes)
- nwake -= sb->overcommit_wakes - sb->waiting;
- if (nwake > count)
- nwake = count;
count = 0;
+ nwake = sb->waiting > sb->overcommit_wakes;
}
while (nwake-- > 0)
wake_up_locked(&sb->wqh);
static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
{
- struct pwqr_sb *sb = filp->private_data;
+ struct pwqr_sb *sb = filp->private_data;
struct task_struct *task = current;
- struct pwqr_task *pwqt;
+ struct pwqr_task *pwqt;
int rc = 0;
- if (sb->tgid != current->tgid)
- return -EBADFD;
-
switch (command) {
case PWQR_GET_CONC:
return sb->concurrency;
break;
}
- if (unlikely(sb->dead)) {
+ if (unlikely(sb->state < 0)) {
pwqr_task_detach(pwqt, pwqt->sb);
return -EBADFD;
}
.owner = THIS_MODULE,
.open = pwqr_open,
.release = pwqr_release,
+ .poll = pwqr_poll,
+ .read = pwqr_read,
+ .llseek = noop_llseek,
.unlocked_ioctl = pwqr_ioctl,
#ifdef CONFIG_COMPAT
.compat_ioctl = pwqr_ioctl,