85cccace4ee4b83a3460d866eb953240b338fab7
[~madcoder/pwqr.git] / lib / libpwqr.c
1 /*
2  * Copyright (C) 2012   Pierre Habouzit <pierre.habouzit@intersec.com>
3  * Copyright (C) 2012   Intersec SAS
4  *
5  * This file implements the Linux Pthread Workqueue Regulator usperspace.
6  *
7  * The Linux Pthread Workqueue Regulator is free software: you can
8  * redistribute it and/or modify it under the terms of the GNU Lesser
9  * General Public License as published by the Free Software Foundation,
10  * either version 2.1 of the License, or (at your option) any later version.
11  *
12  * The Linux Pthread Workqueue Regulator is distributed in the hope that it
13  * will be useful, but WITHOUT ANY WARRANTY; without even the implied
14  * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with The Linux Pthread Workqueue Regultaor.
19  * If not, see <http://www.gnu.org/licenses/>.
20  */
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <pthread.h>
26 #include <sched.h>
27 #include <stddef.h>
28 #include <stdint.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32
33 #include "../kernel/pwqr.h"
34
35 #define INPOOL          0
36 #define PARKED          1
37 #define PARKED_MAX     16
38 #define THREADS_MAX   128 /* something dynamic is probably better */
39
40 /* Compiler stuff {{{ */
41
42 #if defined(__x86_64__)
43 #  define mb()          asm volatile("mfence":::"memory")
44 #elif defined(__i386__)
45 #  define mb()          asm volatile("lock; addl $0,0(%%esp)":::"memory")
46 #else
47 #  error unsupported architecture
48 #endif
49
50 #define container_of(obj, type_t, member) \
51       ({ const typeof(((type_t *)0)->member) *__mptr = (obj); \
52          ((type_t *)((char *)__mptr) - offsetof(type_t, member)); })
53 #define likely(expr)     __builtin_expect(!!(expr), 1)
54 #define unlikely(expr)   __builtin_expect((expr), 0)
55
56 #define barrier()          asm volatile("":::"memory")
57 #define access_once(x)     (*(volatile typeof(x) *)&(x))
58
59 #define atomic_fetch_and_add(ptr, n) __sync_fetch_and_add(ptr, n)
60 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
61 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
62 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
63
64 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
65
66 /* }}} */
67 /* pwqr wrapping {{{ */
68
69 static int pwqr_create(int flags)
70 {
71     if (flags & ~(O_NONBLOCK | O_CLOEXEC)) {
72         errno = -EINVAL;
73         return -1;
74     }
75     return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
76 }
77
78 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
79 {
80     struct pwqr_ioc_wait wait;
81
82     switch (op) {
83       case PWQR_GET_CONC:
84       case PWQR_REGISTER:
85       case PWQR_UNREGISTER:
86       case PWQR_PARK:
87         return ioctl(fd, op);
88       case PWQR_SET_CONC:
89       case PWQR_WAKE:
90       case PWQR_WAKE_OC:
91         return ioctl(fd, op, val);
92       case PWQR_WAIT:
93         wait.pwqr_ticket = val;
94         wait.pwqr_uaddr  = uaddr;
95         return ioctl(fd, op, &wait);
96       default:
97         errno = EINVAL;
98         return -1;
99     }
100 }
101
102 /* }}} */
103
104 static struct {
105     int fd;
106     /* atomic */ int spinlock;
107     /* atomic */ int nthreads;
108     /* atomic */ int waiters;
109     /* atomic */ int parked;
110
111     union {
112         struct {
113 #if __BYTE_ORDER == __LITTLE_ENDIAN
114             /* atomic */ uint32_t lo;
115             /* atomic */ uint32_t hi;
116 #elif __BYTE_ORDER == __BIG_ENDIAN
117             /* atomic */ uint32_t hi;
118             /* atomic */ uint32_t lo;
119 #else
120 #error  Unsupported endianness
121 #endif
122         };
123         uint64_t ticket;
124     };
125 } pwqr_g = {
126     .fd = -1,
127 };
128
129 static ALWAYS_INLINE void pwqr_lock(void)
130 {
131     while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
132         sched_yield();
133 }
134
135 static ALWAYS_INLINE void pwqr_unlock(void)
136 {
137     __sync_lock_release(&pwqr_g.spinlock);
138 }
139
140 static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
141 {
142     mb();
143     return access_once(pwqr_g.ticket);
144 }
145
146 static ALWAYS_INLINE void pwqr_signal_n(int n)
147 {
148 #if ULONG_MAX == UINT32_MAX
149     /*
150      * We don't know how slow things are between a pwqr_get() and a
151      * pwqr_wait. Hence it's not safe to assume the "count" will never
152      * rotate fully between the two.
153      *
154      * In 64bits mode, well, we have 64-bits atomic increments and all is
155      * fine. In 32bits mode, we increment the high word manually every 2^24
156      * low-word increment.
157      *
158      * This assumes that at least one of the 256 threads that will try to
159      * perform the "hi" increment won't be stalled between deciding the modulo
160      * is zero and the increment itself for an almost full cycle (2^32 - 2^24)
161      * of the low word counter.
162      */
163     if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
164         atomic_add(&pwqr_g.hi, 1);
165 #else
166     atomic_add(&pwqr_g.ticket, 1);
167 #endif
168     if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
169         pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
170 }
171 static ALWAYS_INLINE void pwqr_signal(void)
172 {
173     pwqr_signal_n(1);
174 }
175
176 static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
177 {
178     if (access_once(pwqr_g.waiters))
179         pwqr_signal_n(n);
180 }
181 static ALWAYS_INLINE void pwqr_signal_relaxed(void)
182 {
183     pwqr_signal_relaxed_n(1);
184 }
185
186
187 /* returns:
188    - INPOOL if we were woken to run jobs.
189    - PARKED if we were woken to be parked (possibly consume overcommit stuff)
190  */
191 static void pwqr_do_wait_cleanup(void *arg)
192 {
193     atomic_sub(&pwqr_g.waiters, 1);
194 }
195 static int pwqr_do_wait(uint64_t ticket)
196 {
197     int canceltype, rc;
198
199     mb();
200 #if ULONG_MAX == UINT32_MAX
201     if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
202         return INPOOL;
203 #else
204     if (ticket != access_once(pwqr_g.ticket))
205         return INPOOL;
206 #endif
207
208     atomic_add(&pwqr_g.waiters, 1);
209     pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
210     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
211
212     rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
213     if (rc < 0) {
214         if (errno == EINTR) {
215             rc = INPOOL;
216         } else if (errno == EDQUOT) {
217             rc = PARKED;
218         } else {
219             assert (errno == EBADFD);
220         }
221     }
222
223     pthread_setcanceltype(canceltype, NULL);
224     pthread_cleanup_pop(1);
225 }
226
227 /* returns INPOOL if the thread is to run some jobs or go to WAIT
228  * returns -1 if the pwqr_g.fd is broken
229  */
230 static void pwqr_do_park_cleanup(void *arg)
231 {
232     atomic_sub(&pwqr_g.parked, 1);
233 }
234 static void pwqr_spawn_thread(int initial_state, int count);
235 static int pwqr_do_park(void)
236 {
237     int canceltype, rc = -1;
238
239     if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
240         atomic_sub(&pwqr_g.parked, 1);
241         return -1;
242     }
243
244     pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
245     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
246
247     for (;;) {
248 #ifdef PSEUDO_CODE
249         while ((job = find_some_job(OC_JOB)))
250             run_job(job);
251 #endif
252
253         if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
254             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
255                 pwqr_spawn_thread(PARKED, 1);
256             }
257             break;
258         }
259
260         if (rc < 0) {
261             if (errno == EBADFD) {
262                 atomic_sub(&pwqr_g.parked, 1);
263                 return -1;
264             }
265             assert (errno == EINTR || errno == EDQUOT);
266             continue;
267         }
268     }
269
270     pthread_setcanceltype(canceltype, NULL);
271     pthread_cleanup_pop(0);
272     return rc;
273 }
274
275 static void pwqr_main_cleanup(void *arg)
276 {
277     atomic_sub(&pwqr_g.nthreads, 1);
278 }
279 static void *pwqr_main(void *arg)
280 {
281     int state = (long)arg;
282
283     if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX)
284         goto out;
285
286     pthread_cleanup_push(&pwqr_main_cleanup, NULL);
287     if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
288         goto out;
289
290     do {
291         uint64_t ticket;
292
293         if (state == PARKED && (state = pwqr_do_park()) < 0)
294             break;
295 #ifdef PSEUDO_CODE
296         while ((job = find_some_job(ANY_JOB)))
297             run_job(job);
298 #endif
299         ticket = pwqr_get_ticket();
300 #ifdef PSEUDO_CODE
301         if ((job = find_some_job(ANY_JOB))) {
302             run_job(job);
303             continue;
304         }
305 #endif
306         state = pwqr_do_wait(ticket);
307     } while (state >= 0);
308
309   out:
310     pthread_cleanup_pop(1);
311     return NULL;
312 }
313
314 static void pwqr_spawn_thread(int initial_state, int count)
315 {
316     pthread_t thr;
317     pthread_attr_t attr;
318
319     pthread_attr_init(&attr);
320     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
321     while (count-- > 0)
322         pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
323     pthread_attr_destroy(&attr);
324 }
325
326 __attribute__((cold, noinline, unused))
327 static int _pthread_workqueue_init_np(void)
328 {
329     int rc = 0, fd, n;
330
331     pwqr_lock();
332     if (pwqr_g.fd >= 0)
333         goto out;
334
335     fd = pwqr_create(0);
336     if (fd < 0) {
337         rc = -1;
338         goto out;
339     }
340
341     pwqr_g.fd = fd;
342     n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
343     pwqr_spawn_thread(INPOOL, n);
344     pwqr_spawn_thread(PARKED, 4);
345
346   out:
347     pwqr_unlock();
348     return rc;
349 }