Implement the reluctancy to unpark threads.
[~madcoder/pwqr.git] / lib / libpwqr.c
1 /*
2  * Copyright (C) 2012   Pierre Habouzit <pierre.habouzit@intersec.com>
3  * Copyright (C) 2012   Intersec SAS
4  *
5  * This file implements the Linux Pthread Workqueue Regulator usperspace.
6  *
7  * The Linux Pthread Workqueue Regulator is free software: you can
8  * redistribute it and/or modify it under the terms of the GNU Lesser
9  * General Public License as published by the Free Software Foundation,
10  * either version 2.1 of the License, or (at your option) any later version.
11  *
12  * The Linux Pthread Workqueue Regulator is distributed in the hope that it
13  * will be useful, but WITHOUT ANY WARRANTY; without even the implied
14  * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with The Linux Pthread Workqueue Regultaor.
19  * If not, see <http://www.gnu.org/licenses/>.
20  */
21
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <pthread.h>
26 #include <sched.h>
27 #include <stddef.h>
28 #include <stdint.h>
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32
33 #include "../kernel/pwqr.h"
34
35 #define INPOOL          0
36 #define PARKED          1
37 #define PARKED_MAX     16
38 #define THREADS_MAX   128 /* something dynamic is probably better */
39
40 /* Compiler stuff {{{ */
41
42 #if defined(__x86_64__)
43 #  define mb()          asm volatile("mfence":::"memory")
44 #elif defined(__i386__)
45 #  define mb()          asm volatile("lock; addl $0,0(%%esp)":::"memory")
46 #else
47 #  error unsupported architecture
48 #endif
49
50 #define container_of(obj, type_t, member) \
51       ({ const typeof(((type_t *)0)->member) *__mptr = (obj); \
52          ((type_t *)((char *)__mptr) - offsetof(type_t, member)); })
53 #define likely(expr)     __builtin_expect(!!(expr), 1)
54 #define unlikely(expr)   __builtin_expect((expr), 0)
55
56 #define barrier()          asm volatile("":::"memory")
57 #define access_once(x)     (*(volatile typeof(x) *)&(x))
58
59 #define atomic_fetch_and_add(ptr, n) __sync_fetch_and_add(ptr, n)
60 #define atomic_add(ptr, n)           (void)atomic_fetch_and_add(ptr, n)
61 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
62 #define atomic_sub(ptr, n)           (void)atomic_fetch_and_sub(ptr, n)
63
64 #define ALWAYS_INLINE     __attribute__((always_inline)) inline
65
66 /* }}} */
67 /* pwqr wrapping {{{ */
68
69 static int pwqr_create(void)
70 {
71     return open("/dev/"PWQR_DEVICE_NAME, O_RDWR);
72 }
73
74 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
75 {
76     struct pwqr_ioc_wait wait;
77
78     switch (op) {
79       case PWQR_GET_CONC:
80       case PWQR_REGISTER:
81       case PWQR_UNREGISTER:
82       case PWQR_PARK:
83         return ioctl(fd, op);
84       case PWQR_SET_CONC:
85       case PWQR_WAKE:
86       case PWQR_WAKE_OC:
87         return ioctl(fd, op, val);
88       case PWQR_WAIT:
89         wait.pwqr_ticket = val;
90         wait.pwqr_uaddr  = uaddr;
91         return ioctl(fd, op, &wait);
92       default:
93         errno = EINVAL;
94         return -1;
95     }
96 }
97
98 /* }}} */
99
100 static struct {
101     int fd;
102     /* atomic */ int spinlock;
103     /* atomic */ int nthreads;
104     /* atomic */ int waiters;
105     /* atomic */ int parked;
106
107     union {
108         struct {
109 #if __BYTE_ORDER == __LITTLE_ENDIAN
110             /* atomic */ uint32_t lo;
111             /* atomic */ uint32_t hi;
112 #elif __BYTE_ORDER == __BIG_ENDIAN
113             /* atomic */ uint32_t hi;
114             /* atomic */ uint32_t lo;
115 #else
116 #error  Unsupported endianness
117 #endif
118         };
119         uint64_t ticket;
120     };
121 } pwqr_g = {
122     .fd = -1,
123 };
124
125 static ALWAYS_INLINE void pwqr_lock(void)
126 {
127     while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
128         sched_yield();
129 }
130
131 static ALWAYS_INLINE void pwqr_unlock(void)
132 {
133     __sync_lock_release(&pwqr_g.spinlock);
134 }
135
136 static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
137 {
138     mb();
139     return access_once(pwqr_g.ticket);
140 }
141
142 static ALWAYS_INLINE void pwqr_signal_n(int n)
143 {
144 #if ULONG_MAX == UINT32_MAX
145     /*
146      * We don't know how slow things are between a pwqr_get() and a
147      * pwqr_wait. Hence it's not safe to assume the "count" will never
148      * rotate fully between the two.
149      *
150      * In 64bits mode, well, we have 64-bits atomic increments and all is
151      * fine. In 32bits mode, we increment the high word manually every 2^24
152      * low-word increment.
153      *
154      * This assumes that at least one of the 256 threads that will try to
155      * perform the "hi" increment won't be stalled between deciding the modulo
156      * is zero and the increment itself for an almost full cycle (2^32 - 2^24)
157      * of the low word counter.
158      */
159     if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
160         atomic_add(&pwqr_g.hi, 1);
161 #else
162     atomic_add(&pwqr_g.ticket, 1);
163 #endif
164     if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
165         pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
166 }
167 static ALWAYS_INLINE void pwqr_signal(void)
168 {
169     pwqr_signal_n(1);
170 }
171
172 static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
173 {
174     if (access_once(pwqr_g.waiters))
175         pwqr_signal_n(n);
176 }
177 static ALWAYS_INLINE void pwqr_signal_relaxed(void)
178 {
179     pwqr_signal_relaxed_n(1);
180 }
181
182
183 /* returns:
184    - INPOOL if we were woken to run jobs.
185    - PARKED if we were woken to be parked (possibly consume overcommit stuff)
186  */
187 static void pwqr_do_wait_cleanup(void *arg)
188 {
189     atomic_sub(&pwqr_g.waiters, 1);
190 }
191 static int pwqr_do_wait(uint64_t ticket)
192 {
193     int canceltype, rc;
194
195     mb();
196 #if ULONG_MAX == UINT32_MAX
197     if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
198         return INPOOL;
199 #else
200     if (ticket != access_once(pwqr_g.ticket))
201         return INPOOL;
202 #endif
203
204     atomic_add(&pwqr_g.waiters, 1);
205     pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
206     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
207
208     rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
209     if (rc < 0) {
210         if (errno == EINTR) {
211             rc = INPOOL;
212         } else if (errno == EDQUOT) {
213             rc = PARKED;
214         } else {
215             assert (errno == EBADFD);
216         }
217     }
218
219     pthread_setcanceltype(canceltype, NULL);
220     pthread_cleanup_pop(1);
221 }
222
223 /* returns INPOOL if the thread is to run some jobs or go to WAIT
224  * returns -1 if the pwqr_g.fd is broken
225  */
226 static void pwqr_do_park_cleanup(void *arg)
227 {
228     atomic_sub(&pwqr_g.parked, 1);
229 }
230 static void pwqr_spawn_thread(int initial_state, int count);
231 static int pwqr_do_park(void)
232 {
233     int canceltype, rc = -1;
234
235     if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
236         atomic_sub(&pwqr_g.parked, 1);
237         return -1;
238     }
239
240     pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
241     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
242
243     for (;;) {
244 #ifdef PSEUDO_CODE
245         while ((job = find_some_job(OC_JOB)))
246             run_job(job);
247 #endif
248
249         if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
250             if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
251                 pwqr_spawn_thread(PARKED, 1);
252             }
253             break;
254         }
255
256         if (rc < 0) {
257             if (errno == EBADFD) {
258                 atomic_sub(&pwqr_g.parked, 1);
259                 return -1;
260             }
261             assert (errno == EINTR || errno == EDQUOT);
262             continue;
263         }
264     }
265
266     pthread_setcanceltype(canceltype, NULL);
267     pthread_cleanup_pop(0);
268     return rc;
269 }
270
271 static void pwqr_main_cleanup(void *arg)
272 {
273     atomic_sub(&pwqr_g.nthreads, 1);
274 }
275 static void *pwqr_main(void *arg)
276 {
277     int state = (long)arg;
278
279     if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX)
280         goto out;
281
282     pthread_cleanup_push(&pwqr_main_cleanup, NULL);
283     if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
284         goto out;
285
286     do {
287         uint64_t ticket;
288
289         if (state == PARKED && (state = pwqr_do_park()) < 0)
290             break;
291 #ifdef PSEUDO_CODE
292         while ((job = find_some_job(ANY_JOB)))
293             run_job(job);
294 #endif
295         ticket = pwqr_get_ticket();
296 #ifdef PSEUDO_CODE
297         if ((job = find_some_job(ANY_JOB))) {
298             run_job(job);
299             continue;
300         }
301 #endif
302         state = pwqr_do_wait(ticket);
303     } while (state >= 0);
304
305   out:
306     pthread_cleanup_pop(1);
307     return NULL;
308 }
309
310 static void pwqr_spawn_thread(int initial_state, int count)
311 {
312     pthread_t thr;
313     pthread_attr_t attr;
314
315     pthread_attr_init(&attr);
316     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
317     while (count-- > 0)
318         pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
319     pthread_attr_destroy(&attr);
320 }
321
322 __attribute__((cold, noinline, unused))
323 static int _pthread_workqueue_init_np(void)
324 {
325     int rc = 0, fd, n;
326
327     pwqr_lock();
328     if (pwqr_g.fd >= 0)
329         goto out;
330
331     fd = pwqr_create();
332     if (fd < 0) {
333         rc = -1;
334         goto out;
335     }
336
337     pwqr_g.fd = fd;
338     n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
339     pwqr_spawn_thread(INPOOL, n);
340     pwqr_spawn_thread(PARKED, 4);
341
342   out:
343     pwqr_unlock();
344     return rc;
345 }