#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
+#include <linux/timer.h>
#include <linux/uaccess.h>
#include <linux/wait.h>
#include "pwqr.h"
+#define PWQR_UNPARK_DELAY (HZ / 10)
#define PWQR_HASH_BITS 5
#define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
struct pwqr_sb {
struct kref kref;
struct rcu_head rcu;
+ struct timer_list timer;
wait_queue_head_t wqh;
pid_t tgid;
unsigned running;
unsigned waiting;
- unsigned quarantined;
unsigned parked;
unsigned overcommit_wakes;
static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
{
- int overcommit;
-
sb->running += running_delta;
- overcommit = sb->running + sb->waiting - sb->concurrency;
- if (overcommit == 0)
+ if (sb->running > sb->concurrency) {
+ /*
+  * More threads are running than sb->concurrency allows. Nothing is
+  * done about it in this branch yet; control falls through to the
+  * timer cancellation at the end of the function.
+  * TODO see ../Documentation/pwqr.adoc
+  */
+ } else if (sb->running == sb->concurrency) {
+ /* do nothing: running matches sb->concurrency exactly */
+ } else if (sb->waiting == 0 && sb->parked) { /* below target, no waiter, parked threads exist */
+ if (!timer_pending(&sb->timer)) { /* leave an already-armed timer untouched */
+ mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY); /* fires HZ / 10 jiffies from now */
+ }
return;
+ }
- if (overcommit > 0) {
- if (overcommit > sb->waiting) {
- sb->quarantined += sb->waiting;
- sb->waiting = 0;
- } else {
- sb->quarantined += overcommit;
- sb->waiting -= overcommit;
- }
- } else {
- unsigned undercommit = -overcommit;
-
- if (undercommit < sb->quarantined) {
- sb->waiting += undercommit;
- sb->quarantined -= undercommit;
- } else if (sb->quarantined) {
- sb->waiting += sb->quarantined;
- sb->quarantined = 0;
- } else if (sb->waiting == 0 && sb->parked) {
+ if (timer_pending(&sb->timer)) /* every state that did not arm the timer above cancels it */
+ del_timer(&sb->timer);
+}
+
+static void pwqr_sb_timer_cb(unsigned long arg)
+{
+ struct pwqr_sb *sb = (struct pwqr_sb *)arg;
+ unsigned long flags;
+ /*
+  * Handler installed as sb->timer.function; arg is the struct pwqr_sb
+  * stored in sb->timer.data. With sb locked, re-evaluate the condition
+  * under which __pwqr_sb_update_state() arms the timer, and skip the
+  * wake-up while sb->overcommit_wakes is non-zero.
+  */
+ pwqr_sb_lock_irqsave(sb, flags);
+ if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) {
+ if (sb->overcommit_wakes == 0)
wake_up_locked(&sb->wqh);
- }
}
+ pwqr_sb_unlock_irqrestore(sb, flags);
}
static struct pwqr_sb *pwqr_sb_create(void)
init_waitqueue_head(&sb->wqh);
sb->tgid = current->tgid;
sb->concurrency = num_online_cpus();
+ init_timer(&sb->timer);
+ sb->timer.function = pwqr_sb_timer_cb;
+ sb->timer.data = (unsigned long)sb;
__module_get(THIS_MODULE);
return sb;
{
struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
+ del_timer_sync(&sb->timer);
call_rcu(&sb->rcu, pwqr_sb_finalize);
}
static inline void pwqr_sb_put(struct pwqr_sb *sb)
static long
do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
- int in_pool, struct pwqr_ioc_wait __user *arg)
+ int is_wait, struct pwqr_ioc_wait __user *arg)
{
unsigned long flags;
struct pwqr_ioc_wait wait;
preempt_notifier_unregister(&pwqt->notifier);
- if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) {
+ if (is_wait && copy_from_user(&wait, arg, sizeof(wait))) {
rc = -EFAULT;
goto out;
}
pwqr_sb_lock_irqsave(sb, flags);
if (sb->running + sb->waiting <= sb->concurrency) {
- if (in_pool) {
+ if (is_wait) {
while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
pwqr_sb_unlock_irqrestore(sb, flags);
rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
goto out_unlock;
}
} else {
- BUG_ON(sb->quarantined != 0);
goto out_unlock;
}
}
__wait.flags |= WQ_FLAG_EXCLUSIVE;
- if (in_pool) {
+ if (is_wait) {
sb->waiting++;
__add_wait_queue(&sb->wqh, &__wait);
} else {
spin_unlock_irq(&sb->wqh.lock);
schedule();
spin_lock_irq(&sb->wqh.lock);
- if (in_pool && sb->waiting)
+ if (is_wait)
break;
if (sb->running + sb->waiting < sb->concurrency)
break;
__remove_wait_queue(&sb->wqh, &__wait);
__set_current_state(TASK_RUNNING);
- if (in_pool) {
- if (sb->waiting) {
- sb->waiting--;
- } else {
- BUG_ON(!sb->quarantined);
- sb->quarantined--;
- }
+ if (is_wait) {
+ sb->waiting--;
} else {
sb->parked--;
}
pwqr_sb_lock_irqsave(sb, flags);
if (oc) {
- nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes;
+ nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
if (count > nwake) {
count = nwake;
} else {
sb->overcommit_wakes += count;
} else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
+ if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
+ nwake = sb->waiting + sb->parked -
+ sb->overcommit_wakes;
+ }
if (count > nwake) {
count = nwake;
} else {
}
} else {
/*
- * This codepath deserves an explanation: when the thread is
- * quarantined, for is, really, it's already "parked". Though
- * userland doesn't know about, so wake as many threads as
- * userlands would have liked to, and let the wakeup tell
- * userland those should be parked.
+ * This codepath deserves an explanation: waking the thread
+ * "for real" would overcommit, though userspace KNOWS there
+ * is at least one waiting thread. Such threads are said to
+ * be "quarantined".
+ *
+ * Quarantined threads are woken up one by one, to allow a
+ * slow ramp down, trying to minimize "waiting" <-> "parked"
+ * flip-flops, no matter how many wakes have been asked.
*
- * That's why we lie about the number of woken threads,
- * really, userlandwise we woke up a thread so that it could
- * be parked for real and avoid spurious syscalls. So it's as
- * if we woke up 0 threads.
+ * Since releasing one quarantined thread will wake up a
+ * thread that will go (almost) straight to parked mode, lie
+ * to userland about the fact that we unblocked that thread,
+ * and return 0.
+ *
+ * Though if we're already waking all waiting threads for
+ * overcommitting jobs, well, we don't need that.
*/
- nwake = sb->quarantined;
- if (sb->waiting < sb->overcommit_wakes)
- nwake -= sb->overcommit_wakes - sb->waiting;
- if (nwake > count)
- nwake = count;
count = 0;
+ nwake = sb->waiting > sb->overcommit_wakes;
}
while (nwake-- > 0)
wake_up_locked(&sb->wqh);