2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator, and is part
8 * The Linux Kernel is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License version 2 as published by
10 * the Free Software Foundation.
12 * The Linux Kernel is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
17 * You should have received a copy of the GNU General Public License version 2
18 * along with The Linux Kernel. If not, see <http://www.gnu.org/licenses/>.
21 #include <linux/cdev.h>
22 #include <linux/device.h>
23 #include <linux/file.h>
25 #include <linux/hash.h>
26 #include <linux/init.h>
27 #include <linux/kref.h>
28 #include <linux/module.h>
29 #include <linux/poll.h>
30 #include <linux/sched.h>
31 #include <linux/slab.h>
32 #include <linux/spinlock.h>
33 #include <linux/timer.h>
34 #include <linux/uaccess.h>
35 #include <linux/wait.h>
36 #include <linux/version.h>
39 * The pthread workqueue regulator code is for now written as a proof of
40 * concept module, meant to work with 2.6.23+ kernels or redhat5 ones.
42 * For now it uses a device /dev/pwq, which spawns magic file-descriptors
43 * supporting a few ioctl operations (see Documentation/pwqr.adoc shipped in
44 * the same git repository).
46 * This code is meant to be merged into mainline, but after the following
47 * changes, kept here as a "todolist":
49 * - get rid of the device stuff (which is 100% of the init code for 2.6.23
52 * - resubmit the patch that makes it possible to call
53 * preempt_notifier_unregister from sched_in/sched_out (just a matter of a
54 * hlist_for_each_safe instead of hlist_for_each), and fix
55 * pwqr_task_release to not require RCU anymore. It makes
56 * pwqr_preempt_noop_ops go away.
58 * - think about the possibility to add a pwq_notifier pointer directly into
59 * the task_struct, though it's not *that* necessary, it grows the
60 * structure for a speed gain we don't really need (making pwqr_ctl
61 * faster). I think it's okay to crawl the preempt_notifier list instead.
62 * We may want to add nice "macros" for that though.
64 * - replace the ioctl with a pwqr_ctl syscall
66 * - create a pwqr_create() syscall to create a pwqr file-descriptor.
68 * Summary: most of the code should be untouched or almost not changed,
69 * pwqr_ioctl adapted to become a syscall, and the module boilerplate replaced
70 * with pwqr_create() and file-descriptor creation boilerplate instead. But
71 * looking at fs/eventfd.c this looks rather simple.
74 #ifndef CONFIG_PREEMPT_NOTIFIERS
75 # error PWQ module requires CONFIG_PREEMPT_NOTIFIERS
// Timer delays, expressed in jiffies. __pwqr_sb_update_state() passes
// PWQR_UC_DELAY together with PWQR_STATE_UC (running below concurrency) and
// PWQR_OC_DELAY together with PWQR_STATE_OC (running above concurrency).
80 #define PWQR_UC_DELAY (HZ / 10)
81 #define PWQR_OC_DELAY (HZ / 20)
// Values stored in sb->state. PWQR_STATE_DEAD is the only negative value;
// code in this file detects it with the test sb->state < 0, and
// pwqr_release() is where it gets set.
83 #define PWQR_STATE_NONE 0
84 #define PWQR_STATE_UC 1
85 #define PWQR_STATE_OC 2
86 #define PWQR_STATE_DEAD (-1)
89 * This is the first inclusion of CONFIG_PREEMPT_NOTIFIERS in the kernel.
91 * Though I want it to work on older redhat 5 kernels, that have an emulation
92 * of the feature but not implemented the same way, and instead of linking the
93 * preempt_notifiers from the task_struct directly, they have a private
94 * h-table I don't have access to, so I need my own too.
96 * For vanilla kernels we crawl through the task_struct::preempt_notifiers
97 * hlist until we find our entry, this list is often very short, and it's no
98 * slower than the global h-table which also crawls a list anyway.
// Evaluates to true when building against a kernel older than 2.6.23.
// NOTE(review): presumably selects between the two pwqr_task_find() variants
// and the private task hash table -- the #if lines are not visible here.
100 #define IS_PRE_2_6_23 (LINUX_VERSION_CODE < KERNEL_VERSION(2, 6, 23))
// Members of the per-file-descriptor regulator state, accessed as sb->... in
// the rest of the file. The struct header and several members referenced
// elsewhere (kref, rcu, state, running, waiting, parked) are not visible in
// this excerpt.
// timer: set up in pwqr_sb_create() with pwqr_sb_timer_cb() as its callback.
105 struct timer_list timer;
// wqh: wait queue for threads blocked in do_pwqr_wait(); its embedded
// spinlock is the one taken by the pwqr_sb_lock_irqsave() macro.
106 wait_queue_head_t wqh;
// wqh_poll: wait queue used by pwqr_poll() and pwqr_sb_read().
107 wait_queue_head_t wqh_poll;
// concurrency: target number of running threads; initialised to
// num_online_cpus() in pwqr_sb_create() and changed by do_pwqr_set_conc().
109 unsigned concurrency;
// overcommit_wakes: incremented by do_pwqr_wake() and decremented in
// do_pwqr_wait() after a thread has been woken.
115 unsigned overcommit_wakes;
// Members of the per-thread tracking entry, accessed as pwqt->... in the rest
// of the file. The struct header and other members (sb, rcu) are not visible
// in this excerpt.
// notifier: preempt notifier whose ops pointer is switched between the
// running, blocked and noop ops tables defined later in this file.
122 struct preempt_notifier notifier;
// link: linkage into a pwqr_task_bucket list (see pwqr_task_create()).
// task: the task this entry tracks; compared in the hash-based
// pwqr_task_find().
// NOTE(review): link and task are presumably only compiled for pre-2.6.23
// kernels -- the surrounding #if is not visible; confirm.
126 struct hlist_node link;
127 struct task_struct *task;
// Private task hash table: 2^PWQR_HASH_BITS (32) buckets, indexed by
// hash_ptr() of the task_struct pointer (see task_hbucket()).
133 #define PWQR_HASH_BITS 5
134 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
// One bucket: a list of pwqr_task entries. A lock member is used elsewhere
// in the file (b->lock) but its declaration is not visible in this excerpt.
136 struct pwqr_task_bucket {
138 struct hlist_head tasks;
// The table itself; every bucket is initialised in pwqr_start().
141 static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE];
// Device class created in pwqr_start() and destroyed in pwqr_end().
147 static struct class *pwqr_class;
// Character device major number returned by register_chrdev().
148 static int pwqr_major;
// Forward declarations of the three preempt_ops tables defined further down;
// pwqr_task_find() and the notifier callbacks compare against their
// addresses.
149 static struct preempt_ops pwqr_preempt_running_ops;
150 static struct preempt_ops pwqr_preempt_blocked_ops;
151 static struct preempt_ops pwqr_preempt_noop_ops;
153 /*****************************************************************************
// Lock helpers: the regulator state is protected by the spinlock embedded in
// the sb->wqh wait queue head rather than by a dedicated lock. These take and
// release it with interrupts saved and restored through flags.
157 #define pwqr_sb_lock_irqsave(sb, flags) \
158 spin_lock_irqsave(&(sb)->wqh.lock, flags)
159 #define pwqr_sb_unlock_irqrestore(sb, flags) \
160 spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
// Arm sb->timer so that it fires delay jiffies from now (mod_timer()).
// The guard on timer_pending() combined with sb->state == how controls a
// statement that is not visible in this excerpt.
// NOTE(review): presumably an early return that leaves an already-armed
// timer for the same state untouched, and sb->state is presumably assigned
// from how before mod_timer() -- confirm against the full source.
162 static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay)
164 if (timer_pending(&sb->timer) && sb->state == how)
166 mod_timer(&sb->timer, jiffies + delay);
// Add running_delta to sb->running, then re-evaluate the regulation timer:
//  - running below concurrency, nobody waiting and some threads parked:
//    arm the timer for PWQR_STATE_UC after PWQR_UC_DELAY;
//  - running above concurrency: arm it for PWQR_STATE_OC after
//    PWQR_OC_DELAY;
//  - remaining case (the else line is not visible here): state is reset to
//    PWQR_STATE_NONE.
// Every caller visible in this file invokes it with sb->wqh.lock held.
170 static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
172 sb->running += running_delta;
174 if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
176 pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY);
177 } else if (sb->running > sb->concurrency) {
179 pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY);
182 sb->state = PWQR_STATE_NONE;
// NOTE(review): del_timer() is only reached when the timer is NOT pending,
// in which case it has nothing to cancel; a pending timer is left armed.
// The condition looks inverted -- confirm the intent.
183 if (!timer_pending(&sb->timer))
184 del_timer(&sb->timer);
// Timer callback for sb->timer; arg is the pwqr_sb pointer stored in
// timer.data by pwqr_sb_create(). Runs with the sb lock held and:
//  - when running is below concurrency, nobody is waiting, some threads are
//    parked and no overcommit wake is outstanding, wakes a thread on sb->wqh;
//  - when running exceeds concurrency, signals POLLIN on sb->wqh_poll.
188 static void pwqr_sb_timer_cb(unsigned long arg)
190 struct pwqr_sb *sb = (struct pwqr_sb *)arg;
193 pwqr_sb_lock_irqsave(sb, flags);
194 if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
195 if (sb->overcommit_wakes == 0)
196 wake_up_locked(&sb->wqh);
198 if (sb->running > sb->concurrency) {
// NOTE(review): this debug message has no trailing newline, unlike the
// KERN_ERR messages in pwqr_start().
199 printk(KERN_DEBUG "wake up poll");
200 wake_up_poll(&sb->wqh_poll, POLLIN);
203 pwqr_sb_unlock_irqrestore(sb, flags);
// Allocate and initialise a regulator state block.
// The block is zero-filled (kzalloc), gets an initial kref, both wait queues
// are initialised, concurrency defaults to the number of online CPUs and the
// timer is wired to pwqr_sb_timer_cb() with the block as its argument.
// A module reference is taken with __module_get(); pwqr_sb_finalize() drops
// it.
// Returns ERR_PTR(-ENOMEM) on the error path shown below (the allocation
// check and the final return are not visible in this excerpt).
206 static struct pwqr_sb *pwqr_sb_create(void)
210 sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
212 return ERR_PTR(-ENOMEM);
214 kref_init(&sb->kref);
215 init_waitqueue_head(&sb->wqh);
216 init_waitqueue_head(&sb->wqh_poll);
217 sb->concurrency = num_online_cpus();
218 init_timer(&sb->timer);
219 sb->timer.function = pwqr_sb_timer_cb;
220 sb->timer.data = (unsigned long)sb;
222 __module_get(THIS_MODULE);
// Called from pwqr_task_attach() when a task is bound to sb.
// NOTE(review): the body is not visible in this excerpt; presumably it takes
// a reference on sb->kref, mirroring pwqr_sb_put() -- confirm.
225 static inline void pwqr_sb_get(struct pwqr_sb *sb)
// RCU callback queued by pwqr_sb_release() through call_rcu().
// Recovers the pwqr_sb from its embedded rcu head and drops the module
// reference taken in pwqr_sb_create().
// NOTE(review): the line that frees sb is not visible here; presumably a
// kfree() sits between the two statements below -- confirm.
230 static void pwqr_sb_finalize(struct rcu_head *rcu)
232 struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);
234 module_put(THIS_MODULE);
// kref release callback passed to kref_put() by pwqr_sb_put().
// Stops the timer and waits for a running callback to finish
// (del_timer_sync), notifies pollers with POLLHUP, then defers the final
// teardown to pwqr_sb_finalize() through call_rcu().
238 static void pwqr_sb_release(struct kref *kref)
240 struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
242 del_timer_sync(&sb->timer);
243 wake_up_poll(&sb->wqh_poll, POLLHUP);
244 call_rcu(&sb->rcu, pwqr_sb_finalize);
// Drop one reference on sb; pwqr_sb_release() is passed as the callback that
// kref_put() invokes when the count reaches zero.
246 static inline void pwqr_sb_put(struct pwqr_sb *sb)
248 kref_put(&sb->kref, pwqr_sb_release);
251 /*****************************************************************************
// Return the hash bucket for task: the task_struct pointer is hashed down to
// PWQR_HASH_BITS bits with hash_ptr() and used as an index into
// pwqr_tasks_hash.
255 static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
257 return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
// Hash-table variant of pwqr_task_find(): walk the bucket selected by
// task_hbucket() and look for the entry whose task member equals task.
// The bucket lock is released after the walk; the matching lock acquisition,
// the loop exit and the return statement are not visible in this excerpt.
// NOTE(review): presumably this is the variant compiled when IS_PRE_2_6_23
// is true -- the #if line is not visible; confirm.
260 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
262 struct pwqr_task_bucket *b = task_hbucket(task);
263 struct hlist_node *node;
264 struct pwqr_task *pwqt = NULL;
267 hlist_for_each_entry(pwqt, node, &b->tasks, link) {
268 if (pwqt->task == task)
271 spin_unlock(&b->lock);
// Notifier-list variant of pwqr_task_find(): walk task->preempt_notifiers and
// recognise our own entry by its ops pointer, which is always one of the
// three pwqr_preempt_*_ops tables. The enclosing pwqr_task is then recovered
// with container_of().
// The braces, the loop exit and the return statement are not visible in this
// excerpt.
275 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
277 struct hlist_node *node;
278 struct preempt_notifier *it;
279 struct pwqr_task *pwqt = NULL;
281 hlist_for_each_entry(it, node, &task->preempt_notifiers, link) {
282 if (it->ops == &pwqr_preempt_running_ops ||
283 it->ops == &pwqr_preempt_blocked_ops ||
284 it->ops == &pwqr_preempt_noop_ops)
286 pwqt = container_of(it, struct pwqr_task, notifier);
// Allocate a tracking entry for a thread and hook it into the scheduler.
// The notifier starts with the running ops table and is registered with
// preempt_notifier_register(); the entry is then linked at the head of its
// hash bucket.
// Returns ERR_PTR(-ENOMEM) on the error path shown below (the allocation
// check and the final return are not visible in this excerpt).
// NOTE(review): preempt_notifier_register() takes no task argument; the only
// caller in this file, pwqr_ioctl(), passes current as task.
// NOTE(review): the bucket insertion is presumably limited to pre-2.6.23
// builds and done under b->lock -- the #if and spin_lock lines are not
// visible; confirm.
295 static struct pwqr_task *pwqr_task_create(struct task_struct *task)
297 struct pwqr_task *pwqt;
299 pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
301 return ERR_PTR(-ENOMEM);
303 preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
304 preempt_notifier_register(&pwqt->notifier);
307 struct pwqr_task_bucket *b = task_hbucket(task);
311 hlist_add_head(&pwqt->link, &b->tasks);
312 spin_unlock(&b->lock);
// Remove the contribution of pwqt from the accounting of sb, under the sb
// lock. A task whose notifier uses the running ops is counted in
// sb->running, so the counter is decremented; the second call below passes a
// delta of 0 and only re-evaluates the timer state.
// NOTE(review): the else line separating the two calls, and any release of
// the sb reference taken by pwqr_task_attach(), are not visible in this
// excerpt -- confirm.
319 static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
323 pwqr_sb_lock_irqsave(sb, flags);
325 if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
326 __pwqr_sb_update_state(sb, -1);
328 __pwqr_sb_update_state(sb, 0);
330 pwqr_sb_unlock_irqrestore(sb, flags);
// Bind pwqt to sb under the sb lock: store sb in pwqt->sb, call
// pwqr_sb_get() on it, and count the task as running (delta of +1).
336 static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
340 pwqr_sb_lock_irqsave(sb, flags);
341 pwqr_sb_get(pwqt->sb = sb);
343 __pwqr_sb_update_state(sb, 1);
344 pwqr_sb_unlock_irqrestore(sb, flags);
// Tear down a tracking entry.
// The entry is unlinked from its hash bucket and its notifier ops are
// switched to the noop table. Two endings are visible below: one asserts
// with BUG_ON that current is dying and frees the entry through kfree_rcu(),
// the other calls preempt_notifier_unregister().
// NOTE(review): the branch on from_notifier, the bucket spin_lock and the
// kfree on the unregister path are not visible in this excerpt; presumably
// from_notifier selects the kfree_rcu() ending -- confirm.
348 static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
351 struct pwqr_task_bucket *b = task_hbucket(pwqt->task);
354 hlist_del(&pwqt->link);
355 spin_unlock(&b->lock);
357 pwqt->notifier.ops = &pwqr_preempt_noop_ops;
360 /* When called from sched_{out,in}, it's not allowed to
361 * call preempt_notifier_unregister (or worse kfree())
363 * Though it's not a good idea to kfree() still registered
364 * callbacks if we're not dying, it'll panic on the next
365 * sched_{in,out} call.
367 BUG_ON(!(current->state & TASK_DEAD));
368 kfree_rcu(pwqt, rcu);
370 preempt_notifier_unregister(&pwqt->notifier);
// sched_in callback installed in the noop and running ops tables.
// NOTE(review): the body is not visible in this excerpt; presumably empty,
// as the name suggests -- confirm.
375 static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
// sched_out callback installed in the noop ops table.
// NOTE(review): the body is not visible in this excerpt; presumably empty,
// as the name suggests -- confirm.
379 static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
380 struct task_struct *next)
// sched_in callback for a task previously marked blocked.
// When the regulator is dead (sb->state < 0) the task is detached and its
// entry released from notifier context. The remaining statements switch the
// notifier back to the running ops and add 1 to sb->running under the sb
// lock.
// NOTE(review): the return that presumably ends the dead branch is not
// visible in this excerpt -- confirm.
384 static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
386 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
387 struct pwqr_sb *sb = pwqt->sb;
390 if (unlikely(sb->state < 0)) {
391 pwqr_task_detach(pwqt, sb);
392 pwqr_task_release(pwqt, true);
396 pwqt->notifier.ops = &pwqr_preempt_running_ops;
397 pwqr_sb_lock_irqsave(sb, flags);
398 __pwqr_sb_update_state(sb, 1);
399 pwqr_sb_unlock_irqrestore(sb, flags);
// sched_out callback shared by the running and blocked ops tables.
//  - Task dying (TASK_DEAD) or regulator dead (sb->state < 0): detach the
//    task and release its entry from notifier context.
//  - Task state 0, or stopped or traced: this test guards a statement that
//    is not visible here (presumably a return, so that such a task keeps
//    counting as running -- confirm).
//  - Remaining statements: mark the notifier blocked and subtract 1 from
//    sb->running. A plain spin_lock is used because, per the comment below,
//    interrupts are disabled in sched_out.
402 static void pwqr_task_sched_out(struct preempt_notifier *notifier,
403 struct task_struct *next)
405 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
406 struct pwqr_sb *sb = pwqt->sb;
407 struct task_struct *p = current;
409 if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) {
410 pwqr_task_detach(pwqt, sb);
411 pwqr_task_release(pwqt, true);
414 if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
417 pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
418 /* see preempt.h: irq are disabled for sched_out */
419 spin_lock(&sb->wqh.lock);
420 __pwqr_sb_update_state(sb, -1);
421 spin_unlock(&sb->wqh.lock);
// Ops table installed by pwqr_task_release(): both callbacks are the noop
// variants.
424 static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
425 .sched_in = pwqr_task_noop_sched_in,
426 .sched_out = pwqr_task_noop_sched_out,
// Ops table for a task currently counted in sb->running: sched_in uses the
// noop callback, sched_out goes through pwqr_task_sched_out().
429 static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
430 .sched_in = pwqr_task_noop_sched_in,
431 .sched_out = pwqr_task_sched_out,
// Ops table for a task marked blocked by pwqr_task_sched_out(): the next
// sched_in runs pwqr_task_blocked_sched_in(), which counts it as running
// again.
434 static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
435 .sched_in = pwqr_task_blocked_sched_in,
436 .sched_out = pwqr_task_sched_out,
439 /*****************************************************************************
// open() handler: create a fresh regulator state block and attach it to the
// file through filp->private_data.
// NOTE(review): the error check on pwqr_sb_create() and the return
// statements are not visible in this excerpt.
442 static int pwqr_open(struct inode *inode, struct file *filp)
446 sb = pwqr_sb_create();
449 filp->private_data = sb;
// release() handler: mark the regulator dead under the sb lock, then wake
// every thread sleeping on sb->wqh. The wait loop in do_pwqr_wait() and the
// notifier callbacks test sb->state for this value.
// NOTE(review): dropping the reference held by the file (presumably
// pwqr_sb_put) and the return statement are not visible in this excerpt.
453 static int pwqr_release(struct inode *inode, struct file *filp)
455 struct pwqr_sb *sb = filp->private_data;
458 pwqr_sb_lock_irqsave(sb, flags);
459 sb->state = PWQR_STATE_DEAD;
460 pwqr_sb_unlock_irqrestore(sb, flags);
461 wake_up_all(&sb->wqh);
// poll() handler: register the caller on sb->wqh_poll, then compute the
// event mask while holding the sb lock.
// NOTE(review): the statements that fill in events and the return are not
// visible in this excerpt; pwqr_sb_timer_cb() signals POLLIN on this queue
// and pwqr_sb_release() signals POLLHUP.
466 static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
468 struct pwqr_sb *sb = filp->private_data;
469 unsigned int events = 0;
472 poll_wait(filp, &sb->wqh_poll, wait);
474 pwqr_sb_lock_irqsave(sb, flags);
479 pwqr_sb_unlock_irqrestore(sb, flags);
// Core of read(): report by how many threads the regulator is overcommitted.
// rc starts as -EAGAIN. Under sb->wqh.lock the function tests whether
// running exceeds concurrency. When no_wait is zero, the caller is queued on
// sb->wqh_poll and enters an interruptible wait that re-tests the condition
// and checks for pending signals, dropping and re-taking the lock around a
// statement not visible here (presumably schedule()).
// On success (rc == 0), *cnt receives running minus concurrency.
// NOTE(review): the assignments to rc, the loop construct and the return are
// not visible in this excerpt.
484 static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
486 DECLARE_WAITQUEUE(wait, current);
487 ssize_t rc = -EAGAIN;
489 spin_lock_irq(&sb->wqh.lock);
490 if (sb->running > sb->concurrency) {
492 } else if (!no_wait) {
493 add_wait_queue(&sb->wqh_poll, &wait);
495 set_current_state(TASK_INTERRUPTIBLE);
496 if (sb->running > sb->concurrency) {
500 if (signal_pending(current)) {
504 spin_unlock_irq(&sb->wqh.lock);
506 spin_lock_irq(&sb->wqh.lock);
508 remove_wait_queue(&sb->wqh_poll, &wait);
509 __set_current_state(TASK_RUNNING);
511 if (likely(rc == 0)) {
512 *cnt = sb->running - sb->concurrency;
515 spin_unlock_irq(&sb->wqh.lock);
// read() handler: delegates to pwqr_sb_read(), which is non-blocking when
// the file has O_NONBLOCK set, then copies the resulting count to user space
// as a u32.
// Returns sizeof(cnt) on success and -EFAULT when put_user() fails.
// The test on count guards a statement not visible here (presumably an
// error return for a buffer smaller than cnt -- confirm), as does the
// handling of rc.
521 pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
523 struct pwqr_sb *sb = filp->private_data;
527 if (count < sizeof(cnt))
529 rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
532 return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
// Block the calling registered thread on sb->wqh. pwqr_ioctl() calls it with
// is_wait true and a user argument in one place, and with is_wait false and
// a NULL argument in another.
// Visible outline:
//  1. The preempt notifier of the thread is unregistered for the duration of
//     the call and registered again at the end.
//  2. The pwqr_ioc_wait argument is copied from user space and the address
//     in pwqr_uaddr must be aligned on sizeof(int).
//  3. Under the sb lock, when running plus waiting does not exceed
//     concurrency, the word at pwqr_uaddr is read (first without faulting,
//     then with get_user() with the lock dropped) and compared against
//     pwqr_ticket.
//  4. While the regulator is alive, the thread queues itself as an exclusive
//     waiter, is removed from the running count, and sleeps interruptibly
//     until an overcommit wake is available, a signal is pending, capacity
//     appears, or the regulator dies.
//  5. On the way out the running count is restored and one overcommit wake
//     is consumed if any is outstanding.
// NOTE(review): many lines are missing from this excerpt (declarations,
// branches on is_wait, rc assignments, the schedule call, updates of the
// waiting and parked counters, return); the outline above covers only what
// is visible and should be confirmed against the full source.
536 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
537 int is_wait, struct pwqr_ioc_wait __user *arg)
540 struct pwqr_ioc_wait wait;
544 preempt_notifier_unregister(&pwqt->notifier);
547 if (copy_from_user(&wait, arg, sizeof(wait))) {
551 if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
557 pwqr_sb_lock_irqsave(sb, flags);
558 if (sb->running + sb->waiting <= sb->concurrency) {
560 while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
561 pwqr_sb_unlock_irqrestore(sb, flags);
562 rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
565 pwqr_sb_lock_irqsave(sb, flags);
568 if (uval != (u32)wait.pwqr_ticket) {
577 /* @ see <wait_event_interruptible_exclusive_locked_irq> */
578 if (likely(sb->state >= 0)) {
580 __wait.flags |= WQ_FLAG_EXCLUSIVE;
// The entry goes either to the head or to the tail of the queue; the
// condition choosing between the two is not visible in this excerpt.
584 __add_wait_queue(&sb->wqh, &__wait);
587 __add_wait_queue_tail(&sb->wqh, &__wait);
589 __pwqr_sb_update_state(sb, -1);
592 set_current_state(TASK_INTERRUPTIBLE);
593 if (sb->overcommit_wakes)
595 if (signal_pending(current)) {
599 spin_unlock_irq(&sb->wqh.lock);
601 spin_lock_irq(&sb->wqh.lock);
604 if (sb->running + sb->waiting < sb->concurrency)
606 } while (likely(sb->state >= 0));
608 __remove_wait_queue(&sb->wqh, &__wait);
609 __set_current_state(TASK_RUNNING);
615 __pwqr_sb_update_state(sb, 1);
617 if (sb->overcommit_wakes)
618 sb->overcommit_wakes--;
619 if (sb->waiting + sb->running > sb->concurrency)
624 if (unlikely(sb->state < 0))
626 pwqr_sb_unlock_irqrestore(sb, flags);
628 preempt_notifier_register(&pwqt->notifier);
// Handler for PWQR_CTL_UNREGISTER: detach the task from sb and release its
// entry from process context (from_notifier is false).
// NOTE(review): the lines between the signature and the detach call are not
// visible; presumably they validate pwqt, which pwqr_ioctl() obtains from
// pwqr_task_find() and passes on unchecked -- confirm.
632 static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
638 pwqr_task_detach(pwqt, sb);
639 pwqr_task_release(pwqt, false);
// Handler for PWQR_CTL_SET_CONC: set the target concurrency of sb.
// When the value differs from the previous one it is stored and the timer
// state is re-evaluated with a running delta of 0, all under the sb lock.
// The assignment from num_online_cpus() is guarded by a condition not
// visible here (presumably a non-positive conc selects that default --
// confirm); the return statement is not visible either.
// NOTE(review): old_conc is sampled before the lock is taken, so it can be
// stale by the time it is compared -- confirm whether that matters.
643 static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
645 long old_conc = sb->concurrency;
648 pwqr_sb_lock_irqsave(sb, flags);
650 conc = num_online_cpus();
651 if (conc != old_conc) {
652 sb->concurrency = conc;
653 __pwqr_sb_update_state(sb, 0);
655 pwqr_sb_unlock_irqrestore(sb, flags);
// Handler for the wake commands; pwqr_ioctl() passes oc as true for
// PWQR_CTL_WAKE_OC. Works under the sb lock and computes nwake, a number of
// threads to wake, in one of three ways:
//  - a first branch bounds nwake by waiting plus parked minus
//    overcommit_wakes and adds count to sb->overcommit_wakes;
//  - when running plus overcommit_wakes is below concurrency, nwake is the
//    remaining capacity, clamped to the same bound;
//  - in the quarantine case described in the comment below, nwake is 1 when
//    waiting exceeds overcommit_wakes and 0 otherwise.
// NOTE(review): the branch conditions on oc and count, the loop around
// wake_up_locked() and the return value are not visible in this excerpt.
660 static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
668 pwqr_sb_lock_irqsave(sb, flags);
671 nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
677 sb->overcommit_wakes += count;
678 } else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
679 nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
680 if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
681 nwake = sb->waiting + sb->parked -
682 sb->overcommit_wakes;
691 * This codepath deserves an explanation: waking the thread
692 * "for real" would overcommit, though userspace KNOWS there
693 * is at least one waiting thread. Such threads are threads
694 * that are "quarantined".
696 * Quarantined threads are woken up one by one, to allow a
697 * slow ramp down, trying to minimize "waiting" <-> "parked"
698 * flip-flops, no matter how many wakes have been asked.
700 * Since releasing one quarantined thread will wake up a
701 * thread that will (almost) straight go to parked mode, lie
702 * to userland about the fact that we unblocked that thread,
705 * Though if we're already waking all waiting threads for
706 * overcommitting jobs, well, we don't need that.
709 nwake = sb->waiting > sb->overcommit_wakes;
712 wake_up_locked(&sb->wqh);
713 pwqr_sb_unlock_irqrestore(sb, flags);
// ioctl dispatcher for the pwqr file descriptor.
//  - PWQR_CTL_GET_CONC returns sb->concurrency.
//  - PWQR_CTL_SET_CONC forwards arg, cast to int, to do_pwqr_set_conc().
//  - The wake commands forward arg, cast to int, to do_pwqr_wake(), with the
//    oc flag set for PWQR_CTL_WAKE_OC.
//  - The remaining commands look up the tracking entry of the calling thread
//    with pwqr_task_find(). PWQR_CTL_UNREGISTER goes to do_pwqr_unregister();
//    otherwise an entry is created and attached when needed, or re-attached
//    when it currently points at a different sb. Two visible call sites then
//    enter do_pwqr_wait(), once with the user argument and once with NULL.
// NOTE(review): the switch statement, several case labels, the test for a
// missing entry and the return of rc are not visible in this excerpt.
718 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
720 struct pwqr_sb *sb = filp->private_data;
721 struct task_struct *task = current;
722 struct pwqr_task *pwqt;
726 case PWQR_CTL_GET_CONC:
727 return sb->concurrency;
728 case PWQR_CTL_SET_CONC:
729 return do_pwqr_set_conc(sb, (int)arg);
732 case PWQR_CTL_WAKE_OC:
733 return do_pwqr_wake(sb, command == PWQR_CTL_WAKE_OC, (int)arg);
737 case PWQR_CTL_REGISTER:
738 case PWQR_CTL_UNREGISTER:
744 pwqt = pwqr_task_find(task);
745 if (command == PWQR_CTL_UNREGISTER)
746 return do_pwqr_unregister(sb, pwqt);
749 pwqt = pwqr_task_create(task);
751 return PTR_ERR(pwqt);
752 pwqr_task_attach(pwqt, sb);
753 } else if (unlikely(pwqt->sb != sb)) {
754 pwqr_task_detach(pwqt, pwqt->sb);
755 pwqr_task_attach(pwqt, sb);
760 rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
763 rc = do_pwqr_wait(sb, pwqt, false, NULL);
// File operations of the pwqr character device. The same handler serves both
// unlocked_ioctl and compat_ioctl; seeking is a no-op.
// NOTE(review): the open, poll and read entries are not visible in this
// excerpt; presumably they point at pwqr_open, pwqr_poll and pwqr_read.
769 static const struct file_operations pwqr_dev_fops = {
770 .owner = THIS_MODULE,
772 .release = pwqr_release,
775 .llseek = noop_llseek,
776 .unlocked_ioctl = pwqr_ioctl,
778 .compat_ioctl = pwqr_ioctl,
782 /*****************************************************************************
// Module init: initialise every bucket of the task hash table, register the
// character device with a dynamically allocated major, create the device
// class and the device node, then log a banner.
// On class creation failure the character device is unregistered and the
// error code of the class is returned.
// NOTE(review): register_chrdev() is given the literal name pwqr while the
// unregister calls use PWQR_DEVICE_NAME; confirm both name the same device.
// NOTE(review): the result of device_create() is not checked.
// NOTE(review): the KERN_INFO banner has no trailing newline.
785 static int __init pwqr_start(void)
790 for (i = 0; i < PWQR_HASH_SIZE; i++) {
791 spin_lock_init(&pwqr_tasks_hash[i].lock);
792 INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
796 /* Register as a character device */
797 pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
798 if (pwqr_major < 0) {
799 printk(KERN_ERR "pwqr: register_chrdev() failed\n");
803 /* Create a device node */
804 pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
805 if (IS_ERR(pwqr_class)) {
806 printk(KERN_ERR "pwqr: Error creating raw class\n");
807 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
808 return PTR_ERR(pwqr_class);
810 device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
811 printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded");
// Module exit: undo pwqr_start() in reverse order -- destroy the device
// node, destroy the class, then unregister the character device.
815 static void __exit pwqr_end(void)
818 device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
819 class_destroy(pwqr_class);
820 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
// Module entry and exit points, followed by the module metadata.
823 module_init(pwqr_start);
824 module_exit(pwqr_end);
826 MODULE_LICENSE("GPL");
827 MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
828 MODULE_DESCRIPTION("PThreads Work Queues Regulator");
831 // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: