implement poll and read
[~madcoder/pwqr.git] / kernel / pwqr.c
index a6ff8e5..0d6b42c 100644 (file)
@@ -26,6 +26,7 @@
 #include <linux/init.h>
 #include <linux/kref.h>
 #include <linux/module.h>
 #include <linux/init.h>
 #include <linux/kref.h>
 #include <linux/module.h>
+#include <linux/poll.h>
 #include <linux/sched.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
 #include <linux/sched.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
@@ -60,6 +61,7 @@ struct pwqr_sb {
        struct rcu_head         rcu;
        struct timer_list       timer;
        wait_queue_head_t       wqh;
        struct rcu_head         rcu;
        struct timer_list       timer;
        wait_queue_head_t       wqh;
+       wait_queue_head_t       wqh_poll;
 
        unsigned                concurrency;
        unsigned                registered;
 
        unsigned                concurrency;
        unsigned                registered;
@@ -133,7 +135,7 @@ static void pwqr_sb_timer_cb(unsigned long arg)
                        wake_up_locked(&sb->wqh);
        }
        if (sb->running > sb->concurrency) {
                        wake_up_locked(&sb->wqh);
        }
        if (sb->running > sb->concurrency) {
-               /* See ../Documentation/pwqr.adoc */
+               wake_up_poll(&sb->wqh_poll, POLLIN);
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
 }
        }
        pwqr_sb_unlock_irqrestore(sb, flags);
 }
@@ -174,6 +176,7 @@ static void pwqr_sb_release(struct kref *kref)
        struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
 
        del_timer_sync(&sb->timer);
        struct pwqr_sb *sb = container_of(kref, struct pwqr_sb, kref);
 
        del_timer_sync(&sb->timer);
+       wake_up_poll(&sb->wqh_poll, POLLHUP);
        call_rcu(&sb->rcu, pwqr_sb_finalize);
 }
 static inline void pwqr_sb_put(struct pwqr_sb *sb)
        call_rcu(&sb->rcu, pwqr_sb_finalize);
 }
 static inline void pwqr_sb_put(struct pwqr_sb *sb)
@@ -370,6 +373,73 @@ static int pwqr_release(struct inode *inode, struct file *filp)
        return 0;
 }
 
        return 0;
 }
 
+static unsigned int pwqr_poll(struct file *filp, poll_table *wait)
+{
+       struct pwqr_sb *sb = filp->private_data;
+       unsigned int events = 0;
+       unsigned long flags;
+
+       poll_wait(filp, &sb->wqh_poll, wait);
+
+       pwqr_sb_lock_irqsave(sb, flags);
+       if (sb->running > sb->concurrency)
+               events |= POLLIN;
+       if (sb->state < 0)
+               events |= POLLHUP;
+       pwqr_sb_unlock_irqrestore(sb, flags);
+
+       return events;
+}
+
+static inline ssize_t pwqr_sb_read(struct pwqr_sb *sb, int no_wait, u32 *cnt)
+{
+       DECLARE_WAITQUEUE(wait, current);
+       ssize_t rc = -EAGAIN;
+
+       spin_lock_irq(&sb->wqh.lock);
+       if (sb->running > sb->concurrency) {
+               rc = 0;
+       } else if (!no_wait) {
+               add_wait_queue(&sb->wqh_poll, &wait);
+               for (;;) {
+                       set_current_state(TASK_INTERRUPTIBLE);
+                       if (sb->running > sb->concurrency) {
+                               rc = 0;
+                               break;
+                       }
+                       if (signal_pending(current)) {
+                               rc = -ERESTARTSYS;
+                               break;
+                       }
+                       spin_unlock_irq(&sb->wqh.lock);
+                       schedule();
+                       spin_lock_irq(&sb->wqh.lock);
+               }
+               remove_wait_queue(&sb->wqh_poll, &wait);
+               __set_current_state(TASK_RUNNING);
+       }
+       if (likely(rc == 0))
+               *cnt = sb->running - sb->concurrency;
+       spin_unlock_irq(&sb->wqh.lock);
+
+       return rc;
+}
+
+static ssize_t
+pwqr_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+{
+       struct pwqr_sb *sb = filp->private_data;
+       u32 cnt = 0;
+       ssize_t rc;
+
+       if (count < sizeof(cnt))
+               return -EINVAL;
+       rc = pwqr_sb_read(sb, filp->f_flags & O_NONBLOCK, &cnt);
+       if (rc < 0)
+               return rc;
+       return put_user(cnt, (u32 __user *)buf) ? -EFAULT : sizeof(cnt);
+}
+
 static long
 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
             int is_wait, struct pwqr_ioc_wait __user *arg)
 static long
 do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
             int is_wait, struct pwqr_ioc_wait __user *arg)
@@ -415,7 +485,6 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
        /* @ see <wait_event_interruptible_exclusive_locked_irq> */
        if (likely(sb->state >= 0)) {
                DEFINE_WAIT(__wait);
        /* @ see <wait_event_interruptible_exclusive_locked_irq> */
        if (likely(sb->state >= 0)) {
                DEFINE_WAIT(__wait);
-
                __wait.flags |= WQ_FLAG_EXCLUSIVE;
 
                if (is_wait) {
                __wait.flags |= WQ_FLAG_EXCLUSIVE;
 
                if (is_wait) {
@@ -426,9 +495,9 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
                        __add_wait_queue_tail(&sb->wqh, &__wait);
                }
                __pwqr_sb_update_state(sb, -1);
                        __add_wait_queue_tail(&sb->wqh, &__wait);
                }
                __pwqr_sb_update_state(sb, -1);
-               set_current_state(TASK_INTERRUPTIBLE);
 
                do {
 
                do {
+                       set_current_state(TASK_INTERRUPTIBLE);
                        if (sb->overcommit_wakes)
                                break;
                        if (signal_pending(current)) {
                        if (sb->overcommit_wakes)
                                break;
                        if (signal_pending(current)) {
@@ -446,13 +515,13 @@ do_pwqr_wait(struct pwqr_sb *sb, struct pwqr_task *pwqt,
 
                __remove_wait_queue(&sb->wqh, &__wait);
                __set_current_state(TASK_RUNNING);
 
                __remove_wait_queue(&sb->wqh, &__wait);
                __set_current_state(TASK_RUNNING);
-
                if (is_wait) {
                        sb->waiting--;
                } else {
                        sb->parked--;
                }
                __pwqr_sb_update_state(sb, 1);
                if (is_wait) {
                        sb->waiting--;
                } else {
                        sb->parked--;
                }
                __pwqr_sb_update_state(sb, 1);
+
                if (sb->overcommit_wakes)
                        sb->overcommit_wakes--;
                if (sb->waiting + sb->running > sb->concurrency)
                if (sb->overcommit_wakes)
                        sb->overcommit_wakes--;
                if (sb->waiting + sb->running > sb->concurrency)
@@ -614,6 +683,9 @@ static const struct file_operations pwqr_dev_fops = {
        .owner          = THIS_MODULE,
        .open           = pwqr_open,
        .release        = pwqr_release,
        .owner          = THIS_MODULE,
        .open           = pwqr_open,
        .release        = pwqr_release,
+       .poll           = pwqr_poll,
+       .read           = pwqr_read,
+       .llseek         = noop_llseek,
        .unlocked_ioctl = pwqr_ioctl,
 #ifdef CONFIG_COMPAT
        .compat_ioctl   = pwqr_ioctl,
        .unlocked_ioctl = pwqr_ioctl,
 #ifdef CONFIG_COMPAT
        .compat_ioctl   = pwqr_ioctl,