X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=kernel%2Fpwqr.c;h=81715965eb01e54968c5af20914eab4dc88e6c7b;hp=0dd026fef9c342809a5d4bf2c33b6175923d0131;hb=170387f1b2fdd96e6271d658161926086a9c2586;hpb=c78f2216a947e712fff07f6e79f743d9009e60c8 diff --git a/kernel/pwqr.c b/kernel/pwqr.c index 0dd026f..8171596 100644 --- a/kernel/pwqr.c +++ b/kernel/pwqr.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -38,6 +39,7 @@ #include "pwqr.h" +#define PWQR_UNPARK_DELAY (HZ / 10) #define PWQR_HASH_BITS 5 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS) @@ -49,6 +51,7 @@ struct pwqr_task_bucket { struct pwqr_sb { struct kref kref; struct rcu_head rcu; + struct timer_list timer; wait_queue_head_t wqh; pid_t tgid; @@ -57,7 +60,6 @@ struct pwqr_sb { unsigned running; unsigned waiting; - unsigned quarantined; unsigned parked; unsigned overcommit_wakes; @@ -93,34 +95,33 @@ static struct preempt_ops pwqr_preempt_noop_ops; static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) { - int overcommit; - sb->running += running_delta; - overcommit = sb->running + sb->waiting - sb->concurrency; - if (overcommit == 0) + if (sb->running > sb->concurrency) { + /* TODO see ../Documentation/pwqr.adoc */ + } else if (sb->running == sb->concurrency) { + /* do nothing */ + } else if (sb->waiting == 0 && sb->parked) { + if (!timer_pending(&sb->timer)) { + mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY); + } return; + } - if (overcommit > 0) { - if (overcommit > sb->waiting) { - sb->quarantined += sb->waiting; - sb->waiting = 0; - } else { - sb->quarantined += overcommit; - sb->waiting -= overcommit; - } - } else { - unsigned undercommit = -overcommit; - - if (undercommit < sb->quarantined) { - sb->waiting += undercommit; - sb->quarantined -= undercommit; - } else if (sb->quarantined) { - sb->waiting += sb->quarantined; - sb->quarantined = 0; - } else if (sb->waiting == 0 && sb->parked) { + if (timer_pending(&sb->timer)) + del_timer(&sb->timer); +} + +static void pwqr_sb_timer_cb(unsigned long arg) +{ + struct pwqr_sb *sb = (struct pwqr_sb *)arg; + unsigned long flags; + + pwqr_sb_lock_irqsave(sb, flags); + if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) { + if (sb->overcommit_wakes == 0) wake_up_locked(&sb->wqh); - } } + pwqr_sb_unlock_irqrestore(sb, flags); } static struct pwqr_sb *pwqr_sb_create(void) @@ -135,6 +136,9 @@ static struct pwqr_sb *pwqr_sb_create(void) init_waitqueue_head(&sb->wqh); sb->tgid = current->tgid; sb->concurrency = num_online_cpus(); + init_timer(&sb->timer); + sb->timer.function = pwqr_sb_timer_cb; + sb->timer.data = (unsigned long)sb; __module_get(THIS_MODULE); return sb; @@ -156,6 +160,7 @@ static void pwqr_sb_release(struct kref *kref) { struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref); + del_timer_sync(&sb->timer); call_rcu(&sb->rcu, pwqr_sb_finalize); } static inline void pwqr_sb_put(struct pwqr_sb *sb) @@ -354,7 +359,7 @@ static int pwqr_release(struct inode *inode, struct file *filp) static long do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, - int in_pool, struct pwqr_ioc_wait __user *arg) + int is_wait, struct pwqr_ioc_wait __user *arg) { unsigned long flags; struct pwqr_ioc_wait wait; @@ -363,14 +368,14 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, preempt_notifier_unregister(&pwqt->notifier); - if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) { + if (is_wait && copy_from_user(&wait, arg, sizeof(wait))) { rc = -EFAULT; goto out; } pwqr_sb_lock_irqsave(sb, flags); if (sb->running + sb->waiting <= sb->concurrency) { - if (in_pool) { + if (is_wait) { while (probe_kernel_address(wait.pwqr_uaddr, uval)) { pwqr_sb_unlock_irqrestore(sb, flags); rc = get_user(uval, (u32 *)wait.pwqr_uaddr); @@ -384,7 +389,6 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, goto out_unlock; } } else { - BUG_ON(sb->quarantined != 0); goto out_unlock; } } @@ -395,7 +399,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, __wait.flags |= WQ_FLAG_EXCLUSIVE; - if (in_pool) { + if (is_wait) { sb->waiting++; __add_wait_queue(&sb->wqh, &__wait); } else { @@ -415,7 +419,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, spin_unlock_irq(&sb->wqh.lock); schedule(); spin_lock_irq(&sb->wqh.lock); - if (in_pool && sb->waiting) + if (is_wait) break; if (sb->running + sb->waiting < sb->concurrency) break; @@ -424,13 +428,8 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, __remove_wait_queue(&sb->wqh, &__wait); __set_current_state(TASK_RUNNING); - if (in_pool) { - if (sb->waiting) { - sb->waiting--; - } else { - BUG_ON(!sb->quarantined); - sb->quarantined--; - } + if (is_wait) { + sb->waiting--; } else { sb->parked--; } @@ -489,7 +488,7 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) pwqr_sb_lock_irqsave(sb, flags); if (oc) { - nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes; + nwake = sb->waiting + sb->parked - sb->overcommit_wakes; if (count > nwake) { count = nwake; } else { @@ -498,6 +497,10 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) sb->overcommit_wakes += count; } else if (sb->running + sb->overcommit_wakes < sb->concurrency) { nwake = sb->concurrency - sb->overcommit_wakes - sb->running; + if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) { + nwake = sb->waiting + sb->parked - + sb->overcommit_wakes; + } if (count > nwake) { count = nwake; } else { @@ -505,23 +508,25 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) } } else { /* - * This codepath deserves an explanation: when the thread is - * quarantined, for is, really, it's already "parked". Though - * userland doesn't know about, so wake as many threads as - * userlands would have liked to, and let the wakeup tell - * userland those should be parked. + * This codepath deserves an explanation: waking the thread + * "for real" would overcommit, though userspace KNOWS there + * is at least one waiting thread. Such threads are threads + * that are "quarantined". + * + * Quarantined threads are woken up one by one, to allow a + * slow ramp down, trying to minimize "waiting" <-> "parked" + * flip-flops, no matter how many wakes have been asked. * - * That's why we lie about the number of woken threads, - * really, userlandwise we woke up a thread so that it could - * be parked for real and avoid spurious syscalls. So it's as - * if we woke up 0 threads. + * Since releasing one quarantined thread will wake up a + * thread that will (almost) straight go to parked mode, lie + * to userland about the fact that we unblocked that thread, + * and return 0. + * + * Though if we're already waking all waiting threads for + * overcommitting jobs, well, we don't need that. */ - nwake = sb->quarantined; - if (sb->waiting < sb->overcommit_wakes) - nwake -= sb->overcommit_wakes - sb->waiting; - if (nwake > count) - nwake = count; count = 0; + nwake = sb->waiting > sb->overcommit_wakes; } while (nwake-- > 0) wake_up_locked(&sb->wqh);