X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=kernel%2Fpwqr.c;h=8592a006701bb9847e9a3f4fa9995a9a516c1222;hp=0dd026fef9c342809a5d4bf2c33b6175923d0131;hb=c7cabb18799090c303666bab78827bc64ca2594e;hpb=c78f2216a947e712fff07f6e79f743d9009e60c8 diff --git a/kernel/pwqr.c b/kernel/pwqr.c index 0dd026f..8592a00 100644 --- a/kernel/pwqr.c +++ b/kernel/pwqr.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -41,6 +42,14 @@ #define PWQR_HASH_BITS 5 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS) +#define PWQR_UC_DELAY (HZ / 10) +#define PWQR_OC_DELAY (HZ / 20) + +#define PWQR_STATE_NONE 0 +#define PWQR_STATE_UC 1 +#define PWQR_STATE_OC 2 +#define PWQR_STATE_DEAD (-1) + struct pwqr_task_bucket { spinlock_t lock; struct hlist_head tasks; @@ -49,6 +58,7 @@ struct pwqr_task_bucket { struct pwqr_sb { struct kref kref; struct rcu_head rcu; + struct timer_list timer; wait_queue_head_t wqh; pid_t tgid; @@ -57,11 +67,10 @@ struct pwqr_sb { unsigned running; unsigned waiting; - unsigned quarantined; unsigned parked; unsigned overcommit_wakes; - unsigned dead; + int state; }; struct pwqr_task { @@ -91,36 +100,43 @@ static struct preempt_ops pwqr_preempt_noop_ops; #define pwqr_sb_unlock_irqrestore(sb, flags) \ spin_unlock_irqrestore(&(sb)->wqh.lock, flags) -static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay) { - int overcommit; + if (timer_pending(&sb->timer) && sb->state == how) + return; + mod_timer(&sb->timer, jiffies + delay); + sb->state = how; +} +static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +{ sb->running += running_delta; - overcommit = sb->running + sb->waiting - sb->concurrency; - if (overcommit == 0) - return; - if (overcommit > 0) { - if (overcommit > sb->waiting) { - sb->quarantined += sb->waiting; - sb->waiting = 0; - } else { - sb->quarantined += overcommit; - sb->waiting -= overcommit; - } + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { + pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY); + } else if (sb->running > sb->concurrency) { + pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY); } else { - unsigned undercommit = -overcommit; - - if (undercommit < sb->quarantined) { - sb->waiting += undercommit; - sb->quarantined -= undercommit; - } else if (sb->quarantined) { - sb->waiting += sb->quarantined; - sb->quarantined = 0; - } else if (sb->waiting == 0 && sb->parked) { + sb->state = PWQR_STATE_NONE; + if (!timer_pending(&sb->timer)) + del_timer(&sb->timer); + } +} + +static void pwqr_sb_timer_cb(unsigned long arg) +{ + struct pwqr_sb *sb = (struct pwqr_sb *)arg; + unsigned long flags; + + pwqr_sb_lock_irqsave(sb, flags); + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { + if (sb->overcommit_wakes == 0) wake_up_locked(&sb->wqh); - } } + if (sb->running > sb->concurrency) { + /* See ../Documentation/pwqr.adoc */ + } + pwqr_sb_unlock_irqrestore(sb, flags); } static struct pwqr_sb *pwqr_sb_create(void) @@ -135,6 +151,9 @@ static struct pwqr_sb *pwqr_sb_create(void) init_waitqueue_head(&sb->wqh); sb->tgid = current->tgid; sb->concurrency = num_online_cpus(); + init_timer(&sb->timer); + sb->timer.function = pwqr_sb_timer_cb; + sb->timer.data = (unsigned long)sb; __module_get(THIS_MODULE); return sb; @@ -156,6 +175,7 @@ static void pwqr_sb_release(struct kref *kref) { struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref); + del_timer_sync(&sb->timer); call_rcu(&sb->rcu, pwqr_sb_finalize); } static inline void pwqr_sb_put(struct pwqr_sb *sb) @@ -276,7 +296,7 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp struct pwqr_sb *sb = pwqt->sb; unsigned long flags; - if (unlikely(sb->dead)) { + if (unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -291,11 +311,11 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp static void pwqr_task_sched_out(struct preempt_notifier *notifier, struct task_struct *next) { - struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); - struct pwqr_sb *sb = pwqt->sb; + struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); + struct pwqr_sb *sb = pwqt->sb; struct task_struct *p = pwqt->task; - if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) { + if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -345,7 +365,7 @@ static int pwqr_release(struct inode *inode, struct file *filp) unsigned long flags; pwqr_sb_lock_irqsave(sb, flags); - sb->dead = true; + sb->state = PWQR_STATE_DEAD; pwqr_sb_unlock_irqrestore(sb, flags); wake_up_all(&sb->wqh); pwqr_sb_put(sb); @@ -354,7 +374,7 @@ static int pwqr_release(struct inode *inode, struct file *filp) static long do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, - int in_pool, struct pwqr_ioc_wait __user *arg) + int is_wait, struct pwqr_ioc_wait __user *arg) { unsigned long flags; struct pwqr_ioc_wait wait; @@ -363,14 +383,20 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, preempt_notifier_unregister(&pwqt->notifier); - if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) { - rc = -EFAULT; - goto out; + if (is_wait) { + if (copy_from_user(&wait, arg, sizeof(wait))) { + rc = -EFAULT; + goto out; + } + if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) { + rc = -EINVAL; + goto out; + } } pwqr_sb_lock_irqsave(sb, flags); if (sb->running + sb->waiting <= sb->concurrency) { - if (in_pool) { + if (is_wait) { while (probe_kernel_address(wait.pwqr_uaddr, uval)) { pwqr_sb_unlock_irqrestore(sb, flags); rc = get_user(uval, (u32 *)wait.pwqr_uaddr); @@ -384,18 +410,17 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, goto out_unlock; } } else { - BUG_ON(sb->quarantined != 0); goto out_unlock; } } /* @ see */ - if (likely(!sb->dead)) { + if (likely(sb->state >= 0)) { DEFINE_WAIT(__wait); __wait.flags |= WQ_FLAG_EXCLUSIVE; - if (in_pool) { + if (is_wait) { sb->waiting++; __add_wait_queue(&sb->wqh, &__wait); } else { @@ -415,22 +440,17 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, spin_unlock_irq(&sb->wqh.lock); schedule(); spin_lock_irq(&sb->wqh.lock); - if (in_pool && sb->waiting) + if (is_wait) break; if (sb->running + sb->waiting < sb->concurrency) break; - } while (likely(!sb->dead)); + } while (likely(sb->state >= 0)); __remove_wait_queue(&sb->wqh, &__wait); __set_current_state(TASK_RUNNING); - if (in_pool) { - if (sb->waiting) { - sb->waiting--; - } else { - BUG_ON(!sb->quarantined); - sb->quarantined--; - } + if (is_wait) { + sb->waiting--; } else { sb->parked--; } @@ -442,7 +462,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, } out_unlock: - if (unlikely(sb->dead)) + if (unlikely(sb->state < 0)) rc = -EBADFD; pwqr_sb_unlock_irqrestore(sb, flags); out: @@ -489,7 +509,7 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) pwqr_sb_lock_irqsave(sb, flags); if (oc) { - nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes; + nwake = sb->waiting + sb->parked - sb->overcommit_wakes; if (count > nwake) { count = nwake; } else { @@ -498,6 +518,10 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) sb->overcommit_wakes += count; } else if (sb->running + sb->overcommit_wakes < sb->concurrency) { nwake = sb->concurrency - sb->overcommit_wakes - sb->running; + if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) { + nwake = sb->waiting + sb->parked - + sb->overcommit_wakes; + } if (count > nwake) { count = nwake; } else { @@ -505,23 +529,25 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) } } else { /* - * This codepath deserves an explanation: when the thread is - * quarantined, for is, really, it's already "parked". Though - * userland doesn't know about, so wake as many threads as - * userlands would have liked to, and let the wakeup tell - * userland those should be parked. + * This codepath deserves an explanation: waking the thread + * "for real" would overcommit, though userspace KNOWS there + * is at least one waiting thread. Such threads are threads + * that are "quarantined". * - * That's why we lie about the number of woken threads, - * really, userlandwise we woke up a thread so that it could - * be parked for real and avoid spurious syscalls. So it's as - * if we woke up 0 threads. + * Quarantined threads are woken up one by one, to allow a + * slow ramp down, trying to minimize "waiting" <-> "parked" + * flip-flops, no matter how many wakes have been asked. + * + * Since releasing one quarantined thread will wake up a + * thread that will (almost) straight go to parked mode, lie + * to userland about the fact that we unblocked that thread, + * and return 0. + * + * Though if we're already waking all waiting threads for + * overcommitting jobs, well, we don't need that. */ - nwake = sb->quarantined; - if (sb->waiting < sb->overcommit_wakes) - nwake -= sb->overcommit_wakes - sb->waiting; - if (nwake > count) - nwake = count; count = 0; + nwake = sb->waiting > sb->overcommit_wakes; } while (nwake-- > 0) wake_up_locked(&sb->wqh); @@ -532,9 +558,9 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) { - struct pwqr_sb *sb = filp->private_data; + struct pwqr_sb *sb = filp->private_data; struct task_struct *task = current; - struct pwqr_task *pwqt; + struct pwqr_task *pwqt; int rc = 0; if (sb->tgid != current->tgid) @@ -582,7 +608,7 @@ static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) break; } - if (unlikely(sb->dead)) { + if (unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, pwqt->sb); return -EBADFD; }