X-Git-Url: http://git.madism.org/?p=~madcoder%2Fpwqr.git;a=blobdiff_plain;f=kernel%2Fpwqr.c;h=8248482b56a8666e5895f7acc7cf325c47586b08;hp=0dd026fef9c342809a5d4bf2c33b6175923d0131;hb=98c615b91f0da85c9bd00bbff7a8fd1258400ab9;hpb=c78f2216a947e712fff07f6e79f743d9009e60c8 diff --git a/kernel/pwqr.c b/kernel/pwqr.c index 0dd026f..8248482 100644 --- a/kernel/pwqr.c +++ b/kernel/pwqr.c @@ -26,58 +26,125 @@ #include #include #include +#include #include #include #include +#include #include #include +#include + +/* + * The pthread workqueue regulator code is for now written as a proof of + * concept module, meant to work with 2.6.23+ kernels or redhat5 ones. + * + * For now it uses a device /dev/pwq, which spawns magic file-descriptors + * supporting a few ioctl operations (see Documentation/pwqr.adoc shipped in + * the same git repository). + * + * This code is meant to be merged into mainline, but after the following + * changes, kept here as a "todolist": + * + * - get rid of the device stuff (which is 100% of the init code for 2.6.23 + * kernels); + * + * - resubmit the patch that makes it possible to call + * preempt_notifier_unregister from sched_in/sched_out (just a matter of a + * hlist_for_each_safe instead of hlist_for_each), and fix + * pwqr_task_release to not require RCU anymore. It makes + * pwqr_preempt_noop_ops go away. + * + * - think about the possibility to add a pwq_notifier pointer directly into + * the task_struct, thought it's not *that* necessary, it grows the + * structure for a speed gain we don't really need (making pwqr_ctl + * faster). I think it's okay to crawl the preempt_notifier list instead. + * We may want to add nice "macros" for that though. + * + * - replace the ioctl with a pwqr_ctl syscall + * + * - create a pwqr_create() syscall to create a pwqr file-descriptor. + * + * Summary: most of the code should be untouched or almost not changed, + * pwqr_ioctl adapted to become a syscall, and the module boilerplate replaced + * with pwqr_create() and file-descriptor creation boilerplate instead. But + * looking at fs/eventfd.c this looks rather simple. + */ #ifndef CONFIG_PREEMPT_NOTIFIERS # error PWQ module requires CONFIG_PREEMPT_NOTIFIERS -#endif +#else #include "pwqr.h" -#define PWQR_HASH_BITS 5 -#define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS) +#define PWQR_UC_DELAY (HZ / 10) +#define PWQR_OC_DELAY (HZ / 20) -struct pwqr_task_bucket { - spinlock_t lock; - struct hlist_head tasks; -}; +#define PWQR_STATE_NONE 0 +#define PWQR_STATE_UC 1 +#define PWQR_STATE_OC 2 +#define PWQR_STATE_DEAD (-1) + +/* + * This is the first inclusion of CONFIG_PREEMPT_NOTIFIERS in the kernel. + * + * Though I want it to work on older redhat 5 kernels, that have an emulation + * of the feature but not implemented the same way, and instead of linking the + * preempt_notifiers from the task_struct directly, they have a private + * h-table I don't have access to, so I need my own too. + * + * For vanilla kernels we crawl through the task_struct::preempt_notifiers + * hlist until we find our entry, this list is often very short, and it's no + * slower than the global h-table which also crawls a list anyway. + */ +#define IS_PRE_2_6_23 (LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 23)) struct pwqr_sb { struct kref kref; struct rcu_head rcu; + struct timer_list timer; wait_queue_head_t wqh; - pid_t tgid; + wait_queue_head_t wqh_poll; unsigned concurrency; unsigned registered; unsigned running; unsigned waiting; - unsigned quarantined; unsigned parked; unsigned overcommit_wakes; - unsigned dead; + int state; }; struct pwqr_task { struct preempt_notifier notifier; - struct hlist_node link; + struct pwqr_sb *sb; struct rcu_head rcu; +#if IS_PRE_2_6_23 + struct hlist_node link; struct task_struct *task; - struct pwqr_sb *sb; +#endif }; +#if IS_PRE_2_6_23 + +#define PWQR_HASH_BITS 5 +#define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS) + +struct pwqr_task_bucket { + spinlock_t lock; + struct hlist_head tasks; +}; + +static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE]; +#endif + /* * Global variables */ static struct class *pwqr_class; static int pwqr_major; -static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE]; static struct preempt_ops pwqr_preempt_running_ops; static struct preempt_ops pwqr_preempt_blocked_ops; static struct preempt_ops pwqr_preempt_noop_ops; @@ -91,36 +158,44 @@ static struct preempt_ops pwqr_preempt_noop_ops; #define pwqr_sb_unlock_irqrestore(sb, flags) \ spin_unlock_irqrestore(&(sb)->wqh.lock, flags) -static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay) { - int overcommit; + if (timer_pending(&sb->timer) && sb->state == how) + return; + mod_timer(&sb->timer, jiffies + delay); + sb->state = how; +} +static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta) +{ sb->running += running_delta; - overcommit = sb->running + sb->waiting - sb->concurrency; - if (overcommit == 0) - return; - if (overcommit > 0) { - if (overcommit > sb->waiting) { - sb->quarantined += sb->waiting; - sb->waiting = 0; - } else { - sb->quarantined += overcommit; - sb->waiting -= overcommit; - } + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { + pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY); + } else if (sb->running > sb->concurrency) { + pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY); } else { - unsigned undercommit = -overcommit; - - if (undercommit < sb->quarantined) { - sb->waiting += undercommit; - sb->quarantined -= undercommit; - } else if (sb->quarantined) { - sb->waiting += sb->quarantined; - sb->quarantined = 0; - } else if (sb->waiting == 0 && sb->parked) { + sb->state = PWQR_STATE_NONE; + if (!timer_pending(&sb->timer)) + del_timer(&sb->timer); + } +} + +static void pwqr_sb_timer_cb(unsigned long arg) +{ + struct pwqr_sb *sb = (struct pwqr_sb *)arg; + unsigned long flags; + + pwqr_sb_lock_irqsave(sb, flags); + if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) { + if (sb->overcommit_wakes == 0) wake_up_locked(&sb->wqh); - } } + if (sb->running > sb->concurrency) { + printk(KERN_DEBUG "wake up poll"); + wake_up_poll(&sb->wqh_poll, POLLIN); + } + pwqr_sb_unlock_irqrestore(sb, flags); } static struct pwqr_sb *pwqr_sb_create(void) @@ -133,8 +208,10 @@ static struct pwqr_sb *pwqr_sb_create(void) kref_init(&sb->kref); init_waitqueue_head(&sb->wqh); - sb->tgid = current->tgid; - sb->concurrency = num_online_cpus(); + sb->concurrency = num_online_cpus(); + init_timer(&sb->timer); + sb->timer.function = pwqr_sb_timer_cb; + sb->timer.data = (unsigned long)sb; __module_get(THIS_MODULE); return sb; @@ -156,6 +233,8 @@ static void pwqr_sb_release(struct kref *kref) { struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref); + del_timer_sync(&sb->timer); + wake_up_poll(&sb->wqh_poll, POLLHUP); call_rcu(&sb->rcu, pwqr_sb_finalize); } static inline void pwqr_sb_put(struct pwqr_sb *sb) @@ -166,6 +245,7 @@ static inline void pwqr_sb_put(struct pwqr_sb *sb) /***************************************************************************** * tasks */ +#if IS_PRE_2_6_23 static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task) { return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)]; @@ -185,10 +265,29 @@ static struct pwqr_task *pwqr_task_find(struct task_struct *task) spin_unlock(&b->lock); return pwqt; } +#else +static struct pwqr_task *pwqr_task_find(struct task_struct *task) +{ + struct hlist_node *node; + struct preempt_notifier *it; + struct pwqr_task *pwqt = NULL; + + hlist_for_each_entry(it, node, &task->preempt_notifiers, link) { + if (it->ops == &pwqr_preempt_running_ops || + it->ops == &pwqr_preempt_blocked_ops || + it->ops == &pwqr_preempt_noop_ops) + { + pwqt = container_of(it, struct pwqr_task, notifier); + break; + } + } + + return pwqt; +} +#endif static struct pwqr_task *pwqr_task_create(struct task_struct *task) { - struct pwqr_task_bucket *b = task_hbucket(task); struct pwqr_task *pwqt; pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL); @@ -197,12 +296,16 @@ static struct pwqr_task *pwqr_task_create(struct task_struct *task) preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops); preempt_notifier_register(&pwqt->notifier); - pwqt->task = task; - - spin_lock(&b->lock); - hlist_add_head(&pwqt->link, &b->tasks); - spin_unlock(&b->lock); - +#if IS_PRE_2_6_23 + { + struct pwqr_task_bucket *b = task_hbucket(task); + + pwqt->task = task; + spin_lock(&b->lock); + hlist_add_head(&pwqt->link, &b->tasks); + spin_unlock(&b->lock); + } +#endif return pwqt; } @@ -238,11 +341,13 @@ static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb) __cold static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier) { +#if IS_PRE_2_6_23 struct pwqr_task_bucket *b = task_hbucket(pwqt->task); spin_lock(&b->lock); hlist_del(&pwqt->link); spin_unlock(&b->lock); +#endif pwqt->notifier.ops = &pwqr_preempt_noop_ops; if (from_notifier) { @@ -253,7 +358,7 @@ static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier) * callbacks if we're not dying, it'll panic on the next * sched_{in,out} call. */ - BUG_ON(!(pwqt->task->state & TASK_DEAD)); + BUG_ON(!(current->state & TASK_DEAD)); kfree_rcu(pwqt, rcu); } else { preempt_notifier_unregister(&pwqt->notifier); @@ -276,7 +381,7 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp struct pwqr_sb *sb = pwqt->sb; unsigned long flags; - if (unlikely(sb->dead)) { + if (unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -289,13 +394,13 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp } static void pwqr_task_sched_out(struct preempt_notifier *notifier, - struct task_struct *next) + struct task_struct *next) { - struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); - struct pwqr_sb *sb = pwqt->sb; - struct task_struct *p = pwqt->task; + struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier); + struct pwqr_sb *sb = pwqt->sb; + struct task_struct *p = current; - if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) { + if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) { pwqr_task_detach(pwqt, sb); pwqr_task_release(pwqt, true); return; @@ -345,16 +450,83 @@ static int pwqr_release(struct inode *inode, struct file *filp) unsigned long flags; pwqr_sb_lock_irqsave(sb, flags); - sb->dead = true; + sb->state = PWQR_STATE_DEAD; pwqr_sb_unlock_irqrestore(sb, flags); wake_up_all(&sb->wqh); pwqr_sb_put(sb); return 0; } +static unsigned int pwqr_poll(struct file *filp, poll_table *wait) +{ + struct pwqr_sb *sb = filp->private_data; + unsigned int events = 0; + unsigned long flags; + + poll_wait(filp, &sb->wqh_poll, wait); + + pwqr_sb_lock_irqsave(sb, flags); + if (sb->running > sb->concurrency) + events |= POLLIN; + if (sb->state < 0) + events |= POLLHUP; + pwqr_sb_unlock_irqrestore(sb, flags); + + return events; +} + +static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt) +{ + DECLARE_WAITQUEUE(wait, current); + ssize_t rc = -EAGAIN; + + spin_lock_irq(&sb->wqh.lock); + if (sb->running > sb->concurrency) { + rc = 0; + } else if (!no_wait) { + add_wait_queue(&sb->wqh_poll, &wait); + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + if (sb->running > sb->concurrency) { + rc = 0; + break; + } + if (signal_pending(current)) { + rc = -ERESTARTSYS; + break; + } + spin_unlock_irq(&sb->wqh.lock); + schedule(); + spin_lock_irq(&sb->wqh.lock); + } + remove_wait_queue(&sb->wqh_poll, &wait); + __set_current_state(TASK_RUNNING); + } + if (likely(rc == 0)) + *cnt = sb->running - sb->concurrency; + spin_unlock_irq(&sb->wqh.lock); + + return rc; +} + +static ssize_t +pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) +{ + struct pwqr_sb *sb = filp->private_data; + u32 cnt = 0; + ssize_t rc; + + if (count < sizeof(cnt)) + return -EINVAL; + rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt); + if (rc < 0) + return rc; + return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt); +} + static long do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, - int in_pool, struct pwqr_ioc_wait __user *arg) + int is_wait, struct pwqr_ioc_wait __user *arg) { unsigned long flags; struct pwqr_ioc_wait wait; @@ -363,14 +535,20 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, preempt_notifier_unregister(&pwqt->notifier); - if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) { - rc = -EFAULT; - goto out; + if (is_wait) { + if (copy_from_user(&wait, arg, sizeof(wait))) { + rc = -EFAULT; + goto out; + } + if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) { + rc = -EINVAL; + goto out; + } } pwqr_sb_lock_irqsave(sb, flags); if (sb->running + sb->waiting <= sb->concurrency) { - if (in_pool) { + if (is_wait) { while (probe_kernel_address(wait.pwqr_uaddr, uval)) { pwqr_sb_unlock_irqrestore(sb, flags); rc = get_user(uval, (u32 *)wait.pwqr_uaddr); @@ -384,18 +562,16 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, goto out_unlock; } } else { - BUG_ON(sb->quarantined != 0); goto out_unlock; } } /* @ see */ - if (likely(!sb->dead)) { + if (likely(sb->state >= 0)) { DEFINE_WAIT(__wait); - __wait.flags |= WQ_FLAG_EXCLUSIVE; - if (in_pool) { + if (is_wait) { sb->waiting++; __add_wait_queue(&sb->wqh, &__wait); } else { @@ -403,9 +579,9 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, __add_wait_queue_tail(&sb->wqh, &__wait); } __pwqr_sb_update_state(sb, -1); - set_current_state(TASK_INTERRUPTIBLE); do { + set_current_state(TASK_INTERRUPTIBLE); if (sb->overcommit_wakes) break; if (signal_pending(current)) { @@ -415,26 +591,21 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, spin_unlock_irq(&sb->wqh.lock); schedule(); spin_lock_irq(&sb->wqh.lock); - if (in_pool && sb->waiting) + if (is_wait) break; if (sb->running + sb->waiting < sb->concurrency) break; - } while (likely(!sb->dead)); + } while (likely(sb->state >= 0)); __remove_wait_queue(&sb->wqh, &__wait); __set_current_state(TASK_RUNNING); - - if (in_pool) { - if (sb->waiting) { - sb->waiting--; - } else { - BUG_ON(!sb->quarantined); - sb->quarantined--; - } + if (is_wait) { + sb->waiting--; } else { sb->parked--; } __pwqr_sb_update_state(sb, 1); + if (sb->overcommit_wakes) sb->overcommit_wakes--; if (sb->waiting + sb->running > sb->concurrency) @@ -442,7 +613,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt, } out_unlock: - if (unlikely(sb->dead)) + if (unlikely(sb->state < 0)) rc = -EBADFD; pwqr_sb_unlock_irqrestore(sb, flags); out: @@ -489,7 +660,7 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) pwqr_sb_lock_irqsave(sb, flags); if (oc) { - nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes; + nwake = sb->waiting + sb->parked - sb->overcommit_wakes; if (count > nwake) { count = nwake; } else { @@ -498,6 +669,10 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) sb->overcommit_wakes += count; } else if (sb->running + sb->overcommit_wakes < sb->concurrency) { nwake = sb->concurrency - sb->overcommit_wakes - sb->running; + if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) { + nwake = sb->waiting + sb->parked - + sb->overcommit_wakes; + } if (count > nwake) { count = nwake; } else { @@ -505,23 +680,25 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) } } else { /* - * This codepath deserves an explanation: when the thread is - * quarantined, for is, really, it's already "parked". Though - * userland doesn't know about, so wake as many threads as - * userlands would have liked to, and let the wakeup tell - * userland those should be parked. + * This codepath deserves an explanation: waking the thread + * "for real" would overcommit, though userspace KNOWS there + * is at least one waiting thread. Such threads are threads + * that are "quarantined". + * + * Quarantined threads are woken up one by one, to allow a + * slow ramp down, trying to minimize "waiting" <-> "parked" + * flip-flops, no matter how many wakes have been asked. + * + * Since releasing one quarantined thread will wake up a + * thread that will (almost) straight go to parked mode, lie + * to userland about the fact that we unblocked that thread, + * and return 0. * - * That's why we lie about the number of woken threads, - * really, userlandwise we woke up a thread so that it could - * be parked for real and avoid spurious syscalls. So it's as - * if we woke up 0 threads. + * Though if we're already waking all waiting threads for + * overcommitting jobs, well, we don't need that. */ - nwake = sb->quarantined; - if (sb->waiting < sb->overcommit_wakes) - nwake -= sb->overcommit_wakes - sb->waiting; - if (nwake > count) - nwake = count; count = 0; + nwake = sb->waiting > sb->overcommit_wakes; } while (nwake-- > 0) wake_up_locked(&sb->wqh); @@ -532,35 +709,32 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count) static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) { - struct pwqr_sb *sb = filp->private_data; + struct pwqr_sb *sb = filp->private_data; struct task_struct *task = current; - struct pwqr_task *pwqt; + struct pwqr_task *pwqt; int rc = 0; - if (sb->tgid != current->tgid) - return -EBADFD; - switch (command) { - case PWQR_GET_CONC: + case PWQR_CTL_GET_CONC: return sb->concurrency; - case PWQR_SET_CONC: + case PWQR_CTL_SET_CONC: return do_pwqr_set_conc(sb, (int)arg); - case PWQR_WAKE: - case PWQR_WAKE_OC: - return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg); + case PWQR_CTL_WAKE: + case PWQR_CTL_WAKE_OC: + return do_pwqr_wake(sb, command == PWQR_CTL_WAKE_OC, (int)arg); - case PWQR_WAIT: - case PWQR_PARK: - case PWQR_REGISTER: - case PWQR_UNREGISTER: + case PWQR_CTL_WAIT: + case PWQR_CTL_PARK: + case PWQR_CTL_REGISTER: + case PWQR_CTL_UNREGISTER: break; default: return -EINVAL; } pwqt = pwqr_task_find(task); - if (command == PWQR_UNREGISTER) + if (command == PWQR_CTL_UNREGISTER) return do_pwqr_unregister(sb, pwqt); if (pwqt == NULL) { @@ -574,18 +748,13 @@ static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg) } switch (command) { - case PWQR_WAIT: + case PWQR_CTL_WAIT: rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg); break; - case PWQR_PARK: + case PWQR_CTL_PARK: rc = do_pwqr_wait(sb, pwqt, false, NULL); break; } - - if (unlikely(sb->dead)) { - pwqr_task_detach(pwqt, pwqt->sb); - return -EBADFD; - } return rc; } @@ -593,6 +762,9 @@ static const struct file_operations pwqr_dev_fops = { .owner = THIS_MODULE, .open = pwqr_open, .release = pwqr_release, + .poll = pwqr_poll, + .read = pwqr_read, + .llseek = noop_llseek, .unlocked_ioctl = pwqr_ioctl, #ifdef CONFIG_COMPAT .compat_ioctl = pwqr_ioctl, @@ -604,12 +776,14 @@ static const struct file_operations pwqr_dev_fops = { */ static int __init pwqr_start(void) { +#if IS_PRE_2_6_23 int i; for (i = 0; i < PWQR_HASH_SIZE; i++) { spin_lock_init(&pwqr_tasks_hash[i].lock); INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks); } +#endif /* Register as a character device */ pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops); @@ -644,5 +818,6 @@ module_exit(pwqr_end); MODULE_LICENSE("GPL"); MODULE_AUTHOR("Pierre Habouzit "); MODULE_DESCRIPTION("PThreads Work Queues Regulator"); +#endif // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: