2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator, and is part
8 * The Linux Kernel is free software: you can redistribute it and/or modify it
9 * under the terms of the GNU General Public License version 2 as published by
10 * the Free Software Foundation.
12 * The Linux Kernel is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 * License for more details.
17 * You should have received a copy of the GNU General Public License version 2
18 * along with The Linux Kernel. If not, see <http://www.gnu.org/licenses/>.
21 #include <linux/cdev.h>
22 #include <linux/device.h>
23 #include <linux/file.h>
25 #include <linux/hash.h>
26 #include <linux/init.h>
27 #include <linux/kref.h>
28 #include <linux/module.h>
29 #include <linux/sched.h>
30 #include <linux/slab.h>
31 #include <linux/spinlock.h>
32 #include <linux/uaccess.h>
33 #include <linux/wait.h>
35 #ifndef CONFIG_PREEMPT_NOTIFIERS
/* The regulator tracks when registered threads block/resume via
 * sched_in/sched_out preempt-notifier hooks, so this kernel feature is
 * a hard build-time requirement. */
36 # error PWQ module requires CONFIG_PREEMPT_NOTIFIERS
/* Global registered-task hash: 2^PWQR_HASH_BITS = 32 buckets, keyed by
 * task_struct pointer (see task_hbucket() below). */
41 #define PWQR_HASH_BITS 5
42 #define PWQR_HASH_SIZE (1 << PWQR_HASH_BITS)
/* One bucket of the global registered-task hash.
 * NOTE(review): this extraction is missing lines here; the spinlock
 * member used as b->lock in pwqr_task_find/create/release is not
 * visible -- confirm against the full source. */
44 struct pwqr_task_bucket {
46 struct hlist_head tasks;
/* Visible members of the per-open-file state (struct pwqr_sb).
 * NOTE(review): the struct header and most members referenced in this
 * file (kref, rcu, tgid, running, waiting, quarantined, parked,
 * concurrency, dead) are missing from this extraction -- confirm
 * against the full source. */
/* wait queue for blocked/parked workers; its internal spinlock doubles
 * as the sb lock (see pwqr_sb_lock_irqsave below). */
52 wait_queue_head_t wqh;
/* wakeups deliberately granted beyond the concurrency target */
62 unsigned overcommit_wakes;
/* Visible members of the per-registered-thread state (struct pwqr_task).
 * NOTE(review): the struct header and some members (sb back-pointer,
 * rcu head) are missing from this extraction. */
/* scheduler hook; its .ops pointer also encodes the thread's state
 * (running / blocked / noop), swapped in place by the callbacks */
68 struct preempt_notifier notifier;
/* chaining inside a pwqr_task_bucket */
69 struct hlist_node link;
/* the thread this entry regulates */
71 struct task_struct *task;
/* Char-device plumbing and module-wide state. */
78 static struct class *pwqr_class;
79 static int pwqr_major;
/* hash of all registered threads, shared across every pwqr fd */
80 static struct pwqr_task_bucket pwqr_tasks_hash[PWQR_HASH_SIZE];
/* forward declarations; the three tables are defined after the callbacks */
81 static struct preempt_ops pwqr_preempt_running_ops;
82 static struct preempt_ops pwqr_preempt_blocked_ops;
83 static struct preempt_ops pwqr_preempt_noop_ops;
85 /*****************************************************************************
/* The sb "lock" is the wait-queue head's own spinlock: holding it lets
 * the code below use the *_locked wait-queue primitives
 * (wake_up_locked, manual __add_wait_queue) without a second lock. */
89 #define pwqr_sb_lock_irqsave(sb, flags) \
90 spin_lock_irqsave(&(sb)->wqh.lock, flags)
91 #define pwqr_sb_unlock_irqrestore(sb, flags) \
92 spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
/* Rebalance the sb accounting after @running_delta threads changed
 * running state.  Caller must hold sb->wqh.lock.
 *
 * overcommit > 0: more runnable+waiting threads than the concurrency
 * target -- move up to `overcommit` waiters into quarantine so they
 * park instead of picking up work.
 * overcommit < 0 (undercommit): capacity is available again -- move
 * quarantined threads back to waiting; if nothing is waiting but some
 * threads are parked, wake one up.
 *
 * NOTE(review): declarations, else-branch heads and closing braces are
 * missing from this extraction; the summary above is inferred from the
 * visible arithmetic -- confirm on the full source. */
94 static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
98 sb->running += running_delta;
99 overcommit = sb->running + sb->waiting - sb->concurrency;
103 if (overcommit > 0) {
/* more excess than waiters: quarantine every waiter */
104 if (overcommit > sb->waiting) {
105 sb->quarantined += sb->waiting;
/* otherwise quarantine exactly the excess */
108 sb->quarantined += overcommit;
109 sb->waiting -= overcommit;
112 unsigned undercommit = -overcommit;
/* release part (or all) of the quarantine back to waiting */
114 if (undercommit < sb->quarantined) {
115 sb->waiting += undercommit;
116 sb->quarantined -= undercommit;
117 } else if (sb->quarantined) {
118 sb->waiting += sb->quarantined;
/* nothing waiting at all: recruit a parked thread */
120 } else if (sb->waiting == 0 && sb->parked) {
121 wake_up_locked(&sb->wqh);
/* Allocate and initialise a new switchboard (one per open of the pwqr
 * device).  Returns ERR_PTR(-ENOMEM) on allocation failure.  Pins the
 * module for the sb's lifetime (released in pwqr_sb_finalize).
 * NOTE(review): the final `return sb;` and some declarations are
 * missing from this extraction. */
126 static struct pwqr_sb *pwqr_sb_create(void)
130 sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
132 return ERR_PTR(-ENOMEM);
134 kref_init(&sb->kref);
135 init_waitqueue_head(&sb->wqh);
/* only threads of the creating process may use this sb (checked in
 * pwqr_ioctl) */
136 sb->tgid = current->tgid;
/* default concurrency target: one worker per online CPU */
137 sb->concurrency = num_online_cpus();
139 __module_get(THIS_MODULE);
/* Take a reference on @sb.  NOTE(review): the body is missing from this
 * extraction -- presumably kref_get(&sb->kref); confirm. */
142 static inline void pwqr_sb_get(struct pwqr_sb *sb)
/* RCU callback: last stage of sb destruction; drops the module pin
 * taken in pwqr_sb_create.  NOTE(review): the kfree of @sb is not
 * visible in this extraction. */
147 static void pwqr_sb_finalize(struct rcu_head *rcu)
149 struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);
151 module_put(THIS_MODULE);
/* kref release callback: defer the actual teardown to an RCU grace
 * period so that lock-free readers of the sb (if any) can drain. */
155 static void pwqr_sb_release(struct kref *kref)
157 struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
159 call_rcu(&sb->rcu, pwqr_sb_finalize);
/* Drop a reference; destroys via pwqr_sb_release when it hits zero. */
161 static inline void pwqr_sb_put(struct pwqr_sb *sb)
163 kref_put(&sb->kref, pwqr_sb_release);
166 /*****************************************************************************
/* Map a task to its hash bucket by hashing the task_struct pointer. */
169 static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
171 return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
/* Look @task up in the global hash; NULL when not registered.
 * NOTE(review): the matching spin_lock(&b->lock), the loop break and
 * the return are missing from this extraction -- only the unlock is
 * visible. */
174 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
176 struct pwqr_task_bucket *b = task_hbucket(task);
177 struct hlist_node *node;
178 struct pwqr_task *pwqt = NULL;
181 hlist_for_each_entry(pwqt, node, &b->tasks, link) {
182 if (pwqt->task == task)
185 spin_unlock(&b->lock);
/* Register @task: allocate its pwqr_task, hook the preempt notifier
 * (initial state: "running" ops), and insert it in the hash.  Returns
 * ERR_PTR(-ENOMEM) on allocation failure.
 * NOTE(review): the matching spin_lock(&b->lock), the pwqt->task
 * assignment and the return are missing from this extraction. */
189 static struct pwqr_task *pwqr_task_create(struct task_struct *task)
191 struct pwqr_task_bucket *b = task_hbucket(task);
192 struct pwqr_task *pwqt;
194 pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
196 return ERR_PTR(-ENOMEM);
198 preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
199 preempt_notifier_register(&pwqt->notifier);
203 hlist_add_head(&pwqt->link, &b->tasks);
204 spin_unlock(&b->lock);
/* Detach @pwqt from @sb: if the thread is currently accounted as
 * running, drop it from the running count; either way rebalance.
 * NOTE(review): the sb reference drop and pwqt->sb reset are not
 * visible in this extraction. */
210 static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
214 pwqr_sb_lock_irqsave(sb, flags);
/* only threads on the "running" ops contribute to sb->running */
216 if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
217 __pwqr_sb_update_state(sb, -1);
219 __pwqr_sb_update_state(sb, 0);
221 pwqr_sb_unlock_irqrestore(sb, flags);
/* Attach @pwqt to @sb: store the back-pointer, take an sb reference,
 * and account the (currently running) thread in the sb state. */
227 static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
231 pwqr_sb_lock_irqsave(sb, flags);
232 pwqr_sb_get(pwqt->sb = sb);
234 __pwqr_sb_update_state(sb, 1);
235 pwqr_sb_unlock_irqrestore(sb, flags);
/* Tear down @pwqt: unlink it from the hash, neuter its notifier, then
 * free it.  @from_notifier distinguishes the two legal call sites:
 * from inside a sched_{in,out} callback, preempt_notifier_unregister()
 * cannot be called, so the task must be dying (TASK_DEAD) and memory
 * is reclaimed via kfree_rcu; from the ioctl path the notifier is
 * unregistered normally.  NOTE(review): the if/else structure and the
 * non-notifier kfree are partly missing from this extraction. */
239 static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
241 struct pwqr_task_bucket *b = task_hbucket(pwqt->task);
244 hlist_del(&pwqt->link);
245 spin_unlock(&b->lock);
/* noop ops: a still-registered notifier becomes harmless */
246 pwqt->notifier.ops = &pwqr_preempt_noop_ops;
249 /* When called from sched_{out,in}, it's not allowed to
250 * call preempt_notifier_unregister (or worse kfree())
252 * Though it's not a good idea to kfree() still registered
253 * callbacks if we're not dying, it'll panic on the next
254 * sched_{in,out} call.
256 BUG_ON(!(pwqt->task->state & TASK_DEAD));
257 kfree_rcu(pwqt, rcu);
259 preempt_notifier_unregister(&pwqt->notifier);
/* Inert callbacks for released-but-still-registered notifiers. */
264 static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
268 static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
269 struct task_struct *next)
/* A previously-blocked regulated thread is scheduled back in: either
 * clean up (sb already dead) or flip back to the "running" ops and
 * account one more running thread.  NOTE(review): the early return
 * after the dead-sb branch and the flags declaration are missing from
 * this extraction. */
273 static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
275 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
276 struct pwqr_sb *sb = pwqt->sb;
279 if (unlikely(sb->dead)) {
280 pwqr_task_detach(pwqt, sb);
281 pwqr_task_release(pwqt, true);
285 pwqt->notifier.ops = &pwqr_preempt_running_ops;
286 pwqr_sb_lock_irqsave(sb, flags);
287 __pwqr_sb_update_state(sb, 1);
288 pwqr_sb_unlock_irqrestore(sb, flags);
/* A regulated thread is scheduled out.  Dying task or dead sb: tear
 * the registration down.  Still-runnable preemption or stop/trace: not
 * a real block, keep the "running" accounting.  A true block: flip to
 * the "blocked" ops and drop one from sb->running. */
291 static void pwqr_task_sched_out(struct preempt_notifier *notifier,
292 struct task_struct *next)
294 struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
295 struct pwqr_sb *sb = pwqt->sb;
296 struct task_struct *p = pwqt->task;
298 if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
299 pwqr_task_detach(pwqt, sb);
300 pwqr_task_release(pwqt, true);
/* state == 0 is TASK_RUNNING: mere preemption, not a block */
303 if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
306 pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
307 /* see preempt.h: irqs are disabled for sched_out, so a plain
 * spin_lock (not irqsave) is sufficient here */
308 spin_lock(&sb->wqh.lock);
309 __pwqr_sb_update_state(sb, -1);
310 spin_unlock(&sb->wqh.lock);
/* State tables: a thread's regulation state is which ops table its
 * notifier points at (swapped in place by the callbacks above). */
313 static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
314 .sched_in = pwqr_task_noop_sched_in,
315 .sched_out = pwqr_task_noop_sched_out,
318 static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
319 .sched_in = pwqr_task_noop_sched_in,
320 .sched_out = pwqr_task_sched_out,
323 static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
324 .sched_in = pwqr_task_blocked_sched_in,
325 .sched_out = pwqr_task_sched_out,
328 /*****************************************************************************
/* open(): one switchboard per file descriptor.  NOTE(review): the
 * IS_ERR check on pwqr_sb_create's result and the return are missing
 * from this extraction. */
331 static int pwqr_open(struct inode *inode, struct file *filp)
335 sb = pwqr_sb_create();
338 filp->private_data = sb;
/* Last close: under the sb lock (presumably marking sb->dead -- the
 * store is not visible in this extraction), then wake every waiter so
 * they can unwind; the final sb reference drop is also not visible. */
342 static int pwqr_release(struct inode *inode, struct file *filp)
344 struct pwqr_sb *sb = filp->private_data;
347 pwqr_sb_lock_irqsave(sb, flags);
349 pwqr_sb_unlock_irqrestore(sb, flags);
350 wake_up_all(&sb->wqh);
/* PWQR_WAIT / PWQR_PARK backend.
 * @in_pool: true for WAIT (a pool worker carrying a futex-like ticket
 * in userland memory), false for PARK.
 *
 * The caller's preempt notifier is unregistered for the duration of
 * the ioctl so that sleeping here does not perturb the running/blocked
 * accounting; it is re-registered before returning.
 *
 * For WAIT, the userland word at wait.pwqr_uaddr is re-read under the
 * sb lock (dropping the lock to fault the page in when needed); a
 * ticket mismatch means a wakeup already happened, so we return
 * without sleeping.  Otherwise the thread queues itself exclusively on
 * sb->wqh and sleeps interruptibly until woken for overcommit,
 * signalled, needed again, or the sb dies.
 *
 * NOTE(review): many lines (error gotos, the do { } opening, labels,
 * return paths, local declarations) are missing from this extraction;
 * the summary above follows only the visible control flow. */
356 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
357 int in_pool, struct pwqr_ioc_wait __user *arg)
360 struct pwqr_ioc_wait wait;
364 preempt_notifier_unregister(&pwqt->notifier);
366 if (in_pool && copy_from_user(&wait, arg, sizeof(wait))) {
371 pwqr_sb_lock_irqsave(sb, flags);
/* no overcommit: validate the userland ticket before sleeping */
372 if (sb->running + sb->waiting <= sb->concurrency) {
/* fault the page in outside the spinlock, then re-check atomically */
374 while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
375 pwqr_sb_unlock_irqrestore(sb, flags);
376 rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
379 pwqr_sb_lock_irqsave(sb, flags);
/* stale ticket: a wakeup raced with us, don't sleep */
382 if (uval != (u32)wait.pwqr_ticket) {
387 BUG_ON(sb->quarantined != 0);
392 /* cf. <wait_event_interruptible_exclusive_locked_irq> */
393 if (likely(!sb->dead)) {
396 __wait.flags |= WQ_FLAG_EXCLUSIVE;
/* pool waiters at the head, parked threads at the tail */
400 __add_wait_queue(&sb->wqh, &__wait);
403 __add_wait_queue_tail(&sb->wqh, &__wait);
405 __pwqr_sb_update_state(sb, -1);
406 set_current_state(TASK_INTERRUPTIBLE);
409 if (sb->overcommit_wakes)
411 if (signal_pending(current)) {
415 spin_unlock_irq(&sb->wqh.lock);
417 spin_lock_irq(&sb->wqh.lock);
418 if (in_pool && sb->waiting)
420 if (sb->running + sb->waiting < sb->concurrency)
422 } while (likely(!sb->dead));
424 __remove_wait_queue(&sb->wqh, &__wait);
425 __set_current_state(TASK_RUNNING);
431 BUG_ON(!sb->quarantined);
/* back to running: re-account and consume one overcommit wake */
437 __pwqr_sb_update_state(sb, 1);
438 if (sb->overcommit_wakes)
439 sb->overcommit_wakes--;
440 if (sb->waiting + sb->running > sb->concurrency)
445 if (unlikely(sb->dead))
447 pwqr_sb_unlock_irqrestore(sb, flags);
449 preempt_notifier_register(&pwqt->notifier);
/* PWQR_UNREGISTER backend: detach from the sb and release the task
 * entry.  Not called from a notifier (from_notifier = false), so a
 * real preempt_notifier_unregister is legal here. */
453 static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
459 pwqr_task_detach(pwqt, sb);
460 pwqr_task_release(pwqt, false);
/* PWQR_SET_CONC backend: update the concurrency target and rebalance.
 * NOTE(review): the visible `conc = num_online_cpus();` suggests a
 * missing guard treating conc <= 0 as "reset to online CPU count", and
 * the return (presumably old_conc) is not visible in this extraction
 * -- confirm on the full source. */
464 static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
466 long old_conc = sb->concurrency;
469 pwqr_sb_lock_irqsave(sb, flags);
471 conc = num_online_cpus();
472 if (conc != old_conc) {
473 sb->concurrency = conc;
/* delta 0: nobody changed running state, just re-derive quarantine */
474 __pwqr_sb_update_state(sb, 0);
476 pwqr_sb_unlock_irqrestore(sb, flags);
/* PWQR_WAKE / PWQR_WAKE_OC backend (@oc selects overcommit wakes,
 * which are granted regardless of the concurrency target).  Computes
 * how many threads can usefully be woken and reports that back to
 * userland.  NOTE(review): several branch heads, clamping of nwake
 * against count, the wake loop and the return are missing from this
 * extraction. */
481 static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
489 pwqr_sb_lock_irqsave(sb, flags);
/* everything currently asleep minus wakeups already in flight */
492 nwake = sb->waiting + sb->quarantined + sb->parked - sb->overcommit_wakes;
498 sb->overcommit_wakes += count;
499 } else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
500 nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
508 * This codepath deserves an explanation: when a thread is
509 * quarantined, for us it is, really, already "parked".  Userland
510 * doesn't know about that yet, though, so wake as many threads
511 * as userland would have liked to, and let the wakeup tell
512 * userland that those threads should be parked.
514 * That's why we lie about the number of woken threads:
515 * userland-wise we woke a thread only so that it could be
516 * parked for real and avoid spurious syscalls.  So it's as
517 * if we woke up 0 threads.
519 nwake = sb->quarantined;
520 if (sb->waiting < sb->overcommit_wakes)
521 nwake -= sb->overcommit_wakes - sb->waiting;
527 wake_up_locked(&sb->wqh);
528 pwqr_sb_unlock_irqrestore(sb, flags);
/* ioctl dispatcher.  Only threads of the sb's owning process (tgid
 * recorded at open) may issue commands.  GET/SET_CONC and WAKE[_OC]
 * need no per-task entry; UNREGISTER/WAIT/PARK look the calling task
 * up, registering it on the fly and migrating it if it was attached to
 * a different sb.  NOTE(review): the switch head, most case labels,
 * gotos and return paths are missing from this extraction. */
533 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
535 struct pwqr_sb *sb = filp->private_data;
536 struct task_struct *task = current;
537 struct pwqr_task *pwqt;
/* refuse commands from foreign processes */
540 if (sb->tgid != current->tgid)
545 return sb->concurrency;
547 return do_pwqr_set_conc(sb, (int)arg);
551 return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg);
556 case PWQR_UNREGISTER:
562 pwqt = pwqr_task_find(task);
563 if (command == PWQR_UNREGISTER)
564 return do_pwqr_unregister(sb, pwqt);
/* first use by this thread: register it on the fly */
567 pwqt = pwqr_task_create(task);
569 return PTR_ERR(pwqt);
570 pwqr_task_attach(pwqt, sb);
/* previously registered against another sb: migrate here */
571 } else if (unlikely(pwqt->sb != sb)) {
572 pwqr_task_detach(pwqt, pwqt->sb);
573 pwqr_task_attach(pwqt, sb);
578 rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
581 rc = do_pwqr_wait(sb, pwqt, false, NULL);
/* sb died while we waited: drop this thread's registration state */
585 if (unlikely(sb->dead)) {
586 pwqr_task_detach(pwqt, pwqt->sb);
/* Device file operations.  .compat_ioctl reuses the native handler,
 * implying a compat-safe ioctl ABI -- not verifiable from this
 * extraction.  NOTE(review): the .open = pwqr_open entry is missing
 * here; confirm it exists in the full source. */
592 static const struct file_operations pwqr_dev_fops = {
593 .owner = THIS_MODULE,
595 .release = pwqr_release,
596 .unlocked_ioctl = pwqr_ioctl,
598 .compat_ioctl = pwqr_ioctl,
602 /*****************************************************************************
/* Module init: set up the task hash, register the char device, create
 * the /dev node.
 * NOTE(review): register_chrdev uses the literal "pwqr" but the
 * error/exit paths use PWQR_DEVICE_NAME -- if those differ this is a
 * name mismatch; confirm PWQR_DEVICE_NAME == "pwqr".  Also: the error
 * return after the failed-register printk and the final `return 0;`
 * are missing from this extraction; device_create()'s result is not
 * checked in the visible lines, and the KERN_INFO printk lacks a
 * trailing newline. */
605 static int __init pwqr_start(void)
609 for (i = 0; i < PWQR_HASH_SIZE; i++) {
610 spin_lock_init(&pwqr_tasks_hash[i].lock);
611 INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
614 /* Register as a character device */
615 pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
616 if (pwqr_major < 0) {
617 printk(KERN_ERR "pwqr: register_chrdev() failed\n");
621 /* Create a device node */
622 pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
623 if (IS_ERR(pwqr_class)) {
624 printk(KERN_ERR "pwqr: Error creating raw class\n");
625 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
626 return PTR_ERR(pwqr_class);
628 device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
629 printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded");
/* Module exit: tear down in reverse order of pwqr_start. */
633 static void __exit pwqr_end(void)
636 device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
637 class_destroy(pwqr_class);
638 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
/* Module entry/exit points and metadata. */
641 module_init(pwqr_start);
642 module_exit(pwqr_end);
644 MODULE_LICENSE("GPL");
645 MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
646 MODULE_DESCRIPTION("PThreads Work Queues Regulator");
648 // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: