2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator, and is part
8 * The Linux Kernel is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License version 2 as published by
10 * the Free Software Foundation.
12 * The Linux Kernel is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
15 * License for more details.
17 * You should have received a copy of the GNU General Public License version 2
18 * along with The Linux Kernel. If not, see <http://www.gnu.org/licenses/>.
21 #include <linux/cdev.h>
22 #include <linux/device.h>
23 #include <linux/file.h>
25 #include <linux/hash.h>
26 #include <linux/init.h>
27 #include <linux/kref.h>
28 #include <linux/module.h>
29 #include <linux/sched.h>
30 #include <linux/slab.h>
31 #include <linux/spinlock.h>
32 #include <linux/timer.h>
33 #include <linux/uaccess.h>
34 #include <linux/wait.h>
36 #ifndef CONFIG_PREEMPT_NOTIFIERS
37 # error PWQ module requires CONFIG_PREEMPT_NOTIFIERS
42 #define PWQR_UNPARK_DELAY (HZ / 10)
43 #define PWQR_HASH_BITS 5
44 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
/* One hash bucket of the global registered-task table.
 * NOTE(review): chunk is truncated - the bucket's lock member (used as
 * spin_lock(&b->lock) below) and the closing brace are not visible here. */
46 struct pwqr_task_bucket {
48 	struct hlist_head tasks;
/* Fields below belong to struct pwqr_sb (header not visible in this chunk):
 * the per-fd shared state. The timer drives delayed unparking (see
 * pwqr_sb_timer_cb); wqh is the wait queue blocked workers sleep on, and
 * its internal lock doubles as the sb lock (see pwqr_sb_lock_irqsave). */
54 	struct timer_list timer;
55 	wait_queue_head_t wqh;
/* Number of wakeups granted beyond the concurrency target (see do_pwqr_wake). */
64 	unsigned overcommit_wakes;
/* Fields below belong to struct pwqr_task: one instance per registered
 * thread; 'notifier' hooks scheduler events, 'link' chains into a
 * pwqr_task_bucket, 'task' is the owning thread. */
70 	struct preempt_notifier notifier;
71 	struct hlist_node link;
73 	struct task_struct *task;
/* File-scope module state: chardev identity, the task hash table, and the
 * three preempt_ops vtables a task's notifier switches between. */
80 static struct class *pwqr_class;
81 static int pwqr_major;
82 static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE];
83 static struct preempt_ops pwqr_preempt_running_ops;
84 static struct preempt_ops pwqr_preempt_blocked_ops;
85 static struct preempt_ops pwqr_preempt_noop_ops;
87 /*****************************************************************************
/* The sb has no dedicated spinlock: these macros reuse the internal lock of
 * its wait_queue_head (sb->wqh.lock), so waitqueue manipulation and state
 * updates are serialized by the same lock. */
91 #define pwqr_sb_lock_irqsave(sb, flags) \
92 	spin_lock_irqsave(&(sb)->wqh.lock, flags)
93 #define pwqr_sb_unlock_irqrestore(sb, flags) \
94 	spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
/* Apply a delta to sb->running and react to the new state.
 * Caller must hold the sb lock (wqh.lock).
 * When under-committed with no waiters but parked threads, arm the unpark
 * timer (PWQR_UNPARK_DELAY) so parked threads are released gradually;
 * otherwise any pending timer is cancelled.
 * NOTE(review): several branch bodies and the else-arm structure are not
 * visible in this truncated chunk. */
96 static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
98 	sb->running += running_delta;
99 	if (sb->running > sb->concurrency) {
100 		/* TODO see ../Documentation/pwqr.adoc */
101 	} else if (sb->running == sb->concurrency) {
103 	} else if (sb->waiting == 0 && sb->parked) {
104 		if (!timer_pending(&sb->timer)) {
			/* defer unparking to smooth running-count flip-flops */
105 			mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY);
110 	if (timer_pending(&sb->timer))
111 		del_timer(&sb->timer);
/* Unpark timer callback: if the sb is still under-committed, has parked
 * threads and no plain waiters, wake one exclusive sleeper on sb->wqh —
 * unless an overcommit wake is already in flight. */
114 static void pwqr_sb_timer_cb(unsigned long arg)
116 	struct pwqr_sb *sb = (struct pwqr_sb *)arg;
119 	pwqr_sb_lock_irqsave(sb, flags);
120 	if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) {
121 		if (sb->overcommit_wakes == 0)
122 			wake_up_locked(&sb->wqh);
124 	pwqr_sb_unlock_irqrestore(sb, flags);
/* Allocate and initialize a pwqr_sb for the opening process.
 * The sb is bound to current->tgid (enforced later in pwqr_ioctl) and its
 * concurrency target defaults to the number of online CPUs.
 * Takes a module reference so the module outlives live sbs.
 * Returns ERR_PTR(-ENOMEM) on allocation failure. */
127 static struct pwqr_sb *pwqr_sb_create(void)
131 	sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
133 		return ERR_PTR(-ENOMEM);
135 	kref_init(&sb->kref);
136 	init_waitqueue_head(&sb->wqh);
137 	sb->tgid = current->tgid;
138 	sb->concurrency = num_online_cpus();
139 	init_timer(&sb->timer);
140 	sb->timer.function = pwqr_sb_timer_cb;
141 	sb->timer.data = (unsigned long)sb;
143 	__module_get(THIS_MODULE);
/* Take a reference on the sb (body not visible in this chunk; presumably
 * kref_get(&sb->kref) - TODO confirm). */
146 static inline void pwqr_sb_get(struct pwqr_sb *sb)
/* RCU callback finishing sb destruction: drops the module reference taken
 * in pwqr_sb_create (the kfree of sb itself is not visible here). */
151 static void pwqr_sb_finalize(struct rcu_head *rcu)
153 	struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);
155 	module_put(THIS_MODULE);
/* kref release: stop the unpark timer synchronously, then defer the final
 * teardown past an RCU grace period (sched hooks may still dereference sb). */
159 static void pwqr_sb_release(struct kref *kref)
161 	struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
163 	del_timer_sync(&sb->timer);
164 	call_rcu(&sb->rcu, pwqr_sb_finalize);
/* Drop a reference; destroys the sb when it was the last one. */
166 static inline void pwqr_sb_put(struct pwqr_sb *sb)
168 	kref_put(&sb->kref, pwqr_sb_release);
171 /*****************************************************************************
/* Map a task pointer to its hash bucket (hash_ptr over PWQR_HASH_BITS). */
174 static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
176 	return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
/* Look up the pwqr_task registered for @task, or NULL if none.
 * NOTE(review): the matching spin_lock(&b->lock) and the loop's break/return
 * are outside this truncated view; only the unlock is visible. */
179 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
181 	struct pwqr_task_bucket *b = task_hbucket(task);
182 	struct hlist_node *node;
183 	struct pwqr_task *pwqt = NULL;
186 	hlist_for_each_entry(pwqt, node, &b->tasks, link) {
187 		if (pwqt->task == task)
190 	spin_unlock(&b->lock);
/* Allocate a pwqr_task for @task, register its preempt notifier with the
 * "running" ops, and insert it into the task hash.
 * Returns ERR_PTR(-ENOMEM) on allocation failure.
 * NOTE(review): the NULL check guarding line 201 and the function's return
 * are not visible in this truncated chunk. */
194 static struct pwqr_task *pwqr_task_create(struct task_struct *task)
196 	struct pwqr_task_bucket *b = task_hbucket(task);
197 	struct pwqr_task *pwqt;
199 	pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
201 		return ERR_PTR(-ENOMEM);
203 	preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
204 	preempt_notifier_register(&pwqt->notifier);
208 	hlist_add_head(&pwqt->link, &b->tasks);
209 	spin_unlock(&b->lock);
/* Detach @pwqt from @sb: if the task was counted as running (its notifier
 * still uses the "running" ops) decrement sb->running, otherwise just
 * re-evaluate the state; the sb reference drop is not visible here. */
215 static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
219 	pwqr_sb_lock_irqsave(sb, flags);
221 	if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
222 		__pwqr_sb_update_state(sb, -1);
224 		__pwqr_sb_update_state(sb, 0);
226 	pwqr_sb_unlock_irqrestore(sb, flags);
/* Attach @pwqt to @sb: take an sb reference, record the binding, and count
 * the (currently running) task into sb->running. */
232 static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
236 	pwqr_sb_lock_irqsave(sb, flags);
237 	pwqr_sb_get(pwqt->sb = sb);
239 	__pwqr_sb_update_state(sb, 1);
240 	pwqr_sb_unlock_irqrestore(sb, flags);
/* Unlink @pwqt from the task hash and dispose of it.
 * @from_notifier: true when called from a sched_{in,out} hook, where
 * preempt_notifier_unregister/kfree are not allowed; in that case the ops
 * are swapped to no-ops and the memory is freed via RCU, which is only
 * legal because the task is dying (BUG_ON below). */
244 static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
246 	struct pwqr_task_bucket *b = task_hbucket(pwqt->task);
249 	hlist_del(&pwqt->link);
250 	spin_unlock(&b->lock);
251 	pwqt->notifier.ops = &pwqr_preempt_noop_ops;
254 	/* When called from sched_{out,in}, it's not allowed to
255 	 * call preempt_notifier_unregister (or worse kfree())
257 	 * Though it's not a good idea to kfree() still registered
258 	 * callbacks if we're not dying, it'll panic on the next
259 	 * sched_{in,out} call.
261 	BUG_ON(!(pwqt->task->state & TASK_DEAD));
262 	kfree_rcu(pwqt, rcu);
	/* Normal path (not from a notifier): full unregister is safe. */
264 	preempt_notifier_unregister(&pwqt->notifier);
/* No-op scheduler hooks, installed once a task is being torn down so late
 * sched events do nothing. */
269 static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
273 static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
274 				     struct task_struct *next)
/* Called when a previously-blocked registered task is scheduled back in:
 * if the sb is dead, detach and release the task; otherwise flip back to
 * the "running" ops and count it into sb->running. */
278 static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
280 	struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
281 	struct pwqr_sb *sb = pwqt->sb;
284 	if (unlikely(sb->dead)) {
285 		pwqr_task_detach(pwqt, sb);
286 		pwqr_task_release(pwqt, true);
290 	pwqt->notifier.ops = &pwqr_preempt_running_ops;
291 	pwqr_sb_lock_irqsave(sb, flags);
292 	__pwqr_sb_update_state(sb, 1);
293 	pwqr_sb_unlock_irqrestore(sb, flags);
/* Called when a registered running task is scheduled out.
 * Dead task or dead sb: detach + release (from_notifier=true).
 * Preemption while runnable (state 0), stopped or traced: not "blocking",
 * so the running count is left alone.
 * Otherwise the task is genuinely blocking: switch to the "blocked" ops
 * and decrement sb->running. IRQs are already off here (see comment), so
 * plain spin_lock on the wqh lock is used instead of the irqsave macro. */
296 static void pwqr_task_sched_out(struct preempt_notifier *notifier,
297 				struct task_struct *next)
299 	struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
300 	struct pwqr_sb *sb = pwqt->sb;
301 	struct task_struct *p = pwqt->task;
303 	if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
304 		pwqr_task_detach(pwqt, sb);
305 		pwqr_task_release(pwqt, true);
308 	if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
311 	pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
312 	/* see preempt.h: irq are disabled for sched_out */
313 	spin_lock(&sb->wqh.lock);
314 	__pwqr_sb_update_state(sb, -1);
315 	spin_unlock(&sb->wqh.lock);
/* The three notifier vtables a pwqr_task cycles through:
 *  - noop:    task being torn down, ignore sched events;
 *  - running: counted in sb->running, watch for blocking on sched_out;
 *  - blocked: decremented from sb->running, re-count on sched_in. */
318 static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
319 	.sched_in	= pwqr_task_noop_sched_in,
320 	.sched_out	= pwqr_task_noop_sched_out,
323 static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
324 	.sched_in	= pwqr_task_noop_sched_in,
325 	.sched_out	= pwqr_task_sched_out,
328 static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
329 	.sched_in	= pwqr_task_blocked_sched_in,
330 	.sched_out	= pwqr_task_sched_out,
333 /*****************************************************************************
/* open(2) on the pwqr device: create a fresh sb and stash it in the file.
 * NOTE(review): the IS_ERR check on sb is not visible in this chunk. */
336 static int pwqr_open(struct inode *inode, struct file *filp)
340 	sb = pwqr_sb_create();
343 	filp->private_data = sb;
/* Last close: mark the sb dead (the marking line itself is truncated out)
 * under the sb lock, then wake every sleeper so they can observe sb->dead
 * and bail out; the final pwqr_sb_put is not visible here. */
347 static int pwqr_release(struct inode *inode, struct file *filp)
349 	struct pwqr_sb *sb = filp->private_data;
352 	pwqr_sb_lock_irqsave(sb, flags);
354 	pwqr_sb_unlock_irqrestore(sb, flags);
355 	wake_up_all(&sb->wqh);
/* Core of PWQR_WAIT / PWQR_PARK: block the calling worker until it should
 * run again.
 * @is_wait: true for a WAIT (futex-like, with a userspace ticket to detect
 *           races), false for a PARK.
 * The task's preempt notifier is unregistered for the duration of the sleep
 * so the voluntary block is not double-counted by the sched hooks.
 * NOTE(review): this function is the most truncated in the chunk - error
 * paths, the out labels and several conditions are not visible; comments
 * below only describe what the visible lines establish. */
361 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
362 	     int is_wait, struct pwqr_ioc_wait __user *arg)
365 	struct pwqr_ioc_wait wait;
369 	preempt_notifier_unregister(&pwqt->notifier);
	/* WAIT only: fetch the {uaddr, ticket} pair from userspace. */
372 		if (copy_from_user(&wait, arg, sizeof(wait))) {
	/* uaddr must be int-aligned for the user-memory probe below. */
376 		if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
382 	pwqr_sb_lock_irqsave(sb, flags);
383 	if (sb->running + sb->waiting <= sb->concurrency) {
		/* Fault the user word in (probe can fail on a paged-out
		 * address): drop the lock, read via get_user, retry. */
385 		while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
386 			pwqr_sb_unlock_irqrestore(sb, flags);
387 			rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
390 			pwqr_sb_lock_irqsave(sb, flags);
		/* Ticket mismatch: userspace state moved on, don't sleep. */
393 		if (uval != (u32)wait.pwqr_ticket) {
402 	/* @ see <wait_event_interruptible_exclusive_locked_irq> */
403 	if (likely(!sb->dead)) {
406 		__wait.flags |= WQ_FLAG_EXCLUSIVE;
		/* Queue position differs by mode: head vs tail. */
410 			__add_wait_queue(&sb->wqh, &__wait);
413 			__add_wait_queue_tail(&sb->wqh, &__wait);
415 		__pwqr_sb_update_state(sb, -1);
416 		set_current_state(TASK_INTERRUPTIBLE);
		/* Sleep loop: woken by overcommit wakes, signals, or the
		 * concurrency level dropping below target; aborts if dead. */
419 			if (sb->overcommit_wakes)
421 			if (signal_pending(current)) {
425 			spin_unlock_irq(&sb->wqh.lock);
427 			spin_lock_irq(&sb->wqh.lock);
430 			if (sb->running + sb->waiting < sb->concurrency)
432 		} while (likely(!sb->dead));
434 		__remove_wait_queue(&sb->wqh, &__wait);
435 		__set_current_state(TASK_RUNNING);
		/* Woken: count ourselves running again and consume the
		 * overcommit token that (possibly) woke us. */
442 		__pwqr_sb_update_state(sb, 1);
443 		if (sb->overcommit_wakes)
444 			sb->overcommit_wakes--;
445 		if (sb->waiting + sb->running > sb->concurrency)
450 	if (unlikely(sb->dead))
452 	pwqr_sb_unlock_irqrestore(sb, flags);
	/* Re-hook the scheduler notifier before returning to userspace. */
454 	preempt_notifier_register(&pwqt->notifier);
/* PWQR_UNREGISTER: detach the calling task from the sb and free its
 * pwqr_task (not from a notifier context, so full release is allowed).
 * NOTE(review): the pwqt validity check and return are truncated out. */
458 static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
464 	pwqr_task_detach(pwqt, sb);
465 	pwqr_task_release(pwqt, false);
/* PWQR_SET_CONC: set the concurrency target; a non-positive value (the
 * truncated condition presumably - TODO confirm) resets it to the online
 * CPU count. Returns the old value via old_conc (return truncated out). */
469 static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
471 	long old_conc = sb->concurrency;
474 	pwqr_sb_lock_irqsave(sb, flags);
476 		conc = num_online_cpus();
477 	if (conc != old_conc) {
478 		sb->concurrency = conc;
		/* delta 0: just re-evaluate timers/wakeups for the new target */
479 		__pwqr_sb_update_state(sb, 0);
481 	pwqr_sb_unlock_irqrestore(sb, flags);
/* PWQR_WAKE / PWQR_WAKE_OC: wake up to @count sleeping workers.
 * @oc: overcommit mode - wake regardless of the concurrency target and
 * account the wakes in sb->overcommit_wakes; otherwise cap the number of
 * wakes so running stays within sb->concurrency.
 * Returns (per the quarantine comment) a possibly white-lied count of
 * threads woken. NOTE(review): loop structure and return are truncated. */
486 static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
494 	pwqr_sb_lock_irqsave(sb, flags);
		/* Overcommit path: wakeable = all sleepers not already
		 * claimed by a previous overcommit wake. */
497 		nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
503 		sb->overcommit_wakes += count;
504 	} else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
		/* Normal path: wake only up to the free concurrency slots,
		 * clamped to the number of actual sleepers. */
505 		nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
506 		if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
507 			nwake = sb->waiting + sb->parked -
508 				sb->overcommit_wakes;
517 		 * This codepath deserves an explanation: waking the thread
518 		 * "for real" would overcommit, though userspace KNOWS there
519 		 * is at least one waiting thread. Such threads are threads
520 		 * that are "quarantined".
522 		 * Quarantined threads are woken up one by one, to allow a
523 		 * slow ramp down, trying to minimize "waiting" <-> "parked"
524 		 * flip-flops, no matter how many wakes have been asked.
526 		 * Since releasing one quarantined thread will wake up a
527 		 * thread that will (almost) straight go to parked mode, lie
528 		 * to userland about the fact that we unblocked that thread,
531 		 * Though if we're already waking all waiting threads for
532 		 * overcommitting jobs, well, we don't need that.
535 		nwake = sb->waiting > sb->overcommit_wakes;
538 		wake_up_locked(&sb->wqh);
539 	pwqr_sb_unlock_irqrestore(sb, flags);
/* ioctl dispatcher for the pwqr device.
 * Only threads of the process that opened the fd may use it (tgid check).
 * GET/SET_CONC and WAKE* need no per-task state; REGISTER/UNREGISTER/WAIT/
 * PARK operate on the calling thread's pwqr_task, creating or re-binding
 * it as needed. NOTE(review): the case labels for most commands and several
 * error paths are truncated out of this view. */
544 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
546 	struct pwqr_sb *sb = filp->private_data;
547 	struct task_struct *task = current;
548 	struct pwqr_task *pwqt;
551 	if (sb->tgid != current->tgid)
556 		return sb->concurrency;
558 		return do_pwqr_set_conc(sb, (int)arg);
562 		return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg);
567 	case PWQR_UNREGISTER:
573 	pwqt = pwqr_task_find(task);
574 		if (command == PWQR_UNREGISTER)
575 			return do_pwqr_unregister(sb, pwqt);
		/* Not yet registered: create and attach to this sb. */
578 		pwqt = pwqr_task_create(task);
580 			return PTR_ERR(pwqt);
581 		pwqr_task_attach(pwqt, sb);
	/* Registered against another sb (another fd): re-bind to this one. */
582 	} else if (unlikely(pwqt->sb != sb)) {
583 		pwqr_task_detach(pwqt, pwqt->sb);
584 		pwqr_task_attach(pwqt, sb);
589 		rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
592 		rc = do_pwqr_wait(sb, pwqt, false, NULL);
	/* sb died while we slept: drop our binding before returning. */
596 	if (unlikely(sb->dead)) {
597 		pwqr_task_detach(pwqt, pwqt->sb);
/* Character-device entry points; the same ioctl handler serves native and
 * compat callers. NOTE(review): the .open initializer is truncated out. */
603 static const struct file_operations pwqr_dev_fops = {
604 	.owner		= THIS_MODULE,
606 	.release	= pwqr_release,
607 	.unlocked_ioctl	= pwqr_ioctl,
609 	.compat_ioctl	= pwqr_ioctl,
613 /*****************************************************************************
/* Module init: set up the task hash buckets, register the chardev (dynamic
 * major), create the device class and node.
 * NOTE(review): the error return after register_chrdev failure is truncated;
 * device_create's return value is not checked in the visible code. */
616 static int __init pwqr_start(void)
620 	for (i = 0; i < PWQR_HASH_SIZE; i++) {
621 		spin_lock_init(&pwqr_tasks_hash[i].lock);
622 		INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
625 	/* Register as a character device */
626 	pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
627 	if (pwqr_major < 0) {
628 		printk(KERN_ERR "pwqr: register_chrdev() failed\n");
632 	/* Create a device node */
633 	pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
634 	if (IS_ERR(pwqr_class)) {
635 		printk(KERN_ERR "pwqr: Error creating raw class\n");
		/* undo chardev registration before failing */
636 		unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
637 		return PTR_ERR(pwqr_class);
639 	device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
640 	printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded");
/* Module exit: tear down the device node, class and chardev in reverse
 * order of creation. */
644 static void __exit pwqr_end(void)
647 	device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
648 	class_destroy(pwqr_class);
649 	unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
652 module_init(pwqr_start);
653 module_exit(pwqr_end);
655 MODULE_LICENSE("GPL");
656 MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
657 MODULE_DESCRIPTION("PThreads Work Queues Regulator");
659 // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: