81715965eb01e54968c5af20914eab4dc88e6c7b
[~madcoder/pwqr.git] / kernel / pwqr.c
1 /*
2  * Copyright (C) 2012   Pierre Habouzit <pierre.habouzit@intersec.com>
3  * Copyright (C) 2012   Intersec SAS
4  *
5  * This file implements the Linux Pthread Workqueue Regulator, and is part
6  * of the linux kernel.
7  *
8  * The Linux Kernel is free software: you can redistribute it and/or modify it
9  * under the terms of the GNU General Public License version 2 as published by
10  * the Free Software Foundation.
11  *
12  * The Linux Kernel is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
15  * License for more details.
16  *
17  * You should have received a copy of the GNU General Public License version 2
18  * along with The Linux Kernel.  If not, see <http://www.gnu.org/licenses/>.
19  */
20
21 #include <linux/cdev.h>
22 #include <linux/device.h>
23 #include <linux/file.h>
24 #include <linux/fs.h>
25 #include <linux/hash.h>
26 #include <linux/init.h>
27 #include <linux/kref.h>
28 #include <linux/module.h>
29 #include <linux/sched.h>
30 #include <linux/slab.h>
31 #include <linux/spinlock.h>
32 #include <linux/timer.h>
33 #include <linux/uaccess.h>
34 #include <linux/wait.h>
35
36 #ifndef CONFIG_PREEMPT_NOTIFIERS
37 #  error PWQ module requires CONFIG_PREEMPT_NOTIFIERS
38 #endif
39
40 #include "pwqr.h"
41
42 #define PWQR_UNPARK_DELAY       (HZ / 10)
43 #define PWQR_HASH_BITS          5
44 #define PWQR_HASH_SIZE          (1 << PWQR_HASH_BITS)
45
/* One bucket of the global task hash; lock protects the tasks list. */
struct pwqr_task_bucket {
        spinlock_t              lock;
        struct hlist_head       tasks;
};
50
/*
 * Per-open-file scoreboard tracking the thread pool of one process.
 * All counters below are protected by wqh.lock (see the
 * pwqr_sb_lock_irqsave/pwqr_sb_unlock_irqrestore macros).
 */
struct pwqr_sb {
        struct kref             kref;   /* refcount; freeing deferred via rcu */
        struct rcu_head         rcu;
        struct timer_list       timer;  /* delayed unpark (PWQR_UNPARK_DELAY) */
        wait_queue_head_t       wqh;    /* waiting/parked threads sleep here */
        pid_t                   tgid;   /* owning process; enforced in ioctl */

        unsigned                concurrency;    /* target # of running threads */
        unsigned                registered;     /* threads attached to this sb */

        unsigned                running;        /* threads counted as runnable */
        unsigned                waiting;        /* threads blocked in PWQR_WAIT */
        unsigned                parked;         /* threads blocked in PWQR_PARK */
        unsigned                overcommit_wakes; /* wakes allowed to overcommit */

        unsigned                dead;   /* set on release; waiters get -EBADFD */
};
68
/* Per-registered-thread state, hashed by task_struct pointer. */
struct pwqr_task {
        struct preempt_notifier notifier;  /* scheduler sched_in/out hooks */
        struct hlist_node       link;      /* chained in pwqr_tasks_hash */
        struct rcu_head         rcu;
        struct task_struct     *task;      /* the thread this entry describes */
        struct pwqr_sb         *sb;        /* scoreboard the thread belongs to */
};
76
/*
 * Global variables
 */
static struct class            *pwqr_class;     /* /dev node class */
static int                      pwqr_major;     /* dynamic chrdev major */
static struct pwqr_task_bucket  pwqr_tasks_hash[PWQR_HASH_SIZE];
/* notifier ops tables, defined below after the callbacks */
static struct preempt_ops       pwqr_preempt_running_ops;
static struct preempt_ops       pwqr_preempt_blocked_ops;
static struct preempt_ops       pwqr_preempt_noop_ops;
86
87 /*****************************************************************************
88  * Scoreboards
89  */
90
/* The scoreboard piggy-backs on the waitqueue's own spinlock. */
#define pwqr_sb_lock_irqsave(sb, flags) \
        spin_lock_irqsave(&(sb)->wqh.lock, flags)
#define pwqr_sb_unlock_irqrestore(sb, flags) \
        spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
95
/*
 * Apply @running_delta to sb->running and (re)arm or cancel the unpark
 * timer.  Caller must hold sb->wqh.lock.
 *
 * When the pool drops below the target concurrency while threads are
 * parked and none are merely waiting, the timer is armed so that
 * pwqr_sb_timer_cb() unparks one thread after PWQR_UNPARK_DELAY; in any
 * other state a pending timer is cancelled.
 */
static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
{
        sb->running += running_delta;
        if (sb->running > sb->concurrency) {
                /* TODO see ../Documentation/pwqr.adoc */
        } else if (sb->running == sb->concurrency) {
                /* do nothing */
        } else if (sb->waiting == 0 && sb->parked) {
                /* under-committed with parked threads: schedule an unpark */
                if (!timer_pending(&sb->timer)) {
                        mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY);
                }
                return;
        }

        if (timer_pending(&sb->timer))
                del_timer(&sb->timer);
}
113
/*
 * Unpark timer: if the pool is still under-committed, has parked threads
 * and no plain waiters, wake one parked thread — unless overcommit wakes
 * are already in flight, which would wake it anyway.
 */
static void pwqr_sb_timer_cb(unsigned long arg)
{
        struct pwqr_sb *sb = (struct pwqr_sb *)arg;
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) {
                if (sb->overcommit_wakes == 0)
                        wake_up_locked(&sb->wqh);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
}
126
127 static struct pwqr_sb *pwqr_sb_create(void)
128 {
129         struct pwqr_sb *sb;
130
131         sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
132         if (sb == NULL)
133                 return ERR_PTR(-ENOMEM);
134
135         kref_init(&sb->kref);
136         init_waitqueue_head(&sb->wqh);
137         sb->tgid        = current->tgid;
138         sb->concurrency = num_online_cpus();
139         init_timer(&sb->timer);
140         sb->timer.function = pwqr_sb_timer_cb;
141         sb->timer.data     = (unsigned long)sb;
142
143         __module_get(THIS_MODULE);
144         return sb;
145 }
/* Take a reference on @sb. */
static inline void pwqr_sb_get(struct pwqr_sb *sb)
{
        kref_get(&sb->kref);
}
150
/* RCU callback: actually free the scoreboard and drop the module ref. */
static void pwqr_sb_finalize(struct rcu_head *rcu)
{
        struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);

        module_put(THIS_MODULE);
        kfree(sb);
}
158
/*
 * kref release: stop the unpark timer, then defer the free past a grace
 * period so lockless readers (sched hooks) can still look at the sb.
 */
static void pwqr_sb_release(struct kref *kref)
{
        struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);

        del_timer_sync(&sb->timer);
        call_rcu(&sb->rcu, pwqr_sb_finalize);
}
/* Drop a reference on @sb; frees it (via RCU) on the last put. */
static inline void pwqr_sb_put(struct pwqr_sb *sb)
{
        kref_put(&sb->kref, pwqr_sb_release);
}
170
171 /*****************************************************************************
172  * tasks
173  */
/* Hash a task_struct pointer to its bucket in the global task hash. */
static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
{
        return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
}
178
179 static struct pwqr_task *pwqr_task_find(struct task_struct *task)
180 {
181         struct pwqr_task_bucket *b = task_hbucket(task);
182         struct hlist_node *node;
183         struct pwqr_task *pwqt = NULL;
184
185         spin_lock(&b->lock);
186         hlist_for_each_entry(pwqt, node, &b->tasks, link) {
187                 if (pwqt->task == task)
188                         break;
189         }
190         spin_unlock(&b->lock);
191         return pwqt;
192 }
193
194 static struct pwqr_task *pwqr_task_create(struct task_struct *task)
195 {
196         struct pwqr_task_bucket *b = task_hbucket(task);
197         struct pwqr_task *pwqt;
198
199         pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
200         if (pwqt == NULL)
201                 return ERR_PTR(-ENOMEM);
202
203         preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
204         preempt_notifier_register(&pwqt->notifier);
205         pwqt->task = task;
206
207         spin_lock(&b->lock);
208         hlist_add_head(&pwqt->link, &b->tasks);
209         spin_unlock(&b->lock);
210
211         return pwqt;
212 }
213
/*
 * Detach @pwqt from @sb: drop it from the registered count and, when the
 * thread currently counts as running (its notifier still uses the
 * running ops), also remove it from the running count.  Drops the sb
 * reference taken at attach time.
 */
__cold
static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
{
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        sb->registered--;
        if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
                /* thread is accounted as running: give the slot back */
                __pwqr_sb_update_state(sb, -1);
        } else {
                /* blocked/noop: only re-evaluate the timer state */
                __pwqr_sb_update_state(sb, 0);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
        pwqr_sb_put(sb);
        pwqt->sb = NULL;
}
230
/*
 * Attach @pwqt to @sb: take a scoreboard reference, bump the registered
 * count and account the thread as running (callers attach from the
 * thread's own ioctl context, so it is running by definition).
 */
__cold
static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
{
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        pwqr_sb_get(pwqt->sb = sb);
        sb->registered++;
        __pwqr_sb_update_state(sb, 1);
        pwqr_sb_unlock_irqrestore(sb, flags);
}
242
/*
 * Unhash @pwqt and free it.  @from_notifier tells whether we are being
 * called from inside a preempt-notifier callback, where unregistering
 * the notifier (or immediate kfree) is forbidden; in that case the task
 * must be dying and the memory is reclaimed after a grace period.
 */
__cold
static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
{
        struct pwqr_task_bucket *b = task_hbucket(pwqt->task);

        spin_lock(&b->lock);
        hlist_del(&pwqt->link);
        spin_unlock(&b->lock);
        /* neutralize further sched hooks before the memory goes away */
        pwqt->notifier.ops = &pwqr_preempt_noop_ops;

        if (from_notifier) {
                /* When called from sched_{out,in}, it's not allowed to
                 * call preempt_notifier_unregister (or worse kfree())
                 *
                 * Though it's not a good idea to kfree() still registered
                 * callbacks if we're not dying, it'll panic on the next
                 * sched_{in,out} call.
                 */
                BUG_ON(!(pwqt->task->state & TASK_DEAD));
                kfree_rcu(pwqt, rcu);
        } else {
                preempt_notifier_unregister(&pwqt->notifier);
                kfree(pwqt);
        }
}
268
/* No-op sched hooks, installed once a pwqr_task is being torn down. */
static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
{
}

static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
                                    struct task_struct *next)
{
}
277
/*
 * A previously-blocked registered thread is being scheduled back in:
 * flip it back to the running ops and re-account it as running.
 * If the scoreboard died meanwhile, tear the pwqr_task down instead.
 *
 * NOTE(review): sb->dead is read without wqh.lock here — presumably an
 * intentional lockless fast path; verify against the release ordering.
 */
static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
{
        struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
        struct pwqr_sb   *sb   = pwqt->sb;
        unsigned long flags;

        if (unlikely(sb->dead)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
        }

        pwqt->notifier.ops = &pwqr_preempt_running_ops;
        pwqr_sb_lock_irqsave(sb, flags);
        __pwqr_sb_update_state(sb, 1);
        pwqr_sb_unlock_irqrestore(sb, flags);
}
295
/*
 * A registered thread is being scheduled out.  Preemption (state == 0)
 * and stop/trace states keep it accounted as running; a genuine block
 * switches it to the blocked ops and decrements the running count.
 * Dying tasks and dead scoreboards trigger teardown instead.
 */
static void pwqr_task_sched_out(struct preempt_notifier *notifier,
                               struct task_struct *next)
{
        struct pwqr_task    *pwqt = container_of(notifier, struct pwqr_task, notifier);
        struct pwqr_sb      *sb   = pwqt->sb;
        struct task_struct *p    = pwqt->task;

        if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
        }
        /* preempted, stopped or traced threads still own their slot */
        if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
                return;

        pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
        /* see preempt.h: irq are disabled for sched_out */
        spin_lock(&sb->wqh.lock);
        __pwqr_sb_update_state(sb, -1);
        spin_unlock(&sb->wqh.lock);
}
317
/* Ops tables: which hooks fire depends on the thread's current state. */
static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
        .sched_in       = pwqr_task_noop_sched_in,
        .sched_out      = pwqr_task_noop_sched_out,
};

/* thread counted as running: only sched_out is interesting */
static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
        .sched_in       = pwqr_task_noop_sched_in,
        .sched_out      = pwqr_task_sched_out,
};

/* thread counted as blocked: sched_in re-accounts it as running */
static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
        .sched_in       = pwqr_task_blocked_sched_in,
        .sched_out      = pwqr_task_sched_out,
};
332
333 /*****************************************************************************
334  * file descriptor
335  */
336 static int pwqr_open(struct inode *inode, struct file *filp)
337 {
338         struct pwqr_sb *sb;
339
340         sb = pwqr_sb_create();
341         if (IS_ERR(sb))
342                 return PTR_ERR(sb);
343         filp->private_data = sb;
344         return 0;
345 }
346
/*
 * release(2): mark the scoreboard dead under the lock, then wake every
 * sleeper so they observe sb->dead and return -EBADFD, and finally drop
 * the file's reference.  The dead flag must be visible before the wake.
 */
static int pwqr_release(struct inode *inode, struct file *filp)
{
        struct pwqr_sb *sb = filp->private_data;
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        sb->dead = true;
        pwqr_sb_unlock_irqrestore(sb, flags);
        wake_up_all(&sb->wqh);
        pwqr_sb_put(sb);
        return 0;
}
359
/*
 * Implement PWQR_WAIT (@is_wait true) and PWQR_PARK (@is_wait false).
 *
 * The calling thread's preempt notifier is unregistered for the duration
 * of the sleep so the scheduler hooks do not double-account the block.
 * For WAIT, userland supplies an address/ticket pair: the wait is only
 * entered if *uaddr still equals the ticket (futex-like protocol),
 * otherwise -EWOULDBLOCK is returned.
 *
 * Returns 0, -EFAULT/-EWOULDBLOCK on ticket problems, -ERESTARTSYS on a
 * signal, -EDQUOT when the thread wakes into an overcommitted pool, and
 * -EBADFD once the scoreboard is dead.
 */
static long
do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
            int is_wait, struct pwqr_ioc_wait __user *arg)
{
        unsigned long flags;
        struct pwqr_ioc_wait wait;
        long rc = 0;
        u32 uval;

        preempt_notifier_unregister(&pwqt->notifier);

        if (is_wait && copy_from_user(&wait, arg, sizeof(wait))) {
                rc = -EFAULT;
                goto out;
        }

        pwqr_sb_lock_irqsave(sb, flags);
        if (sb->running + sb->waiting <= sb->concurrency) {
                if (is_wait) {
                        /* NOTE(review): probe_kernel_address() is used on a
                         * USER address as a no-fault fast path, falling back
                         * to get_user() (which may sleep) with the lock
                         * dropped — confirm this is safe on all arches. */
                        while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
                                pwqr_sb_unlock_irqrestore(sb, flags);
                                rc = get_user(uval, (u32 *)wait.pwqr_uaddr);
                                if (rc)
                                        goto out;
                                pwqr_sb_lock_irqsave(sb, flags);
                        }

                        /* ticket changed: userland raced us, don't sleep */
                        if (uval != (u32)wait.pwqr_ticket) {
                                rc = -EWOULDBLOCK;
                                goto out_unlock;
                        }
                } else {
                        /* pool under target: parking is refused, go run */
                        goto out_unlock;
                }
        }

        /* @ see <wait_event_interruptible_exclusive_locked_irq> */
        if (likely(!sb->dead)) {
                DEFINE_WAIT(__wait);

                __wait.flags |= WQ_FLAG_EXCLUSIVE;

                /* waiters queue at the head, parked threads at the tail,
                 * so wakes hit waiters first */
                if (is_wait) {
                        sb->waiting++;
                        __add_wait_queue(&sb->wqh, &__wait);
                } else {
                        sb->parked++;
                        __add_wait_queue_tail(&sb->wqh, &__wait);
                }
                __pwqr_sb_update_state(sb, -1);
                set_current_state(TASK_INTERRUPTIBLE);

                /* NOTE(review): the task state is not re-set to
                 * TASK_INTERRUPTIBLE after schedule(); the park loop's
                 * second iteration may thus busy-spin — verify intent. */
                do {
                        if (sb->overcommit_wakes)
                                break;
                        if (signal_pending(current)) {
                                rc = -ERESTARTSYS;
                                break;
                        }
                        spin_unlock_irq(&sb->wqh.lock);
                        schedule();
                        spin_lock_irq(&sb->wqh.lock);
                        if (is_wait)
                                break;
                        if (sb->running + sb->waiting < sb->concurrency)
                                break;
                } while (likely(!sb->dead));

                __remove_wait_queue(&sb->wqh, &__wait);
                __set_current_state(TASK_RUNNING);

                if (is_wait) {
                        sb->waiting--;
                } else {
                        sb->parked--;
                }
                __pwqr_sb_update_state(sb, 1);
                /* consume one overcommit token if our wake used one */
                if (sb->overcommit_wakes)
                        sb->overcommit_wakes--;
                if (sb->waiting + sb->running > sb->concurrency)
                        rc = -EDQUOT;
        }

out_unlock:
        if (unlikely(sb->dead))
                rc = -EBADFD;
        pwqr_sb_unlock_irqrestore(sb, flags);
out:
        preempt_notifier_register(&pwqt->notifier);
        return rc;
}
451
452 static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
453 {
454         if (!pwqt)
455                 return -EINVAL;
456         if (pwqt->sb != sb)
457                 return -ENOENT;
458         pwqr_task_detach(pwqt, sb);
459         pwqr_task_release(pwqt, false);
460         return 0;
461 }
462
463 static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
464 {
465         long old_conc = sb->concurrency;
466         unsigned long flags;
467
468         pwqr_sb_lock_irqsave(sb, flags);
469         if (conc <= 0)
470                 conc = num_online_cpus();
471         if (conc != old_conc) {
472                 sb->concurrency = conc;
473                 __pwqr_sb_update_state(sb, 0);
474         }
475         pwqr_sb_unlock_irqrestore(sb, flags);
476
477         return old_conc;
478 }
479
/*
 * PWQR_WAKE / PWQR_WAKE_OC (@oc): wake up to @count sleeping threads.
 *
 * Overcommit wakes (@oc) may exceed the concurrency target and are
 * tracked in sb->overcommit_wakes so the wakees consume the tokens.
 * Normal wakes are clamped both by the free concurrency slots and by the
 * number of sleepers not already claimed by an overcommit wake.
 *
 * Returns the number of wakes charged to userland (see the quarantine
 * comment below for why this can be 0 while a thread is still woken).
 */
static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
{
        unsigned long flags;
        int nwake;

        if (count < 0)
                return -EINVAL;

        pwqr_sb_lock_irqsave(sb, flags);

        if (oc) {
                /* clamp to sleepers not already promised a wake */
                nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
                if (count > nwake) {
                        count = nwake;
                } else {
                        nwake = count;
                }
                sb->overcommit_wakes += count;
        } else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
                /* clamp to the free slots, then to available sleepers */
                nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
                if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
                        nwake = sb->waiting + sb->parked -
                                sb->overcommit_wakes;
                }
                if (count > nwake) {
                        count = nwake;
                } else {
                        nwake = count;
                }
        } else {
                /*
                 * This codepath deserves an explanation: waking the thread
                 * "for real" would overcommit, though userspace KNOWS there
                 * is at least one waiting thread. Such threads are threads
                 * that are "quarantined".
                 *
                 * Quarantined threads are woken up one by one, to allow a
                 * slow ramp down, trying to minimize "waiting" <-> "parked"
                 * flip-flops, no matter how many wakes have been asked.
                 *
                 * Since releasing one quarantined thread will wake up a
                 * thread that will (almost) straight go to parked mode, lie
                 * to userland about the fact that we unblocked that thread,
                 * and return 0.
                 *
                 * Though if we're already waking all waiting threads for
                 * overcommitting jobs, well, we don't need that.
                 */
                count = 0;
                nwake = sb->waiting > sb->overcommit_wakes;
        }
        while (nwake-- > 0)
                wake_up_locked(&sb->wqh);
        pwqr_sb_unlock_irqrestore(sb, flags);

        return count;
}
537
538 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
539 {
540         struct pwqr_sb      *sb   = filp->private_data;
541         struct task_struct *task = current;
542         struct pwqr_task    *pwqt;
543         int rc = 0;
544
545         if (sb->tgid != current->tgid)
546                 return -EBADFD;
547
548         switch (command) {
549         case PWQR_GET_CONC:
550                 return sb->concurrency;
551         case PWQR_SET_CONC:
552                 return do_pwqr_set_conc(sb, (int)arg);
553
554         case PWQR_WAKE:
555         case PWQR_WAKE_OC:
556                 return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg);
557
558         case PWQR_WAIT:
559         case PWQR_PARK:
560         case PWQR_REGISTER:
561         case PWQR_UNREGISTER:
562                 break;
563         default:
564                 return -EINVAL;
565         }
566
567         pwqt = pwqr_task_find(task);
568         if (command == PWQR_UNREGISTER)
569                 return do_pwqr_unregister(sb, pwqt);
570
571         if (pwqt == NULL) {
572                 pwqt = pwqr_task_create(task);
573                 if (IS_ERR(pwqt))
574                         return PTR_ERR(pwqt);
575                 pwqr_task_attach(pwqt, sb);
576         } else if (unlikely(pwqt->sb != sb)) {
577                 pwqr_task_detach(pwqt, pwqt->sb);
578                 pwqr_task_attach(pwqt, sb);
579         }
580
581         switch (command) {
582         case PWQR_WAIT:
583                 rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
584                 break;
585         case PWQR_PARK:
586                 rc = do_pwqr_wait(sb, pwqt, false, NULL);
587                 break;
588         }
589
590         if (unlikely(sb->dead)) {
591                 pwqr_task_detach(pwqt, pwqt->sb);
592                 return -EBADFD;
593         }
594         return rc;
595 }
596
/* Character-device entry points; compat path shares the native handler
 * since every ioctl argument is a plain int or a fixed-layout struct. */
static const struct file_operations pwqr_dev_fops = {
        .owner          = THIS_MODULE,
        .open           = pwqr_open,
        .release        = pwqr_release,
        .unlocked_ioctl = pwqr_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl   = pwqr_ioctl,
#endif
};
606
607 /*****************************************************************************
608  * module
609  */
610 static int __init pwqr_start(void)
611 {
612         int i;
613
614         for (i = 0; i < PWQR_HASH_SIZE; i++) {
615                 spin_lock_init(&pwqr_tasks_hash[i].lock);
616                 INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
617         }
618
619         /* Register as a character device */
620         pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
621         if (pwqr_major < 0) {
622                 printk(KERN_ERR "pwqr: register_chrdev() failed\n");
623                 return pwqr_major;
624         }
625
626         /* Create a device node */
627         pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
628         if (IS_ERR(pwqr_class)) {
629                 printk(KERN_ERR "pwqr: Error creating raw class\n");
630                 unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
631                 return PTR_ERR(pwqr_class);
632         }
633         device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
634         printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded");
635         return 0;
636 }
637
/*
 * Module exit: wait for in-flight RCU frees (pwqr_sb_finalize /
 * kfree_rcu'd pwqr_tasks) before tearing down the device node, class
 * and chrdev registration.
 */
static void __exit pwqr_end(void)
{
        rcu_barrier();
        device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
        class_destroy(pwqr_class);
        unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
}
645
/* Module registration and metadata. */
module_init(pwqr_start);
module_exit(pwqr_end);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
MODULE_DESCRIPTION("PThreads Work Queues Regulator");
652
653 // vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: