/*
 * Copyright (C) 2012 Pierre Habouzit <pierre.habouzit@intersec.com>
 * Copyright (C) 2012 Intersec SAS
 *
 * This file implements the Linux Pthread Workqueue Regulator userspace.
 *
 * The Linux Pthread Workqueue Regulator is free software: you can
 * redistribute it and/or modify it under the terms of the GNU Lesser
 * General Public License as published by the Free Software Foundation,
 * either version 2.1 of the License, or (at your option) any later version.
 *
 * The Linux Pthread Workqueue Regulator is distributed in the hope that it
 * will be useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the Linux Pthread Workqueue Regulator.
 * If not, see <http://www.gnu.org/licenses/>.
 */
#include <assert.h>
#include <endian.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <sched.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>

#include "../kernel/pwqr.h"
#define THREADS_MAX  128  /* something dynamic is probably better */
#define PARKED_MAX   16   /* cap on parked threads (value assumed here) */
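
/*
 * Job-queue hooks. The regulator only assumes an external queue it can
 * pull jobs from; find_some_job()/run_job() and the job kinds below are
 * hypothetical signatures, sketched so this file stands alone.
 */
typedef void job_t;
enum { ANY_JOB, OC_JOB };
extern job_t *find_some_job(int kind);
extern void run_job(job_t *job);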
/* Compiler stuff {{{ */

#if defined(__x86_64__)
# define mb()   asm volatile("mfence":::"memory")
#elif defined(__i386__)
# define mb()   asm volatile("lock; addl $0,0(%%esp)":::"memory")
#else
# error unsupported architecture
#endif
#define container_of(obj, type_t, member) \
    ({ const typeof(((type_t *)0)->member) *__mptr = (obj);        \
       ((type_t *)((char *)__mptr - offsetof(type_t, member))); })
#define likely(expr)    __builtin_expect(!!(expr), 1)
#define unlikely(expr)  __builtin_expect(!!(expr), 0)
#define barrier()       asm volatile("":::"memory")
#define access_once(x)  (*(volatile typeof(x) *)&(x))

#define atomic_fetch_and_add(ptr, n)  __sync_fetch_and_add(ptr, n)
#define atomic_add(ptr, n)            (void)atomic_fetch_and_add(ptr, n)
#define atomic_fetch_and_sub(ptr, n)  __sync_fetch_and_sub(ptr, n)
#define atomic_sub(ptr, n)            (void)atomic_fetch_and_sub(ptr, n)
#define atomic_xchg(ptr, v)           ((typeof(*(ptr)))__sync_lock_test_and_set(ptr, v))

#define ALWAYS_INLINE __attribute__((always_inline)) inline
/* }}} */
/* pwqr wrapping {{{ */
static int pwqr_create(int flags)
{
    if (flags & ~PWQR_FL__SET) {
        errno = EINVAL;
        return -1;
    }
    return open("/dev/"PWQR_DEVICE_NAME, O_RDWR | flags);
}
static int pwqr_ctl(int fd, int op, int val, void *uaddr)
{
    struct pwqr_ioc_wait wait;

    switch (op) {
      case PWQR_CTL_GET_CONC:
      case PWQR_CTL_REGISTER:
      case PWQR_CTL_UNREGISTER:
      case PWQR_CTL_PARK:
        return ioctl(fd, op);
      case PWQR_CTL_SET_CONC:
      case PWQR_CTL_WAKE:
      case PWQR_CTL_WAKE_OC:
        return ioctl(fd, op, val);
      case PWQR_CTL_WAIT:
        wait.pwqr_ticket = val;
        wait.pwqr_uaddr  = uaddr;
        return ioctl(fd, op, &wait);
      default:
        errno = EINVAL;
        return -1;
    }
}
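
/*
 * Typical bring-up (a sketch; assumes the ctl constants from
 * "../kernel/pwqr.h" and a kernel exposing the pwqr device, and
 * `nr_threads` stands for whatever concurrency target you want):
 *
 *     int fd = pwqr_create(0);
 *     if (fd < 0)
 *         abort();
 *     pwqr_ctl(fd, PWQR_CTL_SET_CONC, nr_threads, NULL);
 *     // each pool thread then does:
 *     pwqr_ctl(fd, PWQR_CTL_REGISTER, 0, NULL);
 */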
enum {
    INPOOL = 0,  /* must be 0: a successful WAIT ctl returns 0 */
    PARKED = 1,
};

static struct {
    int fd;
    /* atomic */ int spinlock;
    /* atomic */ int nthreads;
    /* atomic */ int waiters;
    /* atomic */ int parked;
    /* atomic */ int overcommit_count;

    union {
        struct {
#if __BYTE_ORDER == __LITTLE_ENDIAN
            /* atomic */ uint32_t lo;
            /* atomic */ uint32_t hi;
#elif __BYTE_ORDER == __BIG_ENDIAN
            /* atomic */ uint32_t hi;
            /* atomic */ uint32_t lo;
#else
#error Unsupported endianness
#endif
        };
        /* atomic */ uint64_t ticket;
    };
} pwqr_g = {
    .fd = -1,
};
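
/*
 * The union gives 64-bit builds a single atomically-incrementable
 * `ticket`, while 32-bit builds touch `lo` and `hi` separately; the
 * byte-order test keeps `lo` overlapping the low-order half of
 * `ticket` on both endiannesses.
 */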
static ALWAYS_INLINE void pwqr_lock(void)
{
    while (unlikely(__sync_lock_test_and_set(&pwqr_g.spinlock, 1)))
        sched_yield();
}

static ALWAYS_INLINE void pwqr_unlock(void)
{
    __sync_lock_release(&pwqr_g.spinlock);
}
static ALWAYS_INLINE uint64_t pwqr_get_ticket(void)
{
    mb();
    return access_once(pwqr_g.ticket);
}
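
/*
 * Wait tickets: a thread snapshots the ticket, rescans the job queue,
 * and only then blocks in PWQR_CTL_WAIT. Producers bump the ticket
 * before waking, so a job published after the snapshot makes the wait
 * return immediately instead of being lost.
 */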
static ALWAYS_INLINE void pwqr_signal_n(int n)
{
#if ULONG_MAX == UINT32_MAX
    /*
     * We don't know how slow things are between a pwqr_get_ticket() and a
     * pwqr_do_wait(). Hence it's not safe to assume the "count" will never
     * rotate fully between the two.
     *
     * In 64-bit mode we have 64-bit atomic increments and all is fine.
     * In 32-bit mode, we increment the high word manually every 2^24
     * low-word increments.
     *
     * This assumes that at least one of the 256 threads that will try to
     * perform the "hi" increment won't be stalled between deciding the
     * modulo is zero and the increment itself for an almost full cycle
     * (2^32 - 2^24) of the low-word counter.
     */
    if (unlikely(atomic_fetch_and_add(&pwqr_g.lo, 1) % (1U << 24) == 0))
        atomic_add(&pwqr_g.hi, 1);
#else
    atomic_add(&pwqr_g.ticket, 1);
#endif

    if (atomic_fetch_and_add(&pwqr_g.waiters, 0))
        pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAKE, n, NULL);
}
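
/*
 * Note: atomic_fetch_and_add(&waiters, 0) is used instead of a plain
 * load because the __sync builtins imply a full memory barrier, which
 * orders the ticket bump above before the decision whether a WAKE ctl
 * is needed.
 */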
static ALWAYS_INLINE void pwqr_signal(void)
{
    pwqr_signal_n(1);
}

static ALWAYS_INLINE void pwqr_signal_relaxed_n(int n)
{
    if (access_once(pwqr_g.waiters))
        pwqr_signal_n(n);
}

static ALWAYS_INLINE void pwqr_signal_relaxed(void)
{
    pwqr_signal_relaxed_n(1);
}
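
/*
 * The relaxed variants skip the ticket bump entirely when nobody is
 * waiting; they can miss a thread racing between its ticket snapshot
 * and its waiters increment, hence "relaxed": callers must tolerate
 * an occasional missed wakeup.
 */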
static int pwqr_fd_read_overcommit(int fd)
{
    int buf;

    if (read(fd, &buf, sizeof(buf)) == sizeof(buf))
        return buf;
    return 0;
}
__attribute__((cold))
static int pwqr_fd_overcommit_check(void)
{
    if (atomic_xchg(&pwqr_g.overcommit_count, 0)) {
        int oc = pwqr_fd_read_overcommit(pwqr_g.fd);

        if (oc > 0) {
            access_once(pwqr_g.overcommit_count) = (oc - 1);
            return 1;
        }
    }
    return 0;
}
__attribute__((unused))
static void pwqr_overcommit_poll_loop(void)
{
    struct pollfd pfd = {
        .fd     = pwqr_g.fd,
        .events = POLLIN,
    };

    for (;;) {
        if (poll(&pfd, 1, -1) >= 0) {
            if (pfd.revents & POLLIN) {
                access_once(pwqr_g.overcommit_count) =
                    pwqr_fd_read_overcommit(pwqr_g.fd);
            }
            if (pfd.revents & (POLLHUP | POLLERR))
                return;
        } else if (errno != EINTR && errno != EAGAIN) {
            return;
        }
    }
}
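
/*
 * This poll loop is an alternative, currently unused way to learn about
 * overcommit: a dedicated thread polls the fd for POLLIN instead of
 * each worker reacting to EDQUOT from the WAIT and PARK ctls.
 */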
/*
 * Returns:
 *  - INPOOL if we were woken to run jobs.
 *  - PARKED if we were woken to be parked (possibly consume overcommit stuff)
 *  - -1     if pwqr_g.fd is broken.
 */
static void pwqr_do_wait_cleanup(void *arg)
{
    atomic_sub(&pwqr_g.waiters, 1);
}

static int pwqr_do_wait(uint64_t ticket)
{
    int rc, canceltype;

#if ULONG_MAX == UINT32_MAX
    if ((unsigned)(ticket >> 32) != access_once(pwqr_g.hi))
        return INPOOL;
#else
    if (ticket != access_once(pwqr_g.ticket))
        return INPOOL;
#endif

    atomic_add(&pwqr_g.waiters, 1);
    pthread_cleanup_push(&pwqr_do_wait_cleanup, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);

    rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_WAIT, ticket, &pwqr_g.lo);
    if (rc < 0) {
        if (errno == EINTR) {
            rc = INPOOL;
        } else if (errno == EDQUOT) {
            rc = PARKED;
        } else {
            assert (errno == EBADFD);
            rc = -1;
        }
    }

    pthread_setcanceltype(canceltype, NULL);
    pthread_cleanup_pop(1);
    return rc;
}
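
/*
 * The WAIT ctl passes &pwqr_g.lo as the uaddr: the kernel is expected
 * to recheck that 32-bit word against the ticket before sleeping,
 * futex-style, which closes the window between our ticket snapshot and
 * the actual wait. That is also why the 32-bit build only checks the
 * high word itself.
 */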
/*
 * returns INPOOL if the thread is to run some jobs or go to WAIT,
 * returns -1 if the pwqr_g.fd is broken.
 */
static void pwqr_do_park_cleanup(void *arg)
{
    atomic_sub(&pwqr_g.parked, 1);
}
static void pwqr_spawn_thread(int initial_state, int count);

static int pwqr_do_park(void)
{
    int canceltype, rc = -1;
    job_t *job;

    if (atomic_fetch_and_add(&pwqr_g.parked, 1) > PARKED_MAX) {
        atomic_sub(&pwqr_g.parked, 1);
        return -1;  /* enough parked threads already: let this one exit */
    }

    pthread_cleanup_push(&pwqr_do_park_cleanup, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &canceltype);

    for (;;) {
        while ((job = find_some_job(OC_JOB)))
            run_job(job);

        if ((rc = pwqr_ctl(pwqr_g.fd, PWQR_CTL_PARK, 0, NULL)) == 0) {
            access_once(pwqr_g.overcommit_count) = 0;
            if (atomic_fetch_and_sub(&pwqr_g.parked, 1) == 0) {
                /* keep a parked thread in reserve */
                pwqr_spawn_thread(PARKED, 1);
            }
            rc = INPOOL;
            break;
        }
        if (errno == EBADFD) {
            atomic_sub(&pwqr_g.parked, 1);
            rc = -1;
            break;
        }
        assert (errno == EINTR || errno == EDQUOT);
    }

    pthread_setcanceltype(canceltype, NULL);
    pthread_cleanup_pop(0);
    return rc;
}
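
/*
 * Park lifecycle: drain overcommit jobs first, then block in
 * PWQR_CTL_PARK. A zero return means the kernel woke us to work again;
 * EINTR and EDQUOT mean "drain and park again", EBADFD means the fd is
 * gone and the thread should exit.
 */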
static void pwqr_main_cleanup(void *arg)
{
    atomic_sub(&pwqr_g.nthreads, 1);
}

static void *pwqr_main(void *arg)
{
    int state = (long)arg;

    if (atomic_fetch_and_add(&pwqr_g.nthreads, 1) > THREADS_MAX) {
        atomic_sub(&pwqr_g.nthreads, 1);
        return NULL;
    }
    pthread_cleanup_push(&pwqr_main_cleanup, NULL);
    if (pwqr_ctl(pwqr_g.fd, PWQR_CTL_REGISTER, 0, NULL) < 0)
        goto out;

    do {
        uint64_t ticket;
        job_t *job;

        if (state == PARKED && (state = pwqr_do_park()) < 0)
            break;
        if (unlikely(pwqr_g.overcommit_count)) {
            if (pwqr_fd_overcommit_check()) {
                state = PARKED;
                continue;
            }
        }

        job = find_some_job(ANY_JOB);
        if (job) {
            run_job(job);
            continue;
        }

        ticket = pwqr_get_ticket();
        if ((job = find_some_job(ANY_JOB))) {
            run_job(job);
            continue;
        }
        state = pwqr_do_wait(ticket);
    } while (state >= 0);

  out:
    pthread_cleanup_pop(1);
    return NULL;
}
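
/*
 * The double find_some_job() around pwqr_get_ticket() is deliberate:
 * the first call is the fast path, the second catches jobs published
 * after the snapshot, and anything published later bumps the ticket
 * and aborts the wait.
 */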
static void pwqr_spawn_thread(int initial_state, int count)
{
    pthread_t thr;
    pthread_attr_t attr;

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    while (count-- > 0)
        pthread_create(&thr, &attr, pwqr_main, (void *)(long)initial_state);
    pthread_attr_destroy(&attr);
}
__attribute__((cold, noinline, unused))
static int _pthread_workqueue_init_np(void)
{
    int n;

    if (pwqr_g.fd < 0) {
        int fd = pwqr_create(0);

        if (fd < 0)
            return -1;
        pwqr_g.fd = fd;
    }
    n = pwqr_ctl(pwqr_g.fd, PWQR_CTL_GET_CONC, 0, NULL) + 4;
    pwqr_spawn_thread(INPOOL, n);
    pwqr_spawn_thread(PARKED, 4);
    return 0;
}
/* }}} */
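
/*
 * Usage sketch (assumptions, not part of the original API contract):
 * call _pthread_workqueue_init_np() once before submitting jobs. It
 * sizes the pool from PWQR_CTL_GET_CONC plus a small slack, and keeps
 * a few parked threads in reserve for overcommit situations.
 */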