X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=kernel%2Fpwqr.c;h=a6ff8e5e32b33fcfc5648fffd1e33d19d7bf9450;hp=e4e3f58a872b146af038b8cffc9a5c6f2ca5ab0a;hb=3560071ae86bc78a0b4eafff5911a553e7651f64;hpb=b29ccbcc4ab80bc7e7b2131349459cc9e96d49ef diff --git a/kernel/pwqr.c b/kernel/pwqr.c index e4e3f58..a6ff8e5 100644 --- a/kernel/pwqr.c +++ b/kernel/pwqr.c @@ -39,10 +39,17 @@ #include "pwqr.h" -#define PWQR_UNPARK_DELAY (HZ / 10) #define PWQR_HASH_BITS 5 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS) +#define PWQR_UC_DELAY (HZ / 10) +#define PWQR_OC_DELAY (HZ / 20) + +#define PWQR_STATE_NONE 0 +#define PWQR_STATE_UC 1 +#define PWQR_STATE_OC 2 +#define PWQR_STATE_DEAD (-1) + struct pwqr_task_bucket { spinlock_t lock; struct hlist_head tasks; @@ -53,18 +60,16 @@ struct pwqr_sb { struct rcu_head rcu; struct timer_list timer; wait_queue_head_t wqh; - pid_t tgid; unsigned concurrency; unsigned registered; unsigned running; unsigned waiting; - unsigned quarantined; unsigned parked; unsigned overcommit_wakes; - unsigned dead; + int state; }; struct pwqr_task { @@ -94,42 +99,27 @@ static struct preempt_ops pwqr_preempt_noop_ops; #define pwqr_sb_unlock_irqrestore(sb, flags) \ spin_unlock_irqrestore(&(sb)->wqh.lock, flags) -static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay) { - int overcommit; + if (timer_pending(&sb->timer) && sb->state == how) + return; + mod_timer(&sb->timer, jiffies + delay); + sb->state = how; +} +static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +{ sb->running += running_delta; - overcommit = sb->running + sb->waiting - sb->concurrency; - if (overcommit == 0) { - /* do nothing */ - } else if (overcommit > 0) { - if (overcommit > sb->waiting) { - sb->quarantined += sb->waiting; - sb->waiting = 0; - } else { - sb->quarantined += overcommit; - sb->waiting -= overcommit; - } + + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { + pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY); + } else if (sb->running > sb->concurrency) { + pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY); } else { - unsigned undercommit = -overcommit; - - if (undercommit < sb->quarantined) { - sb->waiting += undercommit; - sb->quarantined -= undercommit; - } else if (sb->quarantined) { - sb->waiting += sb->quarantined; - sb->quarantined = 0; - } else if (sb->waiting == 0 && sb->parked) { - if (!timer_pending(&sb->timer)) { - mod_timer(&sb->timer, jiffies + - PWQR_UNPARK_DELAY); - } - return; - } + sb->state = PWQR_STATE_NONE; + if (!timer_pending(&sb->timer)) + del_timer(&sb->timer); } - - if (timer_pending(&sb->timer)) - del_timer(&sb->timer); } static void pwqr_sb_timer_cb(unsigned long arg) @@ -138,10 +128,13 @@ static void pwqr_sb_timer_cb(unsigned long arg) unsigned long flags; pwqr_sb_lock_irqsave(sb, flags); - if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) { + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { if (sb->overcommit_wakes == 0) wake_up_locked(&sb->wqh); } + if (sb->running > sb->concurrency) { + /* See ../Documentation/pwqr.adoc */ + } pwqr_sb_unlock_irqrestore(sb, flags); } @@ -155,8 +148,7 @@ static struct pwqr_sb *pwqr_sb_create(void) kref_init(&sb->kref); init_waitqueue_head(&sb->wqh); - sb->tgid = current->tgid; - sb->concurrency = num_online_cpus(); + sb->concurrency = num_online_cpus(); init_timer(&sb->timer); sb->timer.function = pwqr_sb_timer_cb; sb->timer.data = (unsigned long)sb; @@ -302,7 +294,7 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp struct pwqr_sb *sb = pwqt->sb; unsigned long flags; - if (unlikely(sb->dead)) { + if (unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -317,11 +309,11 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp static void pwqr_task_sched_out(struct preempt_notifier *notifier, struct task_struct *next) { - struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); - struct pwqr_sb *sb = pwqt->sb; + struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); + struct pwqr_sb *sb = pwqt->sb; struct task_struct *p = pwqt->task; - if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) { + if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -371,7 +363,7 @@ static int pwqr_release(struct inode *inode, struct file *filp) unsigned long flags; pwqr_sb_lock_irqsave(sb, flags); - sb->dead = true; + sb->state = PWQR_STATE_DEAD; pwqr_sb_unlock_irqrestore(sb, flags); wake_up_all(&sb->wqh); pwqr_sb_put(sb); @@ -380,7 +372,7 @@ static int pwqr_release(struct inode *inode, struct file *filp) static long do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, - int in_pool, struct pwqr_ioc_wait __user *arg) + int is_wait, struct pwqr_ioc_wait __user *arg) { unsigned long flags; struct pwqr_ioc_wait wait; @@ -389,14 +381,20 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, preempt_notifier_unregister(&pwqt->notifier); - if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) { - rc = -EFAULT; - goto out; + if (is_wait) { + if (copy_from_user(&wait, arg, sizeof(wait))) { + rc = -EFAULT; + goto out; + } + if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) { + rc = -EINVAL; + goto out; + } } pwqr_sb_lock_irqsave(sb, flags); if (sb->running + sb->waiting <= sb->concurrency) { - if (in_pool) { + if (is_wait) { while (probe_kernel_address(wait.pwqr_uaddr, uval)) { pwqr_sb_unlock_irqrestore(sb, flags); rc = get_user(uval, (u32 *)wait.pwqr_uaddr); @@ -410,18 +408,17 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, goto out_unlock; } } else { - BUG_ON(sb->quarantined != 0); goto out_unlock; } } /* @ see */ - if (likely(!sb->dead)) { + if (likely(sb->state >= 0)) { DEFINE_WAIT(__wait); __wait.flags |= WQ_FLAG_EXCLUSIVE; - if (in_pool) { + if (is_wait) { sb->waiting++; __add_wait_queue(&sb->wqh, &__wait); } else { @@ -441,22 +438,17 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, spin_unlock_irq(&sb->wqh.lock); schedule(); spin_lock_irq(&sb->wqh.lock); - if (in_pool && sb->waiting) + if (is_wait) break; if (sb->running + sb->waiting < sb->concurrency) break; - } while (likely(!sb->dead)); + } while (likely(sb->state >= 0)); __remove_wait_queue(&sb->wqh, &__wait); __set_current_state(TASK_RUNNING); - if (in_pool) { - if (sb->waiting) { - sb->waiting--; - } else { - BUG_ON(!sb->quarantined); - sb->quarantined--; - } + if (is_wait) { + sb->waiting--; } else { sb->parked--; } @@ -468,7 +460,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, } out_unlock: - if (unlikely(sb->dead)) + if (unlikely(sb->state < 0)) rc = -EBADFD; pwqr_sb_unlock_irqrestore(sb, flags); out: @@ -515,7 +507,7 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) pwqr_sb_lock_irqsave(sb, flags); if (oc) { - nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes; + nwake = sb->waiting + sb->parked - sb->overcommit_wakes; if (count > nwake) { count = nwake; } else { @@ -524,6 +516,10 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) sb->overcommit_wakes += count; } else if (sb->running + sb->overcommit_wakes < sb->concurrency) { nwake = sb->concurrency - sb->overcommit_wakes - sb->running; + if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) { + nwake = sb->waiting + sb->parked - + sb->overcommit_wakes; + } if (count > nwake) { count = nwake; } else { @@ -531,23 +527,25 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) } } else { /* - * This codepath deserves an explanation: when the thread is - * quarantined, for us, really, it's already "parked". Though - * userland doesn't know about, so wake as many threads as - * userlands would have liked to, and let the wakeup tell - * userland those should be parked. + * This codepath deserves an explanation: waking the thread + * "for real" would overcommit, though userspace KNOWS there + * is at least one waiting thread. Such threads are threads + * that are "quarantined". * - * That's why we lie about the number of woken threads, - * really, userlandwise we woke up a thread so that it could - * be parked for real and avoid spurious syscalls. So it's as - * if we woke up 0 threads. + * Quarantined threads are woken up one by one, to allow a + * slow ramp down, trying to minimize "waiting" <-> "parked" + * flip-flops, no matter how many wakes have been asked. + * + * Since releasing one quarantined thread will wake up a + * thread that will (almost) straight go to parked mode, lie + * to userland about the fact that we unblocked that thread, + * and return 0. + * + * Though if we're already waking all waiting threads for + * overcommitting jobs, well, we don't need that. */ - nwake = sb->quarantined; - if (sb->waiting < sb->overcommit_wakes) - nwake -= sb->overcommit_wakes - sb->waiting; - if (nwake > count) - nwake = count; count = 0; + nwake = sb->waiting > sb->overcommit_wakes; } while (nwake-- > 0) wake_up_locked(&sb->wqh); @@ -558,14 +556,11 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) { - struct pwqr_sb *sb = filp->private_data; + struct pwqr_sb *sb = filp->private_data; struct task_struct *task = current; - struct pwqr_task *pwqt; + struct pwqr_task *pwqt; int rc = 0; - if (sb->tgid != current->tgid) - return -EBADFD; - switch (command) { case PWQR_GET_CONC: return sb->concurrency; @@ -608,7 +603,7 @@ static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) break; } - if (unlikely(sb->dead)) { + if (unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, pwqt->sb); return -EBADFD; }