2 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
3 * Copyright (C) 2012 Intersec SAS
5 * This file implements the Linux Pthread Workqueue Regulator userspace part.
7 * The Linux Pthread Workqueue Regulator is free software: you can
8 * redistribute it and/or modify it under the terms of the GNU Lesser
9 * General Public License as published by the Free Software Foundation,
10 * either version 2.1 of the License, or (at your option) any later version.
12 * The Linux Pthread Workqueue Regulator is distributed in the hope that it
13 * will be useful, but WITHOUT ANY WARRANTY; without even the implied
14 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public License
18 * along with The Linux Pthread Workqueue Regulator.
19 * If not, see <http://www.gnu.org/licenses/>.
29 #include <sys/ioctl.h>
30 #include <sys/types.h>
33 #include "../kernel/pwqr.h"
/* Hard upper bound on the number of worker threads this regulator
 * will ever create (see pwqr_main). */
38 #define THREADS_MAX 128 /* something dynamic is probably better */
40 /* Compiler stuff {{{ */
/* mb(): full memory barrier, selected per architecture:
 * - x86-64 has the dedicated MFENCE instruction;
 * - i386 uses a locked read-modify-write on the stack, which acts as a
 *   full barrier there (MFENCE needs SSE2, not guaranteed on i386).
 * NOTE(review): the matching #else/#endif lines are elided from this
 * excerpt; any other architecture is rejected at compile time. */
42 #if defined(__x86_64__)
43 # define mb() asm volatile("mfence":::"memory")
44 #elif defined(__i386__)
45 # define mb() asm volatile("lock; addl $0,0(%%esp)":::"memory")
47 # error unsupported architecture
/* container_of(): recover a pointer to the enclosing structure from a
 * pointer to one of its members.
 *   obj    - pointer to the member
 *   type_t - type of the enclosing structure
 *   member - name of that member inside type_t
 *
 * Fix: the member's byte offset must be subtracted from the char *
 * BEFORE casting back to type_t *; the previous form cast first, so the
 * subtraction was scaled by sizeof(type_t) and produced a wild pointer.
 * __typeof__ is used instead of typeof so the macro also builds under
 * strict -std= modes (typeof is GNU-only before C23). */
#define container_of(obj, type_t, member) \
    ({ const __typeof__(((type_t *)0)->member) *__mptr = (obj);          \
       ((type_t *)((char *)__mptr - offsetof(type_t, member))); })
/* Branch-prediction hints for GCC/Clang.  Both normalize the expression
 * with !! so any scalar (including pointers) collapses to 0 or 1 before
 * being handed to __builtin_expect (which takes a long).
 * Fix: unlikely() was missing the !! that likely() already has, so
 * unlikely(x) returned the raw value of x and could not be used with
 * pointer expressions at all. */
#define likely(expr)    __builtin_expect(!!(expr), 1)
#define unlikely(expr)  __builtin_expect(!!(expr), 0)
/* Compiler-only barrier: forbids the compiler from moving memory
 * accesses across it; emits no instruction. */
56 #define barrier() asm volatile("":::"memory")
/* Force exactly one load/store of x (same idea as the Linux kernel's
 * ACCESS_ONCE): the volatile cast stops the compiler from caching,
 * tearing or re-reading the value. */
57 #define access_once(x) (*(volatile typeof(x) *)&(x))
/* Thin wrappers over the GCC __sync full-barrier atomics.  The fetch_*
 * forms return the value the object held BEFORE the operation; the
 * plain add/sub forms discard it. */
59 #define atomic_fetch_and_add(ptr, n) __sync_fetch_and_add(ptr, n)
60 #define atomic_add(ptr, n) (void)atomic_fetch_and_add(ptr, n)
61 #define atomic_fetch_and_sub(ptr, n) __sync_fetch_and_sub(ptr, n)
62 #define atomic_sub(ptr, n) (void)atomic_fetch_and_sub(ptr, n)
/* Ask GCC to inline unconditionally, even at -O0. */
64 #define ALWAYS_INLINE __attribute__((always_inline)) inline
67 /* pwqr wrapping {{{ */
/* Open the pwqr regulator device.
 * Only O_NONBLOCK and O_CLOEXEC are accepted in flags; any other bit is
 * rejected (the rejection branch's body is elided from this excerpt --
 * presumably it sets errno and returns -1, TODO confirm).
 * Returns the open fd, or -1 on error from open(2). */
69 static int pwqr_create(int flags)
71 if (flags & ~(O_NONBLOCK | O_CLOEXEC)) {
75 return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
/* Single entry point for all pwqr ioctls.
 * Simple operations pass `val` directly as the ioctl argument; wait-style
 * operations marshal `val` (the ticket) and `uaddr` into a
 * struct pwqr_ioc_wait and pass its address instead.  The dispatch
 * between the two paths is elided from this excerpt (presumably a switch
 * on `op` -- TODO confirm).  Returns the raw ioctl(2) result. */
78 static int pwqr_ctl(int fd, int op, int val, void *uaddr)
80 struct pwqr_ioc_wait wait;
91 return ioctl(fd, op, val);
93 wait.pwqr_ticket = val;
94 wait.pwqr_uaddr = uaddr;
95 return ioctl(fd, op, &wait);
/* Shared regulator state: fields of the global `pwqr_g` (the enclosing
 * declaration is outside this excerpt).  Every field below is updated
 * through the atomic helpers above, hence the annotations. */
/* test-and-set lock, see pwqr_lock()/pwqr_unlock() */
106 /* atomic */ int spinlock;
/* number of pool threads alive (see pwqr_main) */
107 /* atomic */ int nthreads;
/* threads currently blocked in pwqr_do_wait() */
108 /* atomic */ int waiters;
/* threads currently parked in pwqr_do_park() */
109 /* atomic */ int parked;
/* lo/hi are the two 32-bit halves of the 64-bit wakeup ticket, laid out
 * to match host endianness -- presumably so they overlay the 64-bit
 * `ticket` member read in pwqr_get_ticket(); TODO confirm against the
 * full declaration. */
113 #if __BYTE_ORDER == __LITTLE_ENDIAN
114 /* atomic */ uint32_t lo;
115 /* atomic */ uint32_t hi;
116 #elif __BYTE_ORDER == __BIG_ENDIAN
117 /* atomic */ uint32_t hi;
118 /* atomic */ uint32_t lo;
120 #error Unsupported endianness
/* Acquire the global spinlock.  __sync_lock_test_and_set returns the
 * previous value, so nonzero means another thread holds the lock and we
 * keep spinning (the spin body is elided from this excerpt). */
129 static ALWAYS_INLINE void pwqr_lock(void)
131 while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
/* Release the global spinlock (release-semantics store of 0). */
135 static ALWAYS_INLINE void pwqr_unlock(void)
137 __sync_lock_release(&pwqr_g.spinlock);
/* Snapshot the 64-bit wakeup ticket with a single volatile read.
 * NOTE(review): on 32-bit targets this read is not atomic, which is why
 * pwqr_do_wait() re-checks the high word separately. */
140 static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
143 return access_once(pwqr_g.ticket);
/* Publish "new work is available": advance the ticket so concurrent
 * pwqr_do_wait() callers notice they raced with us, then ask the kernel
 * to wake up to n threads -- but only when somebody is actually waiting.
 * atomic_fetch_and_add(&waiters, 0) serves as a fully-fenced read of the
 * waiter count. */
146 static ALWAYS_INLINE void pwqr_signal_n(int n)
148 #if ULONG_MAX == UINT32_MAX
150 * We don't know how slow things are between a pwqr_get() and a
151 * pwqr_wait. Hence it's not safe to assume the "count" will never
152 * rotate fully between the two.
154 * In 64bits mode, well, we have 64-bits atomic increments and all is
155 * fine. In 32bits mode, we increment the high word manually every 2^24
156 * low-word increment.
158 * This assumes that at least one of the 256 threads that will try to
159 * perform the "hi" increment won't be stalled between deciding the modulo
160 * is zero and the increment itself for an almost full cycle (2^32 - 2^24)
161 * of the low word counter.
163 if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
164 atomic_add(&pwqr_g.hi, 1);
166 atomic_add(&pwqr_g.ticket, 1);
168 if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
169 pwqr_ctl(pwqr_g.fd, PWQR_WAKE, n, NULL);
/* Signal a single unit of work (body elided from this excerpt;
 * presumably pwqr_signal_n(1) -- TODO confirm). */
171 static ALWAYS_INLINE void pwqr_signal(void)
/* Like pwqr_signal_n() but with a plain volatile read of the waiter
 * count instead of the fenced read -- presumably a cheaper, best-effort
 * variant that tolerates a missed wakeup; TODO confirm intent. */
176 static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
178 if (access_once(pwqr_g.waiters))
/* Relaxed signal of a single unit of work. */
181 static ALWAYS_INLINE void pwqr_signal_relaxed(void)
183 pwqr_signal_relaxed_n(1);
188 - INPOOL if we were woken to run jobs.
189 - PARKED if we were woken to be parked (possibly consume overcommit stuff)
/* pthread cleanup handler: undo the waiters++ done in pwqr_do_wait() if
 * the thread is cancelled while blocked there. */
191 static void pwqr_do_wait_cleanup(void *arg)
193 atomic_sub(&pwqr_g.waiters, 1);
/* Block in the kernel until the wakeup ticket moves (new work), the
 * regulator asks us to park, or an error occurs.  Returns the caller's
 * next state (INPOOL/PARKED per the comment above); the exact return
 * paths are elided from this excerpt. */
195 static int pwqr_do_wait(uint64_t ticket)
/* Fast path: if the ticket already changed, don't enter the kernel.  On
 * 32-bit targets the 64-bit ticket read is not atomic, so the high word
 * is compared separately first. */
200 #if ULONG_MAX == UINT32_MAX
201 if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
204 if (ticket != access_once(pwqr_g.ticket))
/* Account ourselves as a waiter and guarantee the accounting is undone
 * even if we are cancelled inside the blocking ioctl; asynchronous
 * cancellation lets cancellation interrupt PWQR_WAIT. */
208 atomic_add(&pwqr_g.waiters, 1);
209 pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
210 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
/* Futex-like: the low word of the ticket is handed to the kernel as the
 * compare address -- presumably it blocks while *uaddr still matches the
 * ticket we saw; TODO confirm against kernel/pwqr.h. */
212 rc = pwqr_ctl(pwqr_g.fd, PWQR_WAIT, ticket, &pwqr_g.lo);
/* Error triage (handler bodies elided from this excerpt): EINTR for
 * signals, EDQUOT for a park request; anything else must be EBADFD,
 * i.e. the regulator fd is broken. */
214 if (errno == EINTR) {
216 } else if (errno == EDQUOT) {
219 assert (errno == EBADFD);
/* Restore the cancel type and pop the handler, executing it (arg 1) so
 * the waiter count is decremented exactly once on this path. */
223 pthread_setcanceltype(canceltype, NULL);
224 pthread_cleanup_pop(1);
227 /* returns INPOOL if the thread is to run some jobs or go to WAIT
228 * returns -1 if the pwqr_g.fd is broken
/* pthread cleanup handler: undo the parked++ done in pwqr_do_park() if
 * the thread is cancelled while parked. */
230 static void pwqr_do_park_cleanup(void *arg)
232 atomic_sub(&pwqr_g.parked, 1);
234 static void pwqr_spawn_thread(int initial_state, int count);
/* Park the calling thread in the kernel until the regulator needs it
 * again.  Backs out (undoing parked++) when more than PARKED_MAX threads
 * are already parked.  Overcommit jobs are drained before blocking.
 * Returns rc, the caller's next state; several branch bodies are elided
 * from this excerpt. */
235 static int pwqr_do_park(void)
237 int canceltype, rc = -1;
/* Too many parked threads already: revert the count and bail. */
239 if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
240 atomic_sub(&pwqr_g.parked, 1);
244 pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
245 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);
/* Run any pending overcommit jobs before actually blocking. */
249 while ((job = find_some_job(OC_JOB)))
253 if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_PARK, 0, NULL)) == 0) {
/* fetch_and_sub returning 0 means the count just went negative --
 * presumably another thread consumed our parked slot concurrently, so a
 * replacement parked thread is spawned; TODO confirm vs full source. */
254 if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
255 pwqr_spawn_thread(PARKED, 1);
/* EBADFD: regulator fd is gone; fix the accounting and give up. */
261 if (errno == EBADFD) {
262 atomic_sub(&pwqr_g.parked, 1);
265 assert (errno == EINTR || errno == EDQUOT);
/* Pop without executing (arg 0): the parked count was already adjusted
 * on every visible path above. */
270 pthread_setcanceltype(canceltype, NULL);
271 pthread_cleanup_pop(0);
/* pthread cleanup handler: drop this worker from the pool count when the
 * thread exits or is cancelled. */
275 static void pwqr_main_cleanup(void *arg)
277 atomic_sub(&pwqr_g.nthreads, 1);
/* Worker thread entry point.  arg smuggles the initial state (INPOOL or
 * PARKED) through the void * as a long.  After registering with the
 * regulator the thread loops: drain all available jobs, snapshot the
 * ticket, re-check for work, and block in pwqr_do_wait() until more work
 * is signalled.  Exits when the thread cap is exceeded, registration
 * fails, or a wait/park returns a negative state. */
279 static void *pwqr_main(void *arg)
281 int state = (long)arg;
/* Enforce the global thread cap before doing anything else. */
283 if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX)
286 pthread_cleanup_push(&pwqr_main_cleanup, NULL);
287 if (pwqr_ctl(pwqr_g.fd, PWQR_REGISTER, 0, NULL) < 0)
293 if (state == PARKED && (state = pwqr_do_park()) < 0)
296 while ((job = find_some_job(ANY_JOB)))
/* Take the ticket BEFORE the final job check: if work arrives between
 * this check and the wait, the ticket will have moved and
 * pwqr_do_wait() will refuse to block. */
299 ticket = pwqr_get_ticket();
301 if ((job = find_some_job(ANY_JOB))) {
306 state = pwqr_do_wait(ticket);
307 } while (state >= 0);
310 pthread_cleanup_pop(1);
/* Spawn `count` worker threads that start in `initial_state`.
 * Threads are created detached because they are never joined.
 * NOTE(review): only one pthread_create is visible here; the loop over
 * `count` is presumably in the elided lines -- TODO confirm. */
314 static void pwqr_spawn_thread(int initial_state, int count)
319 pthread_attr_init(&attr);
320 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
322 pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
323 pthread_attr_destroy(&attr);
/* One-time pool initialization: size the pool from the kernel's reported
 * concurrency (PWQR_GET_CONC) plus a margin of 4, spawn that many INPOOL
 * workers and 4 PARKED spares.  The setup and error handling between the
 * declaration and these calls are elided from this excerpt. */
326 __attribute__((cold, noinline, unused))
327 static int _pthread_workqueue_init_np(void)
342 n = pwqr_ctl(pwqr_g.fd, PWQR_GET_CONC, 0, NULL) + 4;
343 pwqr_spawn_thread(INPOOL, n);
344 pwqr_spawn_thread(PARKED, 4);