2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator, and is part
8 * The Linux Kernel is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License version 2 as published by
10 * the Free Software Foundation.
12 * The Linux Kernel is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
15 * License for more details.
17 * You should have received a copy of the GNU General Public License version 2
18 * along with The Linux Kernel. If not, see <http://www.gnu.org/licenses/>.
21 #include <linux/cdev.h>
22 #include <linux/device.h>
23 #include <linux/file.h>
25 #include <linux/hash.h>
26 #include <linux/init.h>
27 #include <linux/kref.h>
28 #include <linux/module.h>
29 #include <linux/poll.h>
30 #include <linux/sched.h>
31 #include <linux/slab.h>
32 #include <linux/spinlock.h>
33 #include <linux/timer.h>
34 #include <linux/uaccess.h>
35 #include <linux/wait.h>
37 #ifndef CONFIG_PREEMPT_NOTIFIERS
38 # error PWQ module requires CONFIG_PREEMPT_NOTIFIERS
43 #define PWQR_HASH_BITS 5
44 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
46 #define PWQR_UC_DELAY (HZ / 10)
47 #define PWQR_OC_DELAY (HZ / 20)
49 #define PWQR_STATE_NONE 0
50 #define PWQR_STATE_UC 1
51 #define PWQR_STATE_OC 2
52 #define PWQR_STATE_DEAD (-1)
54 struct pwqr_task_bucket {
56 struct hlist_head tasks;
62 struct timer_list timer;
63 wait_queue_head_t wqh;
64 wait_queue_head_t wqh_poll;
72 unsigned overcommit_wakes;
78 struct preempt_notifier notifier;
79 struct hlist_node link;
81 struct task_struct *task;
88 static struct class *pwqr_class;
89 static int pwqr_major;
90 static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE];
91 static struct preempt_ops pwqr_preempt_running_ops;
92 static struct preempt_ops pwqr_preempt_blocked_ops;
93 static struct preempt_ops pwqr_preempt_noop_ops;
95 /*****************************************************************************
99 #define pwqr_sb_lock_irqsave(sb, flags) \
100 spin_lock_irqsave(&(sb)->wqh.lock, flags)
101 #define pwqr_sb_unlock_irqrestore(sb, flags) \
102 spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
104 static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay)
106 if (timer_pending(&sb->timer) && sb->state == how)
108 mod_timer(&sb->timer, jiffies + delay);
112 static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
114 sb->running += running_delta;
116 if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
117 pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY);
118 } else if (sb->running > sb->concurrency) {
119 pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY);
121 sb->state = PWQR_STATE_NONE;
122 if (!timer_pending(&sb->timer))
123 del_timer(&sb->timer);
127 static void pwqr_sb_timer_cb(unsigned long arg)
129 struct pwqr_sb *sb = (struct pwqr_sb *)arg;
132 pwqr_sb_lock_irqsave(sb, flags);
133 if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
134 if (sb->overcommit_wakes == 0)
135 wake_up_locked(&sb->wqh);
137 if (sb->running > sb->concurrency) {
138 wake_up_poll(&sb->wqh_poll, POLLIN);
140 pwqr_sb_unlock_irqrestore(sb, flags);
143 static struct pwqr_sb *pwqr_sb_create(void)
147 sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
149 return ERR_PTR(-ENOMEM);
151 kref_init(&sb->kref);
152 init_waitqueue_head(&sb->wqh);
153 sb->concurrency = num_online_cpus();
154 init_timer(&sb->timer);
155 sb->timer.function = pwqr_sb_timer_cb;
156 sb->timer.data = (unsigned long)sb;
158 __module_get(THIS_MODULE);
161 static inline void pwqr_sb_get(struct pwqr_sb *sb)
166 static void pwqr_sb_finalize(struct rcu_head *rcu)
168 struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);
170 module_put(THIS_MODULE);
174 static void pwqr_sb_release(struct kref *kref)
176 struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
178 del_timer_sync(&sb->timer);
179 wake_up_poll(&sb->wqh_poll, POLLHUP);
180 call_rcu(&sb->rcu, pwqr_sb_finalize);
182 static inline void pwqr_sb_put(struct pwqr_sb *sb)
184 kref_put(&sb->kref, pwqr_sb_release);
187 /*****************************************************************************
190 static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
192 return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
195 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
197 struct pwqr_task_bucket *b = task_hbucket(task);
198 struct hlist_node *node;
199 struct pwqr_task *pwqt = NULL;
202 hlist_for_each_entry(pwqt, node, &b->tasks, link) {
203 if (pwqt->task == task)
206 spin_unlock(&b->lock);
210 static struct pwqr_task *pwqr_task_create(struct task_struct *task)
212 struct pwqr_task_bucket *b = task_hbucket(task);
213 struct pwqr_task *pwqt;
215 pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
217 return ERR_PTR(-ENOMEM);
219 preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
220 preempt_notifier_register(&pwqt->notifier);
224 hlist_add_head(&pwqt->link, &b->tasks);
225 spin_unlock(&b->lock);
231 static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
235 pwqr_sb_lock_irqsave(sb, flags);
237 if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
238 __pwqr_sb_update_state(sb, -1);
240 __pwqr_sb_update_state(sb, 0);
242 pwqr_sb_unlock_irqrestore(sb, flags);
248 static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
252 pwqr_sb_lock_irqsave(sb, flags);
253 pwqr_sb_get(pwqt->sb = sb);
255 __pwqr_sb_update_state(sb, 1);
256 pwqr_sb_unlock_irqrestore(sb, flags);
260 static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
262 struct pwqr_task_bucket *b = task_hbucket(pwqt->task);
265 hlist_del(&pwqt->link);
266 spin_unlock(&b->lock);
267 pwqt->notifier.ops = &pwqr_preempt_noop_ops;
270 /* When called from sched_{out,in}, it's not allowed to
271 * call preempt_notifier_unregister (or worse kfree())
273 * Though it's not a good idea to kfree() still registered
274 * callbacks if we're not dying, it'll panic on the next
275 * sched_{in,out} call.
277 BUG_ON(!(pwqt->task->state & TASK_DEAD));
278 kfree_rcu(pwqt, rcu);
280 preempt_notifier_unregister(&pwqt->notifier);
285 static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
289 static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
290 struct task_struct *next)
294 static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
296 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
297 struct pwqr_sb *sb = pwqt->sb;
300 if (unlikely(sb->state < 0)) {
301 pwqr_task_detach(pwqt, sb);
302 pwqr_task_release(pwqt, true);
306 pwqt->notifier.ops = &pwqr_preempt_running_ops;
307 pwqr_sb_lock_irqsave(sb, flags);
308 __pwqr_sb_update_state(sb, 1);
309 pwqr_sb_unlock_irqrestore(sb, flags);
312 static void pwqr_task_sched_out(struct preempt_notifier *notifier,
313 struct task_struct *next)
315 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
316 struct pwqr_sb *sb = pwqt->sb;
317 struct task_struct *p = pwqt->task;
319 if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) {
320 pwqr_task_detach(pwqt, sb);
321 pwqr_task_release(pwqt, true);
324 if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
327 pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
328 /* see preempt.h: irq are disabled for sched_out */
329 spin_lock(&sb->wqh.lock);
330 __pwqr_sb_update_state(sb, -1);
331 spin_unlock(&sb->wqh.lock);
334 static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
335 .sched_in = pwqr_task_noop_sched_in,
336 .sched_out = pwqr_task_noop_sched_out,
339 static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
340 .sched_in = pwqr_task_noop_sched_in,
341 .sched_out = pwqr_task_sched_out,
344 static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
345 .sched_in = pwqr_task_blocked_sched_in,
346 .sched_out = pwqr_task_sched_out,
349 /*****************************************************************************
352 static int pwqr_open(struct inode *inode, struct file *filp)
356 sb = pwqr_sb_create();
359 filp->private_data = sb;
363 static int pwqr_release(struct inode *inode, struct file *filp)
365 struct pwqr_sb *sb = filp->private_data;
368 pwqr_sb_lock_irqsave(sb, flags);
369 sb->state = PWQR_STATE_DEAD;
370 pwqr_sb_unlock_irqrestore(sb, flags);
371 wake_up_all(&sb->wqh);
376 static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
378 struct pwqr_sb *sb = filp->private_data;
379 unsigned int events = 0;
382 poll_wait(filp, &sb->wqh_poll, wait);
384 pwqr_sb_lock_irqsave(sb, flags);
385 if (sb->running > sb->concurrency)
389 pwqr_sb_unlock_irqrestore(sb, flags);
394 static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
396 DECLARE_WAITQUEUE(wait, current);
397 ssize_t rc = -EAGAIN;
399 spin_lock_irq(&sb->wqh.lock);
400 if (sb->running > sb->concurrency) {
402 } else if (!no_wait) {
403 add_wait_queue(&sb->wqh_poll, &wait);
405 set_current_state(TASK_INTERRUPTIBLE);
406 if (sb->running > sb->concurrency) {
410 if (signal_pending(current)) {
414 spin_unlock_irq(&sb->wqh.lock);
416 spin_lock_irq(&sb->wqh.lock);
418 remove_wait_queue(&sb->wqh_poll, &wait);
419 __set_current_state(TASK_RUNNING);
422 *cnt = sb->running - sb->concurrency;
423 spin_unlock_irq(&sb->wqh.lock);
429 pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
431 struct pwqr_sb *sb = filp->private_data;
435 if (count < sizeof(cnt))
437 rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
440 return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
444 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
445 int is_wait, struct pwqr_ioc_wait __user *arg)
448 struct pwqr_ioc_wait wait;
452 preempt_notifier_unregister(&pwqt->notifier);
455 if (copy_from_user(&wait, arg, sizeof(wait))) {
459 if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
465 pwqr_sb_lock_irqsave(sb, flags);
466 if (sb->running + sb->waiting <= sb->concurrency) {
468 while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
469 pwqr_sb_unlock_irqrestore(sb, flags);
470 rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
473 pwqr_sb_lock_irqsave(sb, flags);
476 if (uval != (u32)wait.pwqr_ticket) {
485 /* @ see <wait_event_interruptible_exclusive_locked_irq> */
486 if (likely(sb->state >= 0)) {
488 __wait.flags |= WQ_FLAG_EXCLUSIVE;
492 __add_wait_queue(&sb->wqh, &__wait);
495 __add_wait_queue_tail(&sb->wqh, &__wait);
497 __pwqr_sb_update_state(sb, -1);
500 set_current_state(TASK_INTERRUPTIBLE);
501 if (sb->overcommit_wakes)
503 if (signal_pending(current)) {
507 spin_unlock_irq(&sb->wqh.lock);
509 spin_lock_irq(&sb->wqh.lock);
512 if (sb->running + sb->waiting < sb->concurrency)
514 } while (likely(sb->state >= 0));
516 __remove_wait_queue(&sb->wqh, &__wait);
517 __set_current_state(TASK_RUNNING);
523 __pwqr_sb_update_state(sb, 1);
525 if (sb->overcommit_wakes)
526 sb->overcommit_wakes--;
527 if (sb->waiting + sb->running > sb->concurrency)
532 if (unlikely(sb->state < 0))
534 pwqr_sb_unlock_irqrestore(sb, flags);
536 preempt_notifier_register(&pwqt->notifier);
540 static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
546 pwqr_task_detach(pwqt, sb);
547 pwqr_task_release(pwqt, false);
551 static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
553 long old_conc = sb->concurrency;
556 pwqr_sb_lock_irqsave(sb, flags);
558 conc = num_online_cpus();
559 if (conc != old_conc) {
560 sb->concurrency = conc;
561 __pwqr_sb_update_state(sb, 0);
563 pwqr_sb_unlock_irqrestore(sb, flags);
568 static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
576 pwqr_sb_lock_irqsave(sb, flags);
579 nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
585 sb->overcommit_wakes += count;
586 } else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
587 nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
588 if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
589 nwake = sb->waiting + sb->parked -
590 sb->overcommit_wakes;
599 * This codepath deserves an explanation: waking the thread
600 * "for real" would overcommit, though userspace KNOWS there
601 * is at least one waiting thread. Such threads are threads
602 * that are "quarantined".
604 * Quarantined threads are woken up one by one, to allow a
605 * slow ramp down, trying to minimize "waiting" <-> "parked"
606 * flip-flops, no matter how many wakes have been asked.
608 * Since releasing one quarantined thread will wake up a
609 * thread that will (almost) straight go to parked mode, lie
610 * to userland about the fact that we unblocked that thread,
613 * Though if we're already waking all waiting threads for
614 * overcommitting jobs, well, we don't need that.
617 nwake = sb->waiting > sb->overcommit_wakes;
620 wake_up_locked(&sb->wqh);
621 pwqr_sb_unlock_irqrestore(sb, flags);
626 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
628 struct pwqr_sb *sb = filp->private_data;
629 struct task_struct *task = current;
630 struct pwqr_task *pwqt;
635 return sb->concurrency;
637 return do_pwqr_set_conc(sb, (int)arg);
641 return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg);
646 case PWQR_UNREGISTER:
652 pwqt = pwqr_task_find(task);
653 if (command == PWQR_UNREGISTER)
654 return do_pwqr_unregister(sb, pwqt);
657 pwqt = pwqr_task_create(task);
659 return PTR_ERR(pwqt);
660 pwqr_task_attach(pwqt, sb);
661 } else if (unlikely(pwqt->sb != sb)) {
662 pwqr_task_detach(pwqt, pwqt->sb);
663 pwqr_task_attach(pwqt, sb);
668 rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
671 rc = do_pwqr_wait(sb, pwqt, false, NULL);
675 if (unlikely(sb->state < 0)) {
676 pwqr_task_detach(pwqt, pwqt->sb);
682 static const struct file_operations pwqr_dev_fops = {
683 .owner = THIS_MODULE,
685 .release = pwqr_release,
688 .llseek = noop_llseek,
689 .unlocked_ioctl = pwqr_ioctl,
691 .compat_ioctl = pwqr_ioctl,
695 /*****************************************************************************
698 static int __init pwqr_start(void)
702 for (i = 0; i < PWQR_HASH_SIZE; i++) {
703 spin_lock_init(&pwqr_tasks_hash[i].lock);
704 INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
707 /* Register as a character device */
708 pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
709 if (pwqr_major < 0) {
710 printk(KERN_ERR "pwqr: register_chrdev() failed\n");
714 /* Create a device node */
715 pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
716 if (IS_ERR(pwqr_class)) {
717 printk(KERN_ERR "pwqr: Error creating raw class\n");
718 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
719 return PTR_ERR(pwqr_class);
721 device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
722 printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded");
726 static void __exit pwqr_end(void)
729 device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
730 class_destroy(pwqr_class);
731 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
734 module_init(pwqr_start);
735 module_exit(pwqr_end);
737 MODULE_LICENSE("GPL");
738 MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
739 MODULE_DESCRIPTION("PThreads Work Queues Regulator");
741 // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: