kernel/pwqr.c
/*
 * Copyright (C) 2012   Pierre Habouzit <pierre.habouzit@intersec.com>
 * Copyright (C) 2012   Intersec SAS
 *
 * This file implements the Linux Pthread Workqueue Regulator, and is part
 * of the Linux kernel.
 *
 * The Linux Kernel is free software: you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 as published by
 * the Free Software Foundation.
 *
 * The Linux Kernel is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public
 * License for more details.
 *
 * You should have received a copy of the GNU General Public License version 2
 * along with The Linux Kernel.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <linux/cdev.h>
#include <linux/device.h>
#include <linux/file.h>
#include <linux/fs.h>
#include <linux/hash.h>
#include <linux/init.h>
#include <linux/kref.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/timer.h>
#include <linux/uaccess.h>
#include <linux/wait.h>

#ifndef CONFIG_PREEMPT_NOTIFIERS
#  error PWQR module requires CONFIG_PREEMPT_NOTIFIERS
#endif

#include "pwqr.h"
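
/*
 * pwqr.h is not reproduced here.  A sketch of the declarations this file
 * relies on, inferred from their uses below; the real header defines the
 * actual ioctl encodings and field types, which may differ:
 *
 *	#define PWQR_DEVICE_NAME        "pwqr"
 *
 *	struct pwqr_ioc_wait {
 *		uint64_t pwqr_ticket;   (compared as (u32) to *pwqr_uaddr)
 *		uint64_t pwqr_uaddr;    (int-aligned address of the ticket)
 *	};
 *
 *	commands: PWQR_GET_CONC, PWQR_SET_CONC, PWQR_WAKE, PWQR_WAKE_OC,
 *	          PWQR_WAIT, PWQR_PARK, PWQR_REGISTER, PWQR_UNREGISTER
 */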

#define PWQR_UNPARK_DELAY       (HZ / 10)
#define PWQR_HASH_BITS          5
#define PWQR_HASH_SIZE          (1 << PWQR_HASH_BITS)

struct pwqr_task_bucket {
        spinlock_t              lock;
        struct hlist_head       tasks;
};

struct pwqr_sb {
        struct kref             kref;           /* fd + one ref per attached task */
        struct rcu_head         rcu;            /* deferred free, see pwqr_sb_finalize() */
        struct timer_list       timer;          /* delayed unpark of a parked thread */
        wait_queue_head_t       wqh;            /* its lock also guards this struct */
        pid_t                   tgid;           /* process owning this regulator fd */

        unsigned                concurrency;    /* target number of running threads */
        unsigned                registered;     /* threads attached to this sb */

        unsigned                running;        /* attached threads believed runnable */
        unsigned                waiting;        /* threads blocked in PWQR_WAIT */
        unsigned                parked;         /* threads blocked in PWQR_PARK */
        unsigned                overcommit_wakes; /* wakes allowed above concurrency */

        unsigned                dead;           /* fd released, reject new work */
};

struct pwqr_task {
        struct preempt_notifier notifier;
        struct hlist_node       link;
        struct rcu_head         rcu;
        struct task_struct     *task;
        struct pwqr_sb         *sb;
};

/*
 * Global variables
 */
static struct class            *pwqr_class;
static int                      pwqr_major;
static struct pwqr_task_bucket  pwqr_tasks_hash[PWQR_HASH_SIZE];
static struct preempt_ops       pwqr_preempt_running_ops;
static struct preempt_ops       pwqr_preempt_blocked_ops;
static struct preempt_ops       pwqr_preempt_noop_ops;

/*****************************************************************************
 * Scoreboards
 */

#define pwqr_sb_lock_irqsave(sb, flags) \
        spin_lock_irqsave(&(sb)->wqh.lock, flags)
#define pwqr_sb_unlock_irqrestore(sb, flags) \
        spin_unlock_irqrestore(&(sb)->wqh.lock, flags)

static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
{
        sb->running += running_delta;
        if (sb->running > sb->concurrency) {
                /* TODO see ../Documentation/pwqr.adoc */
        } else if (sb->running == sb->concurrency) {
                /* do nothing */
        } else if (sb->waiting == 0 && sb->parked) {
                /* under-committed with only parked threads available:
                 * arm the timer to unpark one if this state lasts */
                if (!timer_pending(&sb->timer)) {
                        mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY);
                }
                return;
        }

        if (timer_pending(&sb->timer))
                del_timer(&sb->timer);
}
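
/*
 * Example: with concurrency == 4, a worker blocking in the kernel drops
 * running from 4 to 3.  If nobody sits in PWQR_WAIT but a thread is
 * parked, the unpark timer is armed instead of waking it at once, so a
 * short blocking spike does not flip a parked thread back and forth (see
 * the "quarantine" comment in do_pwqr_wake() below).
 */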

static void pwqr_sb_timer_cb(unsigned long arg)
{
        struct pwqr_sb *sb = (struct pwqr_sb *)arg;
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) {
                if (sb->overcommit_wakes == 0)
                        wake_up_locked(&sb->wqh);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
}

static struct pwqr_sb *pwqr_sb_create(void)
{
        struct pwqr_sb *sb;

        sb = kzalloc(sizeof(struct pwqr_sb), GFP_KERNEL);
        if (sb == NULL)
                return ERR_PTR(-ENOMEM);

        kref_init(&sb->kref);
        init_waitqueue_head(&sb->wqh);
        sb->tgid        = current->tgid;
        sb->concurrency = num_online_cpus();
        init_timer(&sb->timer);
        sb->timer.function = pwqr_sb_timer_cb;
        sb->timer.data     = (unsigned long)sb;

        __module_get(THIS_MODULE);
        return sb;
}
static inline void pwqr_sb_get(struct pwqr_sb *sb)
{
        kref_get(&sb->kref);
}

static void pwqr_sb_finalize(struct rcu_head *rcu)
{
        struct pwqr_sb *sb = container_of(rcu, struct pwqr_sb, rcu);

        module_put(THIS_MODULE);
        kfree(sb);
}

static void pwqr_sb_release(struct kref *kref)
{
        struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);

        del_timer_sync(&sb->timer);
        call_rcu(&sb->rcu, pwqr_sb_finalize);
}
static inline void pwqr_sb_put(struct pwqr_sb *sb)
{
        kref_put(&sb->kref, pwqr_sb_release);
}
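
/*
 * Lifetime: pwqr_open() holds one reference for the fd, and each attached
 * task holds another (see pwqr_task_attach()).  The final kfree() is
 * deferred through RCU so that the preempt notifiers, which read
 * pwqt->sb locklessly, never chase a pointer into freed memory.
 */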

/*****************************************************************************
 * tasks
 */
static inline struct pwqr_task_bucket *task_hbucket(struct task_struct *task)
{
        return &pwqr_tasks_hash[hash_ptr(task, PWQR_HASH_BITS)];
}

static struct pwqr_task *pwqr_task_find(struct task_struct *task)
{
        struct pwqr_task_bucket *b = task_hbucket(task);
        struct hlist_node *node;
        struct pwqr_task *pwqt = NULL;

        spin_lock(&b->lock);
        hlist_for_each_entry(pwqt, node, &b->tasks, link) {
                if (pwqt->task == task)
                        break;
        }
        spin_unlock(&b->lock);
        /* on a full traversal pwqt is left pointing at the last entry
         * walked, so use the cursor to tell a hit from a miss */
        return node ? pwqt : NULL;
}

static struct pwqr_task *pwqr_task_create(struct task_struct *task)
{
        struct pwqr_task_bucket *b = task_hbucket(task);
        struct pwqr_task *pwqt;

        pwqt = kmalloc(sizeof(*pwqt), GFP_KERNEL);
        if (pwqt == NULL)
                return ERR_PTR(-ENOMEM);

        preempt_notifier_init(&pwqt->notifier, &pwqr_preempt_running_ops);
        preempt_notifier_register(&pwqt->notifier);
        pwqt->task = task;

        spin_lock(&b->lock);
        hlist_add_head(&pwqt->link, &b->tasks);
        spin_unlock(&b->lock);

        return pwqt;
}

__cold
static void pwqr_task_detach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
{
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        sb->registered--;
        if (pwqt->notifier.ops == &pwqr_preempt_running_ops) {
                __pwqr_sb_update_state(sb, -1);
        } else {
                __pwqr_sb_update_state(sb, 0);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
        pwqr_sb_put(sb);
        pwqt->sb = NULL;
}

__cold
static void pwqr_task_attach(struct pwqr_task *pwqt, struct pwqr_sb *sb)
{
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        pwqr_sb_get(pwqt->sb = sb);
        sb->registered++;
        __pwqr_sb_update_state(sb, 1);
        pwqr_sb_unlock_irqrestore(sb, flags);
}

__cold
static void pwqr_task_release(struct pwqr_task *pwqt, bool from_notifier)
{
        struct pwqr_task_bucket *b = task_hbucket(pwqt->task);

        spin_lock(&b->lock);
        hlist_del(&pwqt->link);
        spin_unlock(&b->lock);
        pwqt->notifier.ops = &pwqr_preempt_noop_ops;

        if (from_notifier) {
                /* When called from sched_{in,out} we may not call
                 * preempt_notifier_unregister() (nor, worse, kfree() a
                 * live notifier).
                 *
                 * kfree_rcu() of a still-registered notifier is only
                 * tolerable because the task is dying; otherwise the next
                 * sched_{in,out} would dereference freed memory and panic.
                 */
                BUG_ON(!(pwqt->task->state & TASK_DEAD));
                kfree_rcu(pwqt, rcu);
        } else {
                preempt_notifier_unregister(&pwqt->notifier);
                kfree(pwqt);
        }
}

static void pwqr_task_noop_sched_in(struct preempt_notifier *notifier, int cpu)
{
}

static void pwqr_task_noop_sched_out(struct preempt_notifier *notifier,
                                    struct task_struct *next)
{
}

static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cpu)
{
        struct pwqr_task *pwqt = container_of(notifier, struct pwqr_task, notifier);
        struct pwqr_sb   *sb   = pwqt->sb;
        unsigned long flags;

        if (unlikely(sb->dead)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
        }

        /* the task blocked earlier and is back on a CPU:
         * account it as running again */
        pwqt->notifier.ops = &pwqr_preempt_running_ops;
        pwqr_sb_lock_irqsave(sb, flags);
        __pwqr_sb_update_state(sb, 1);
        pwqr_sb_unlock_irqrestore(sb, flags);
}

static void pwqr_task_sched_out(struct preempt_notifier *notifier,
                               struct task_struct *next)
{
        struct pwqr_task    *pwqt = container_of(notifier, struct pwqr_task, notifier);
        struct pwqr_sb      *sb   = pwqt->sb;
        struct task_struct *p    = pwqt->task;

        if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
        }
        /* a mere preemption (still runnable) or a stop/trace is not a
         * blocking event: keep the task accounted as running */
        if (p->state == 0 || (p->state & (__TASK_STOPPED | __TASK_TRACED)))
                return;

        pwqt->notifier.ops = &pwqr_preempt_blocked_ops;
        /* see preempt.h: irqs are disabled for sched_out */
        spin_lock(&sb->wqh.lock);
        __pwqr_sb_update_state(sb, -1);
        spin_unlock(&sb->wqh.lock);
}

static struct preempt_ops __read_mostly pwqr_preempt_noop_ops = {
        .sched_in       = pwqr_task_noop_sched_in,
        .sched_out      = pwqr_task_noop_sched_out,
};

static struct preempt_ops __read_mostly pwqr_preempt_running_ops = {
        .sched_in       = pwqr_task_noop_sched_in,
        .sched_out      = pwqr_task_sched_out,
};

static struct preempt_ops __read_mostly pwqr_preempt_blocked_ops = {
        .sched_in       = pwqr_task_blocked_sched_in,
        .sched_out      = pwqr_task_sched_out,
};
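
/*
 * The three preempt_ops above implement a small per-task state machine:
 *
 *   running --(sched_out while blocking)--> blocked    running--
 *   blocked --(sched_in)------------------> running    running++
 *   noop:    detached or dying; the callbacks ignore the task
 */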

/*****************************************************************************
 * file descriptor
 */
static int pwqr_open(struct inode *inode, struct file *filp)
{
        struct pwqr_sb *sb;

        sb = pwqr_sb_create();
        if (IS_ERR(sb))
                return PTR_ERR(sb);
        filp->private_data = sb;
        return 0;
}

static int pwqr_release(struct inode *inode, struct file *filp)
{
        struct pwqr_sb *sb = filp->private_data;
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        sb->dead = true;
        pwqr_sb_unlock_irqrestore(sb, flags);
        wake_up_all(&sb->wqh);
        pwqr_sb_put(sb);
        return 0;
}

static long
do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
            int is_wait, struct pwqr_ioc_wait __user *arg)
{
        unsigned long flags;
        struct pwqr_ioc_wait wait;
        long rc = 0;
        u32 uval;

        preempt_notifier_unregister(&pwqt->notifier);

        if (is_wait) {
                if (copy_from_user(&wait, arg, sizeof(wait))) {
                        rc = -EFAULT;
                        goto out;
                }
                if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
                        rc = -EINVAL;
                        goto out;
                }
        }

        pwqr_sb_lock_irqsave(sb, flags);
        if (sb->running + sb->waiting <= sb->concurrency) {
                if (is_wait) {
                        /* fault the ticket word in with the lock dropped,
                         * then retry until it can be read atomically */
                        while (probe_kernel_address(wait.pwqr_uaddr, uval)) {
                                pwqr_sb_unlock_irqrestore(sb, flags);
                                rc = get_user(uval, (u32 __user *)wait.pwqr_uaddr);
                                if (rc)
                                        goto out;
                                pwqr_sb_lock_irqsave(sb, flags);
                        }

                        if (uval != (u32)wait.pwqr_ticket) {
                                rc = -EWOULDBLOCK;
                                goto out_unlock;
                        }
                } else {
                        goto out_unlock;
                }
        }

        /* see <wait_event_interruptible_exclusive_locked_irq> */
        if (likely(!sb->dead)) {
                DEFINE_WAIT(__wait);

                __wait.flags |= WQ_FLAG_EXCLUSIVE;

                if (is_wait) {
                        sb->waiting++;
                        __add_wait_queue(&sb->wqh, &__wait);
                } else {
                        sb->parked++;
                        __add_wait_queue_tail(&sb->wqh, &__wait);
                }
                __pwqr_sb_update_state(sb, -1);
                set_current_state(TASK_INTERRUPTIBLE);

                do {
                        if (sb->overcommit_wakes)
                                break;
                        if (signal_pending(current)) {
                                rc = -ERESTARTSYS;
                                break;
                        }
                        spin_unlock_irq(&sb->wqh.lock);
                        schedule();
                        spin_lock_irq(&sb->wqh.lock);
                        if (is_wait)
                                break;
                        if (sb->running + sb->waiting < sb->concurrency)
                                break;
                } while (likely(!sb->dead));

                __remove_wait_queue(&sb->wqh, &__wait);
                __set_current_state(TASK_RUNNING);

                if (is_wait) {
                        sb->waiting--;
                } else {
                        sb->parked--;
                }
                __pwqr_sb_update_state(sb, 1);
                if (sb->overcommit_wakes)
                        sb->overcommit_wakes--;
                if (sb->waiting + sb->running > sb->concurrency)
                        rc = -EDQUOT;
        }

out_unlock:
        if (unlikely(sb->dead))
                rc = -EBADFD;
        pwqr_sb_unlock_irqrestore(sb, flags);
out:
        preempt_notifier_register(&pwqt->notifier);
        return rc;
}
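
/*
 * Userland pairs PWQR_WAIT with a ticket word, futex-style.  A sketch,
 * assuming the pwqr_ioc_wait layout inferred near the top of this file
 * (q, queue_is_empty() and pwqr_fd are hypothetical):
 *
 *	uint32_t snap = atomic_load(&q->ticket);
 *	if (queue_is_empty(q)) {
 *		struct pwqr_ioc_wait w = {
 *			.pwqr_ticket = snap,
 *			.pwqr_uaddr  = (unsigned long)&q->ticket,
 *		};
 *		rc = ioctl(pwqr_fd, PWQR_WAIT, &w);
 *	}
 *
 * The ioctl returns 0 on a regular wake, -EWOULDBLOCK when the ticket no
 * longer matches (work arrived first), -EDQUOT when the thread is woken
 * while the pool is above its concurrency target, and -EBADFD once the
 * regulator is dead.
 */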

static long do_pwqr_unregister(struct pwqr_sb *sb, struct pwqr_task *pwqt)
{
        if (!pwqt)
                return -EINVAL;
        if (pwqt->sb != sb)
                return -ENOENT;
        pwqr_task_detach(pwqt, sb);
        pwqr_task_release(pwqt, false);
        return 0;
}

static long do_pwqr_set_conc(struct pwqr_sb *sb, int conc)
{
        long old_conc = sb->concurrency;
        unsigned long flags;

        pwqr_sb_lock_irqsave(sb, flags);
        if (conc <= 0)
                conc = num_online_cpus();
        if (conc != old_conc) {
                sb->concurrency = conc;
                __pwqr_sb_update_state(sb, 0);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);

        return old_conc;
}
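
/*
 * e.g. ioctl(fd, PWQR_SET_CONC, 0) resets the target to num_online_cpus()
 * and returns the previous concurrency.
 */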

static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
{
        unsigned long flags;
        int nwake;

        if (count < 0)
                return -EINVAL;

        pwqr_sb_lock_irqsave(sb, flags);

        if (oc) {
                nwake = sb->waiting + sb->parked - sb->overcommit_wakes;
                if (count > nwake) {
                        count = nwake;
                } else {
                        nwake = count;
                }
                sb->overcommit_wakes += count;
        } else if (sb->running + sb->overcommit_wakes < sb->concurrency) {
                nwake = sb->concurrency - sb->overcommit_wakes - sb->running;
                if (nwake > sb->waiting + sb->parked - sb->overcommit_wakes) {
                        nwake = sb->waiting + sb->parked -
                                sb->overcommit_wakes;
                }
                if (count > nwake) {
                        count = nwake;
                } else {
                        nwake = count;
                }
        } else {
                /*
                 * This codepath deserves an explanation: waking a thread
                 * "for real" would overcommit, yet userspace KNOWS at
                 * least one thread is waiting.  Such threads are
                 * "quarantined".
                 *
                 * Quarantined threads are woken up one by one, to allow a
                 * slow ramp down and to minimize "waiting" <-> "parked"
                 * flip-flops, no matter how many wakes have been asked for.
                 *
                 * Since releasing one quarantined thread wakes a thread
                 * that will (almost) immediately go back to parked mode,
                 * lie to userland about having unblocked it and return 0.
                 *
                 * If we're already waking every waiting thread for
                 * overcommitting jobs, none of this is needed.
                 */
                count = 0;
                nwake = sb->waiting > sb->overcommit_wakes; /* at most one */
        }
        while (nwake-- > 0)
                wake_up_locked(&sb->wqh);
        pwqr_sb_unlock_irqrestore(sb, flags);

        return count;
}

static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
{
        struct pwqr_sb      *sb   = filp->private_data;
        struct task_struct *task = current;
        struct pwqr_task    *pwqt;
        int rc = 0;

        if (sb->tgid != current->tgid)
                return -EBADFD;

        switch (command) {
        case PWQR_GET_CONC:
                return sb->concurrency;
        case PWQR_SET_CONC:
                return do_pwqr_set_conc(sb, (int)arg);

        case PWQR_WAKE:
        case PWQR_WAKE_OC:
                return do_pwqr_wake(sb, command == PWQR_WAKE_OC, (int)arg);

        case PWQR_WAIT:
        case PWQR_PARK:
        case PWQR_REGISTER:
        case PWQR_UNREGISTER:
                break;
        default:
                return -EINVAL;
        }

        pwqt = pwqr_task_find(task);
        if (command == PWQR_UNREGISTER)
                return do_pwqr_unregister(sb, pwqt);

        if (pwqt == NULL) {
                pwqt = pwqr_task_create(task);
                if (IS_ERR(pwqt))
                        return PTR_ERR(pwqt);
                pwqr_task_attach(pwqt, sb);
        } else if (unlikely(pwqt->sb != sb)) {
                pwqr_task_detach(pwqt, pwqt->sb);
                pwqr_task_attach(pwqt, sb);
        }

        switch (command) {
        case PWQR_WAIT:
                rc = do_pwqr_wait(sb, pwqt, true, (struct pwqr_ioc_wait __user *)arg);
                break;
        case PWQR_PARK:
                rc = do_pwqr_wait(sb, pwqt, false, NULL);
                break;
        }

        if (unlikely(sb->dead)) {
                pwqr_task_detach(pwqt, pwqt->sb);
                return -EBADFD;
        }
        return rc;
}
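
/*
 * Userspace usage sketch (not part of the module).  Assumes the /dev/pwqr
 * node created in pwqr_start() below and the command macros from pwqr.h;
 * error handling is elided.
 */
#if 0
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/ioctl.h>
#include "pwqr.h"

static int pwqr_fd;

static void *worker(void *unused)
{
        ioctl(pwqr_fd, PWQR_REGISTER);          /* join the regulated pool */
        for (;;) {
                /* ... run queued jobs ... */
                if (ioctl(pwqr_fd, PWQR_PARK) < 0 && errno == EBADFD)
                        break;                  /* regulator fd was closed */
        }
        ioctl(pwqr_fd, PWQR_UNREGISTER);
        return NULL;
}

int main(void)
{
        pthread_t t;

        pwqr_fd = open("/dev/pwqr", O_RDWR);
        ioctl(pwqr_fd, PWQR_SET_CONC, 0);       /* <= 0 means num_online_cpus() */
        pthread_create(&t, NULL, worker, NULL);
        /* ... queue a job for the worker, then: */
        ioctl(pwqr_fd, PWQR_WAKE, 1);           /* release one waiting thread */
        pthread_join(t, NULL);
        return 0;
}
#endif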

static const struct file_operations pwqr_dev_fops = {
        .owner          = THIS_MODULE,
        .open           = pwqr_open,
        .release        = pwqr_release,
        .unlocked_ioctl = pwqr_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl   = pwqr_ioctl,
#endif
};

/*****************************************************************************
 * module
 */
static int __init pwqr_start(void)
{
        int i;

        for (i = 0; i < PWQR_HASH_SIZE; i++) {
                spin_lock_init(&pwqr_tasks_hash[i].lock);
                INIT_HLIST_HEAD(&pwqr_tasks_hash[i].tasks);
        }

        /* Register as a character device */
        pwqr_major = register_chrdev(0, "pwqr", &pwqr_dev_fops);
        if (pwqr_major < 0) {
                printk(KERN_ERR "pwqr: register_chrdev() failed\n");
                return pwqr_major;
        }

        /* Create a device node */
        pwqr_class = class_create(THIS_MODULE, PWQR_DEVICE_NAME);
        if (IS_ERR(pwqr_class)) {
                printk(KERN_ERR "pwqr: Error creating raw class\n");
                unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
                return PTR_ERR(pwqr_class);
        }
        device_create(pwqr_class, NULL, MKDEV(pwqr_major, 0), NULL, PWQR_DEVICE_NAME);
        printk(KERN_INFO "pwqr: PThreads Work Queues Regulator v1 loaded\n");
        return 0;
}

static void __exit pwqr_end(void)
{
        rcu_barrier();
        device_destroy(pwqr_class, MKDEV(pwqr_major, 0));
        class_destroy(pwqr_class);
        unregister_chrdev(pwqr_major, PWQR_DEVICE_NAME);
}

module_init(pwqr_start);
module_exit(pwqr_end);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Pierre Habouzit <pierre.habouzit@intersec.com>");
MODULE_DESCRIPTION("PThreads Work Queues Regulator");

// vim:noet:sw=8:cinoptions+=\:0,L-1,=1s: