fix a few more bugs detected by the simple tester.
[~madcoder/pwqr.git] / lib / libpwqr.c
1 /*
2  * Copyright (C) 2012   Pierre Habouzit <pierre.habouzit@intersec.com>
3  * Copyright (C) 2012   Intersec SAS
4  *
5  * This file implements the Linux Pthread Workqueue Regulator usperspace.
6  *
7  * The Linux Pthread Workqueue Regulator is free software: you can
8  * redistribute it and/or modify it under the terms of the GNU Lesser
9  * General Public License as published by the Free Software Foundation,
10  * either version 2.1 of the License, or (at your option) any later version.
11  *
12  * The Linux Pthread Workqueue Regulator is distributed in the hope that it
13  * will be useful, but WITHOUT ANY WARRANTY; without even the implied
14  * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with The Linux Pthread Workqueue Regultaor.
19  * If not, see <http://www.gnu.org/licenses/>.
20  */
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <pthread.h>
26 #include <sched.h>
27 #include <stddef.h>
28 #include <stdint.h>
29 #include <sys/ioctl.h>
30 #include <sys/poll.h>
31 #include <sys/types.h>
32 #include <unistd.h>
33
34 #include "../kernel/pwqr.h"
35
36 #define INPOOL          0
37 #define PARKED          1
38 #define PARKED_MAX     16
39 #define THREADS_MAX   128 /* something dynamic is probably better */
40
41 /* Compiler stuff {{{ */
42
43 #if defined(__x86_64__)
44 #  define mb()          asm volatile("mfence":::"memory")
45 #elif defined(__i386__)
46 #  define mb()          asm volatile("lock; addl $0,0(%%esp)":::"memory")
47 #else
48 #  error unsupported architecture
49 #endif
50
51 #define container_of(obj, type_t, member) \
52       ({ const typeof(((type_t *)0)->member) *__mptr = (obj); \
53          ((type_t *)((char *)__mptr) - offsetof(type_t, member)); })
54 #define likely(expr)     __builtin_expect(!!(expr), 1)
55 #define unlikely(expr)   __builtin_expect((expr), 0)
56
57 #define barrier()          asm volatile("":::"memory")
58 #define access_once(x)     (*(volatile typeof(x) *)&(x))
59
60 #define atomic_fetch_and_add(ptr, n) __sync_fetch_and_add(ptr, n)
61 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
62 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
63 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
64 #define atomic_xchg(ptr, v)          ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v))
65
66 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
67
68 /* }}} */
69 /* pwqr wrapping {{{ */
70
71 static int pwqr_create(int flags)
72 {
73     if (flags & ~PWQR_FL__SET) {
74         errno = -EINVAL;
75         return -1;
76     }
77     return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
78 }
79
80 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
81 {
82     struct pwqr_ioc_wait wait;
83
84     switch (op) {
85       case PWQR_CTL_GET_CONC:
86       case PWQR_CTL_REGISTER:
87       case PWQR_CTL_UNREGISTER:
88       case PWQR_CTL_PARK:
89         return ioctl(fd, op);
90       case PWQR_CTL_SET_CONC:
91       case PWQR_CTL_WAKE:
92       case PWQR_CTL_WAKE_OC:
93         return ioctl(fd, op, val);
94       case PWQR_CTL_WAIT:
95         wait.pwqr_ticket = val;
96         wait.pwqr_uaddr  = uaddr;
97         return ioctl(fd, op, &wait);
98       default:
99         errno = EINVAL;
100         return -1;
101     }
102 }
103
104 /* }}} */
105
106 static struct {
107     int fd;
108     /* atomic */ int spinlock;
109     /* atomic */ int nthreads;
110     /* atomic */ int waiters;
111     /* atomic */ int parked;
112     /* atomic */ int overcommit_count;
113
114     union {
115         struct {
116 #if __BYTE_ORDER == __LITTLE_ENDIAN
117             /* atomic */ uint32_t lo;
118             /* atomic */ uint32_t hi;
119 #elif __BYTE_ORDER == __BIG_ENDIAN
120             /* atomic */ uint32_t hi;
121             /* atomic */ uint32_t lo;
122 #else
123 #error  Unsupported endianness
124 #endif
125         };
126         uint64_t ticket;
127     };
128 } pwqr_g = {
129     .fd = -1,
130 };
131
132 static ALWAYS_INLINE void pwqr_lock(void)
133 {
134     while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
135         sched_yield();
136 }
137
138 static ALWAYS_INLINE void pwqr_unlock(void)
139 {
140     __sync_lock_release(&pwqr_g.spinlock);
141 }
142
143 static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
144 {
145     mb();
146     return access_once(pwqr_g.ticket);
147 }
148
149 static ALWAYS_INLINE void pwqr_signal_n(int n)
150 {
151 #if ULONG_MAX == UINT32_MAX
152     /*
153      * We don't know how slow things are between a pwqr_get() and a
154      * pwqr_wait. Hence it's not safe to assume the "count" will never
155      * rotate fully between the two.
156      *
157      * In 64bits mode, well, we have 64-bits atomic increments and all is
158      * fine. In 32bits mode, we increment the high word manually every 2^24
159      * low-word increment.
160      *
161      * This assumes that at least one of the 256 threads that will try to
162      * perform the "hi" increment won't be stalled between deciding the modulo
163      * is zero and the increment itself for an almost full cycle (2^32 - 2^24)
164      * of the low word counter.
165      */
166     if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
167         atomic_add(&pwqr_g.hi, 1);
168 #else
169     atomic_add(&pwqr_g.ticket, 1);
170 #endif
171     if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
172         pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL);
173 }
174 static ALWAYS_INLINE void pwqr_signal(void)
175 {
176     pwqr_signal_n(1);
177 }
178
179 static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
180 {
181     if (access_once(pwqr_g.waiters))
182         pwqr_signal_n(n);
183 }
184 static ALWAYS_INLINE void pwqr_signal_relaxed(void)
185 {
186     pwqr_signal_relaxed_n(1);
187 }
188
189 static int pwqr_fd_read_overcommit(int fd)
190 {
191     int buf;
192
193     if (read(fd, &buf, sizeof(buf)) == sizeof(buf))
194         return buf;
195     return 0;
196 }
197
198 __attribute__((cold))
199 static int pwqr_fd_overcommit_check(void)
200 {
201     if (atomic_xchg(&pwqr_g.overcommit_count, 0)) {
202         int oc = pwqr_fd_read_overcommit(pwqr_g.fd);
203
204         if (oc) {
205             access_once(pwqr_g.overcommit_count) = (oc - 1);
206             return 1;
207         }
208     }
209     return 0;
210 }
211
212 __attribute__((unused))
213 static void pwqr_overcommit_poll_loop(void)
214 {
215     struct pollfd pfd = {
216         .fd     = pwqr_g.fd,
217         .events = POLLIN,
218     };
219
220     for (;;) {
221         pfd.revents = 0;
222         if (poll(&pfd, 1, -1) >= 0) {
223             if (pfd.revents & POLLIN) {
224                 access_once(pwqr_g.overcommit_count) =
225                     pwqr_fd_read_overcommit(pwqr_g.fd);
226             }
227             if (pfd.revents & (POLLHUP | POLLERR))
228                 return;
229         } else if (errno != EINTR && errno != EAGAIN) {
230             return;
231         }
232     }
233 }
234
235
236 /* returns:
237    - INPOOL if we were woken to run jobs.
238    - PARKED if we were woken to be parked (possibly consume overcommit stuff)
239  */
240 static void pwqr_do_wait_cleanup(void *arg)
241 {
242     atomic_sub(&pwqr_g.waiters, 1);
243 }
244 static int pwqr_do_wait(uint64_t ticket)
245 {
246     int canceltype, rc;
247
248     mb();
249 #if ULONG_MAX == UINT32_MAX
250     if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
251         return INPOOL;
252 #else
253     if (ticket != access_once(pwqr_g.ticket))
254         return INPOOL;
255 #endif
256
257     atomic_add(&pwqr_g.waiters, 1);
258     pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
259     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
260
261     rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo);
262     if (rc < 0) {
263         if (errno == EINTR) {
264             rc = INPOOL;
265         } else if (errno == EDQUOT) {
266             rc = PARKED;
267         } else {
268             assert (errno == EBADFD);
269         }
270     }
271
272     pthread_setcanceltype(canceltype, NULL);
273     pthread_cleanup_pop(1);
274 }
275
276 /* returns INPOOL if the thread is to run some jobs or go to WAIT
277  * returns -1 if the pwqr_g.fd is broken
278  */
279 static void pwqr_do_park_cleanup(void *arg)
280 {
281     atomic_sub(&pwqr_g.parked, 1);
282 }
283 static void pwqr_spawn_thread(int initial_state, int count);
284 static int pwqr_do_park(void)
285 {
286     int canceltype, rc = -1;
287
288     if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
289         atomic_sub(&pwqr_g.parked, 1);
290         return -1;
291     }
292
293     pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
294     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
295
296     for (;;) {
297 #ifdef PSEUDO_CODE
298         while ((job = find_some_job(OC_JOB)))
299             run_job(job);
300 #endif
301
302         if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) {
303             access_once(pwqr_g.overcommit_count) = 0;
304             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
305                 pwqr_spawn_thread(PARKED, 1);
306             }
307             break;
308         }
309
310         if (rc < 0) {
311             if (errno == EBADFD) {
312                 atomic_sub(&pwqr_g.parked, 1);
313                 return -1;
314             }
315             assert (errno == EINTR || errno == EDQUOT);
316             continue;
317         }
318     }
319
320     pthread_setcanceltype(canceltype, NULL);
321     pthread_cleanup_pop(0);
322     return rc;
323 }
324
325 static void pwqr_main_cleanup(void *arg)
326 {
327     atomic_sub(&pwqr_g.nthreads, 1);
328 }
329 static void *pwqr_main(void *arg)
330 {
331     int state = (long)arg;
332
333     if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX)
334         goto out;
335
336     pthread_cleanup_push(&pwqr_main_cleanup, NULL);
337     if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0)
338         goto out;
339
340     do {
341         uint64_t ticket;
342
343       park:
344         if (state == PARKED && (state = pwqr_do_park()) < 0)
345             break;
346         for (;;) {
347             if (unlikely(pwqr_g.overcommit_count)) {
348                 if (pwqr_fd_overcommit_check()) {
349                     state = PARKED;
350                     goto park;
351                 }
352             }
353 #ifdef PSEUDO_CODE
354             job = find_some_job(ANY_JOB);
355             if (!job)
356                 break;
357             run_job(job);
358 #endif
359         }
360         ticket = pwqr_get_ticket();
361 #ifdef PSEUDO_CODE
362         if ((job = find_some_job(ANY_JOB))) {
363             run_job(job);
364             continue;
365         }
366 #endif
367         state = pwqr_do_wait(ticket);
368     } while (state >= 0);
369
370   out:
371     pthread_cleanup_pop(1);
372     return NULL;
373 }
374
375 static void pwqr_spawn_thread(int initial_state, int count)
376 {
377     pthread_t thr;
378     pthread_attr_t attr;
379
380     pthread_attr_init(&attr);
381     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
382     while (count-- > 0)
383         pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
384     pthread_attr_destroy(&attr);
385 }
386
387 __attribute__((cold, noinline, unused))
388 static int _pthread_workqueue_init_np(void)
389 {
390     int rc = 0, fd, n;
391
392     pwqr_lock();
393     if (pwqr_g.fd >= 0)
394         goto out;
395
396     fd = pwqr_create(0);
397     if (fd < 0) {
398         rc = -1;
399         goto out;
400     }
401
402     pwqr_g.fd = fd;
403     n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4;
404     pwqr_spawn_thread(INPOOL, n);
405     pwqr_spawn_thread(PARKED, 4);
406
407   out:
408     pwqr_unlock();
409     return rc;
410 }