implement poll and read
[~madcoder/pwqr.git] / kernel / pwqr.c
index 8171596..0d6b42c 100644 (file)
@@ -26,6 +26,7 @@
 #include <linux/init.h>
 #include <linux/kref.h>
 #include <linux/module.h>
+#include <linux/poll.h>
 #include <linux/sched.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
 
 #include "pwqr.h"
 
-#define PWQR_UNPARK_DELAY      (HZ / 10)
 #define PWQR_HASH_BITS         5
 #define PWQR_HASH_SIZE         (1 << PWQR_HASH_BITS)
 
+#define PWQR_UC_DELAY          (HZ / 10) /* timer delay, in jiffies, used when arming for PWQR_STATE_UC */
+#define PWQR_OC_DELAY          (HZ / 20) /* timer delay, in jiffies, used when arming for PWQR_STATE_OC */
+
+#define PWQR_STATE_NONE                0 /* stored in sb->state when neither timer condition holds */
+#define PWQR_STATE_UC          1 /* timer armed because running < concurrency, waiting == 0 and parked != 0 */
+#define PWQR_STATE_OC          2 /* timer armed because running > concurrency */
+#define PWQR_STATE_DEAD                (-1) /* set by pwqr_release(); the only negative value, tested as state < 0 */
+
 struct pwqr_task_bucket {
        spinlock_t              lock;
        struct hlist_head       tasks;
@@ -53,7 +61,7 @@ struct pwqr_sb {
        struct rcu_head         rcu;
        struct timer_list       timer;
        wait_queue_head_t       wqh;
-       pid_t                   tgid;
+       wait_queue_head_t       wqh_poll;
 
        unsigned                concurrency;
        unsigned                registered;
@@ -63,7 +71,7 @@ struct pwqr_sb {
        unsigned                parked;
        unsigned                overcommit_wakes;
 
-       unsigned                dead;
+       int                     state;
 };
 
 struct pwqr_task {
@@ -93,22 +101,27 @@ static struct preempt_ops  pwqr_preempt_noop_ops;
 #define pwqr_sb_unlock_irqrestore(sb, flags) \
        spin_unlock_irqrestore(&(sb)->wqh.lock, flags)
 
+static inline void pwqr_arm_timer(struct pwqr_sb *sb, int how, int delay) /* arm sb->timer `delay` jiffies from now and record `how` in sb->state */
+{
+       if (timer_pending(&sb->timer) && sb->state == how) /* already queued for this same state: keep the existing deadline */
+               return;
+       mod_timer(&sb->timer, jiffies + delay); /* otherwise (re)program the expiry, replacing any deadline set for another state */
+       sb->state = how; /* remember which condition the timer was armed for */
+}
+
 static inline void __pwqr_sb_update_state(struct pwqr_sb *sb, int running_delta)
 {
        sb->running += running_delta;
-       if (sb->running > sb->concurrency) {
-               /* TODO see ../Documentation/pwqr.adoc */
-       } else if (sb->running == sb->concurrency) {
-               /* do nothing */
-       } else if (sb->waiting == 0 && sb->parked) {
-               if (!timer_pending(&sb->timer)) {
-                       mod_timer(&sb->timer, jiffies + PWQR_UNPARK_DELAY);
-               }
-               return;
-       }
 
-       if (timer_pending(&sb->timer))
-               del_timer(&sb->timer);
+       if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
+               pwqr_arm_timer(sb, PWQR_STATE_UC, PWQR_UC_DELAY);
+       } else if (sb->running > sb->concurrency) {
+               pwqr_arm_timer(sb, PWQR_STATE_OC, PWQR_OC_DELAY);
+       } else {
+               sb->state = PWQR_STATE_NONE;
+               if (!timer_pending(&sb->timer))
+                       del_timer(&sb->timer);
+       }
 }
 
 static void pwqr_sb_timer_cb(unsigned long arg)
@@ -117,10 +130,13 @@ static void pwqr_sb_timer_cb(unsigned long arg)
        unsigned long flags;
 
        pwqr_sb_lock_irqsave(sb, flags);
-       if (sb->waiting == 0 && sb->parked && sb->running < sb->concurrency) {
+       if (sb->running < sb->concurrency && sb->waiting == 0 && sb->parked) {
                if (sb->overcommit_wakes == 0)
                        wake_up_locked(&sb->wqh);
        }
+       if (sb->running > sb->concurrency) {
+               wake_up_poll(&sb->wqh_poll, POLLIN);
+       }
        pwqr_sb_unlock_irqrestore(sb, flags);
 }
 
@@ -134,8 +150,7 @@ static struct pwqr_sb *pwqr_sb_create(void)
 
        kref_init(&sb->kref);
        init_waitqueue_head(&sb->wqh);
-       sb->tgid        = current->tgid;
-       sb->concurrency = num_online_cpus();
+       sb->concurrency    = num_online_cpus();
        init_timer(&sb->timer);
        sb->timer.function = pwqr_sb_timer_cb;
        sb->timer.data     = (unsigned long)sb;
@@ -161,6 +176,7 @@ static void pwqr_sb_release(struct kref *kref)
        struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
 
        del_timer_sync(&sb->timer);
+       wake_up_poll(&sb->wqh_poll, POLLHUP);
        call_rcu(&sb->rcu, pwqr_sb_finalize);
 }
 static inline void pwqr_sb_put(struct pwqr_sb *sb)
@@ -281,7 +297,7 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp
        struct pwqr_sb   *sb   = pwqt->sb;
        unsigned long flags;
 
-       if (unlikely(sb->dead)) {
+       if (unlikely(sb->state < 0)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
@@ -296,11 +312,11 @@ static void pwqr_task_blocked_sched_in(struct preempt_notifier *notifier, int cp
 static void pwqr_task_sched_out(struct preempt_notifier *notifier,
                               struct task_struct *next)
 {
-       struct pwqr_task    *pwqt = container_of(notifier, struct pwqr_task, notifier);
-       struct pwqr_sb      *sb   = pwqt->sb;
+       struct pwqr_task   *pwqt = container_of(notifier, struct pwqr_task, notifier);
+       struct pwqr_sb     *sb   = pwqt->sb;
        struct task_struct *p    = pwqt->task;
 
-       if (unlikely(p->state & TASK_DEAD) || unlikely(sb->dead)) {
+       if (unlikely(p->state & TASK_DEAD) || unlikely(sb->state < 0)) {
                pwqr_task_detach(pwqt, sb);
                pwqr_task_release(pwqt, true);
                return;
@@ -350,16 +366,83 @@ static int pwqr_release(struct inode *inode, struct file *filp)
        unsigned long flags;
 
        pwqr_sb_lock_irqsave(sb, flags);
-       sb->dead = true;
+       sb->state = PWQR_STATE_DEAD;
        pwqr_sb_unlock_irqrestore(sb, flags);
        wake_up_all(&sb->wqh);
        pwqr_sb_put(sb);
        return 0;
 }
 
+static unsigned int pwqr_poll(struct file *filp, poll_table *wait) /* .poll handler: returns POLLIN and/or POLLHUP computed from the sb counters */
+{
+       struct pwqr_sb *sb = filp->private_data;
+       unsigned int events = 0;
+       unsigned long flags;
+
+       poll_wait(filp, &sb->wqh_poll, wait); /* NOTE(review): no init_waitqueue_head(&sb->wqh_poll) appears in this diff - confirm it is initialized before first use */
+
+       pwqr_sb_lock_irqsave(sb, flags); /* read running, concurrency and state under the sb lock */
+       if (sb->running > sb->concurrency) /* readable while more threads run than the concurrency target */
+               events |= POLLIN;
+       if (sb->state < 0) /* negative state means PWQR_STATE_DEAD */
+               events |= POLLHUP;
+       pwqr_sb_unlock_irqrestore(sb, flags);
+
+       return events;
+}
+
+static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt) /* returns 0 and stores running - concurrency in *cnt, else -EAGAIN or -ERESTARTSYS */
+{
+       DECLARE_WAITQUEUE(wait, current);
+       ssize_t rc = -EAGAIN; /* result when no_wait is set and running <= concurrency */
+
+       spin_lock_irq(&sb->wqh.lock);
+       if (sb->running > sb->concurrency) { /* fast path: the condition already holds */
+               rc = 0;
+       } else if (!no_wait) {
+               add_wait_queue(&sb->wqh_poll, &wait); /* sleep on the queue that pwqr_sb_timer_cb() wakes with POLLIN */
+               for (;;) {
+                       set_current_state(TASK_INTERRUPTIBLE); /* set before re-testing so a wakeup in between is not lost */
+                       if (sb->running > sb->concurrency) {
+                               rc = 0;
+                               break;
+                       }
+                       if (signal_pending(current)) { /* interrupted by a signal before the condition became true */
+                               rc = -ERESTARTSYS;
+                               break;
+                       }
+                       spin_unlock_irq(&sb->wqh.lock); /* drop the lock for the duration of the sleep */
+                       schedule();
+                       spin_lock_irq(&sb->wqh.lock);
+               }
+               remove_wait_queue(&sb->wqh_poll, &wait);
+               __set_current_state(TASK_RUNNING);
+       }
+       if (likely(rc == 0))
+               *cnt = sb->running - sb->concurrency; /* computed under the lock; positive because rc == 0 implies running > concurrency */
+       spin_unlock_irq(&sb->wqh.lock);
+
+       return rc;
+}
+
+static ssize_t
+pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) /* .read handler: copies one u32 (running - concurrency) to buf; ppos is unused */
+{
+       struct pwqr_sb *sb = filp->private_data;
+       u32 cnt = 0;
+       ssize_t rc;
+
+       if (count < sizeof(cnt)) /* the user buffer must have room for the whole u32 */
+               return -EINVAL;
+       rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt); /* O_NONBLOCK selects the no_wait path, which can return -EAGAIN */
+       if (rc < 0)
+               return rc;
+       return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt); /* on success the return value is always sizeof(u32) */
+}
+
 static long
 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
-           int is_wait, struct pwqr_ioc_wait __user *arg)
+            int is_wait, struct pwqr_ioc_wait __user *arg)
 {
        unsigned long flags;
        struct pwqr_ioc_wait wait;
@@ -368,9 +451,15 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 
        preempt_notifier_unregister(&pwqt->notifier);
 
-       if (is_wait && copy_from_user(&wait, arg, sizeof(wait))) {
-               rc = -EFAULT;
-               goto out;
+       if (is_wait) {
+               if (copy_from_user(&wait, arg, sizeof(wait))) {
+                       rc = -EFAULT;
+                       goto out;
+               }
+               if (unlikely((long)wait.pwqr_uaddr % sizeof(int) != 0)) {
+                       rc = -EINVAL;
+                       goto out;
+               }
        }
 
        pwqr_sb_lock_irqsave(sb, flags);
@@ -394,9 +483,8 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
        }
 
        /* @ see <wait_event_interruptible_exclusive_locked_irq> */
-       if (likely(!sb->dead)) {
+       if (likely(sb->state >= 0)) {
                DEFINE_WAIT(__wait);
-
                __wait.flags |= WQ_FLAG_EXCLUSIVE;
 
                if (is_wait) {
@@ -407,9 +495,9 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
                        __add_wait_queue_tail(&sb->wqh, &__wait);
                }
                __pwqr_sb_update_state(sb, -1);
-               set_current_state(TASK_INTERRUPTIBLE);
 
                do {
+                       set_current_state(TASK_INTERRUPTIBLE);
                        if (sb->overcommit_wakes)
                                break;
                        if (signal_pending(current)) {
@@ -423,17 +511,17 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
                                break;
                        if (sb->running + sb->waiting < sb->concurrency)
                                break;
-               } while (likely(!sb->dead));
+               } while (likely(sb->state >= 0));
 
                __remove_wait_queue(&sb->wqh, &__wait);
                __set_current_state(TASK_RUNNING);
-
                if (is_wait) {
                        sb->waiting--;
                } else {
                        sb->parked--;
                }
                __pwqr_sb_update_state(sb, 1);
+
                if (sb->overcommit_wakes)
                        sb->overcommit_wakes--;
                if (sb->waiting + sb->running > sb->concurrency)
@@ -441,7 +529,7 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
        }
 
 out_unlock:
-       if (unlikely(sb->dead))
+       if (unlikely(sb->state < 0))
                rc = -EBADFD;
        pwqr_sb_unlock_irqrestore(sb, flags);
 out:
@@ -537,14 +625,11 @@ static long do_pwqr_wake(struct pwqr_sb *sb, int oc, int count)
 
 static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
 {
-       struct pwqr_sb      *sb   = filp->private_data;
+       struct pwqr_sb     *sb   = filp->private_data;
        struct task_struct *task = current;
-       struct pwqr_task    *pwqt;
+       struct pwqr_task   *pwqt;
        int rc = 0;
 
-       if (sb->tgid != current->tgid)
-               return -EBADFD;
-
        switch (command) {
        case PWQR_GET_CONC:
                return sb->concurrency;
@@ -587,7 +672,7 @@ static long pwqr_ioctl(struct file *filp, unsigned command, unsigned long arg)
                break;
        }
 
-       if (unlikely(sb->dead)) {
+       if (unlikely(sb->state < 0)) {
                pwqr_task_detach(pwqt, pwqt->sb);
                return -EBADFD;
        }
@@ -598,6 +683,9 @@ static const struct file_operations pwqr_dev_fops = {
        .owner          = THIS_MODULE,
        .open           = pwqr_open,
        .release        = pwqr_release,
+       .poll           = pwqr_poll,
+       .read           = pwqr_read,
+       .llseek         = noop_llseek,
        .unlocked_ioctl = pwqr_ioctl,
 #ifdef CONFIG_COMPAT
        .compat_ioctl   = pwqr_ioctl,