2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator userspace.
7 * The Linux Pthread Workqueue Regulator is free software: you can
8 * redistribute it and/or modify it under the terms of the GNU Lesser
9 * General Public License as published by the Free Software Foundation,
10 * either version 2.1 of the License, or (at your option) any later version.
12 * The Linux Pthread Workqueue Regulator is distributed in the hope that it
13 * will be useful, but WITHOUT ANY WARRANTY; without even the implied
14 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with The Linux Pthread Workqueue Regulator.
19 * If not, see <http://www.gnu.org/licenses/>.
#include <assert.h>
#include <endian.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/types.h>

#include "../kernel/pwqr.h"
/* Hard upper bound on worker threads; something dynamic is probably better. */
#define THREADS_MAX 128
/* Compiler stuff {{{ */

/*
 * mb(): full hardware memory barrier.
 * Uses __asm__/__volatile__ spellings so the file also builds under strict
 * ISO modes (-std=c11), where the bare `asm` keyword is unavailable.
 * On architectures other than x86, fall back to GCC's portable full barrier
 * instead of failing the build with #error.
 */
#if defined(__x86_64__)
# define mb() __asm__ __volatile__("mfence" ::: "memory")
#elif defined(__i386__)
# define mb() __asm__ __volatile__("lock; addl $0,0(%%esp)" ::: "memory")
#else
# define mb() __sync_synchronize()
#endif
/*
 * container_of(obj, type_t, member): recover a pointer to the enclosing
 * structure from a pointer `obj` to its `member` field.
 *
 * Bug fix: the offsetof() subtraction must be applied to the (char *)
 * pointer BEFORE casting back to (type_t *).  The previous parenthesization
 * closed the cast early, so the subtraction was performed on a (type_t *)
 * and scaled by sizeof(type_t), yielding a wrong address.
 */
#define container_of(obj, type_t, member) \
    ({ const __typeof__(((type_t *)0)->member) *__mptr = (obj);       \
       ((type_t *)((char *)__mptr - offsetof(type_t, member))); })
/*
 * Branch-prediction hints.  Both normalize their argument to 0/1 with !!:
 * the previous unlikely() passed the raw expression to __builtin_expect,
 * so a non-boolean truthy value (e.g. 2) never matched the expected 0 and
 * the hint (and the macro's 0/1 result) was wrong.
 */
#define likely(expr)   __builtin_expect(!!(expr), 1)
#define unlikely(expr) __builtin_expect(!!(expr), 0)
/* Compiler-only barrier: forbids reordering across it, emits no instruction. */
#define barrier()      __asm__ __volatile__("" ::: "memory")
/* Force exactly one load/store of `x` through a volatile-qualified lvalue. */
#define access_once(x) (*(volatile __typeof__(x) *)&(x))

/* GCC __sync atomics; the fetch-and-op forms return the PREVIOUS value. */
#define atomic_fetch_and_add(ptr, n)  __sync_fetch_and_add(ptr, n)
#define atomic_add(ptr, n)            (void)atomic_fetch_and_add(ptr, n)
#define atomic_fetch_and_sub(ptr, n)  __sync_fetch_and_sub(ptr, n)
#define atomic_sub(ptr, n)            (void)atomic_fetch_and_sub(ptr, n)

#define ALWAYS_INLINE __attribute__((always_inline)) inline
67 /* pwqr wrapping {{{ */
69 static int pwqr_create(void)
71 return open("/dev/"PWQR_DEVICE_NAME, O_RDWR);
74 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
76 struct pwqr_ioc_wait wait;
87 return ioctl(fd, op, val);
89 wait.pwqr_ticket = val;
90 wait.pwqr_uaddr = uaddr;
91 return ioctl(fd, op, &wait);
/* Global regulator state fields.
 * NOTE(review): the enclosing struct header and the aggregate wrapping
 * `ticket`/`lo`/`hi` are not visible in this chunk; the union below was
 * reconstructed so the 64-bit `ticket` aliases the `lo`/`hi` words in
 * native memory order — confirm against the full file. */
    /* atomic */ int spinlock;   /* test-and-set word for pwqr_lock() */
    /* atomic */ int nthreads;   /* live worker threads */
    /* atomic */ int waiters;    /* threads blocked in pwqr_do_wait() */
    /* atomic */ int parked;     /* threads blocked in pwqr_do_park() */

    union {
        /* atomic */ uint64_t ticket;  /* 64-bit wakeup generation counter */
        struct {
#if __BYTE_ORDER == __LITTLE_ENDIAN
            /* atomic */ uint32_t lo;
            /* atomic */ uint32_t hi;
#elif __BYTE_ORDER == __BIG_ENDIAN
            /* atomic */ uint32_t hi;
            /* atomic */ uint32_t lo;
#else
# error Unsupported endianness
#endif
        };
    };
125 static ALWAYS_INLINE void pwqr_lock(void)
127 while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
131 static ALWAYS_INLINE void pwqr_unlock(void)
133 __sync_lock_release(&pwqr_g.spinlock);
136 static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
139 return access_once(pwqr_g.ticket);
142 static ALWAYS_INLINE void pwqr_signal_n(int n)
144 #if ULONG_MAX == UINT32_MAX
146 * We don't know how slow things are between a pwqr_get() and a
147 * pwqr_wait. Hence it's not safe to assume the "count" will never
148 * rotate fully between the two.
150 * In 64bits mode, well, we have 64-bits atomic increments and all is
151 * fine. In 32bits mode, we increment the high word manually every 2^24
152 * low-word increment.
154 * This assumes that at least one of the 256 threads that will try to
155 * perform the "hi" increment won't be stalled between deciding the modulo
156 * is zero and the increment itself for an almost full cycle (2^32 - 2^24)
157 * of the low word counter.
159 if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
160 atomic_add(&pwqr_g.hi, 1);
162 atomic_add(&pwqr_g.ticket, 1);
164 if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
165 pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
167 static ALWAYS_INLINE void pwqr_signal(void)
172 static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
174 if (access_once(pwqr_g.waiters))
177 static ALWAYS_INLINE void pwqr_signal_relaxed(void)
179 pwqr_signal_relaxed_n(1);
184 - INPOOL if we were woken to run jobs.
185 - PARKED if we were woken to be parked (possibly consume overcommit stuff)
187 static void pwqr_do_wait_cleanup(void *arg)
189 atomic_sub(&pwqr_g.waiters, 1);
191 static int pwqr_do_wait(uint64_t ticket)
196 #if ULONG_MAX == UINT32_MAX
197 if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
200 if (ticket != access_once(pwqr_g.ticket))
204 atomic_add(&pwqr_g.waiters, 1);
205 pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
206 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
208 rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
210 if (errno == EINTR) {
212 } else if (errno == EDQUOT) {
215 assert (errno == EBADFD);
219 pthread_setcanceltype(canceltype, NULL);
220 pthread_cleanup_pop(1);
223 /* returns INPOOL if the thread is to run some jobs or go to WAIT
224 * returns -1 if the pwqr_g.fd is broken
226 static void pwqr_do_park_cleanup(void *arg)
228 atomic_sub(&pwqr_g.parked, 1);
230 static void pwqr_spawn_thread(int initial_state, int count);
231 static int pwqr_do_park(void)
233 int canceltype, rc = -1;
235 if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
236 atomic_sub(&pwqr_g.parked, 1);
240 pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
241 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
245 while ((job = find_some_job(OC_JOB)))
249 if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
250 if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
251 pwqr_spawn_thread(PARKED, 1);
257 if (errno == EBADFD) {
258 atomic_sub(&pwqr_g.parked, 1);
261 assert (errno == EINTR || errno == EDQUOT);
266 pthread_setcanceltype(canceltype, NULL);
267 pthread_cleanup_pop(0);
271 static void pwqr_main_cleanup(void *arg)
273 atomic_sub(&pwqr_g.nthreads, 1);
275 static void *pwqr_main(void *arg)
277 int state = (long)arg;
279 if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX)
282 pthread_cleanup_push(&pwqr_main_cleanup, NULL);
283 if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
289 if (state == PARKED && (state = pwqr_do_park()) < 0)
292 while ((job = find_some_job(ANY_JOB)))
295 ticket = pwqr_get_ticket();
297 if ((job = find_some_job(ANY_JOB))) {
302 state = pwqr_do_wait(ticket);
303 } while (state >= 0);
306 pthread_cleanup_pop(1);
310 static void pwqr_spawn_thread(int initial_state, int count)
315 pthread_attr_init(&attr);
316 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
318 pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
319 pthread_attr_destroy(&attr);
322 __attribute__((cold, noinline, unused))
323 static int _pthread_workqueue_init_np(void)
338 n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
339 pwqr_spawn_thread(INPOOL, n);
340 pwqr_spawn_thread(PARKED, 4);