/*
 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */

#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/ast.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>    /* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/thread_group.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h>    /* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <machine/smp.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/param.h>
#include <sys/proc_info.h>      /* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h>          /* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>

#include <os/log.h>

static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);

static inline void
workq_lock_spin(struct workqueue *wq);

static inline void
workq_unlock(struct workqueue *wq);

#pragma mark globals

struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")
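
/*
 * For illustration: WORKQ_SYSCTL_USECS(wq_stalled_window, ...) below defines
 * the `wq_stalled_window` variable and registers a read/write
 * `kern.wq_stalled_window_usecs` sysctl; its handler,
 * workq_sysctl_handle_usecs(), also refreshes the cached abstime value on
 * every write.
 */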

static LCK_GRP_DECLARE(workq_lck_grp, "workq");
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static ZONE_DEFINE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

static struct mpsc_daemon_queue workq_deallocate_queue;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

/*
 * This is not a hard limit but the max size we aim to hit across the entire
 * cooperative pool. We can oversubscribe the pool due to non-cooperative
 * workers; the most we will oversubscribe the pool by is a total of
 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
 */
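/*
 * Illustrative worst case (numbers made up): if wq_max_cooperative_threads
 * were 8 and there are 6 cooperative QoS buckets, the pool could transiently
 * grow to 8 * 6 = 48 cooperative threads.
 */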
static uint32_t wq_max_cooperative_threads;

static inline uint32_t
wq_cooperative_queue_max_size(struct workqueue *wq)
{
	return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
}

#pragma mark sysctls

static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");

static int
wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
{
#pragma unused(arg1, arg2, oidp)
	int input_pool_size = 0;
	int changed;
	int error = 0;

	error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
	if (error || !changed) {
		return error;
	}

#define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
#define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
	/* Not available currently, but the sysctl interface is designed to allow
	 * these extra parameters:
	 *  WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all buckets)
	 *  WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
	 */

	if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
	    && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
		error = EINVAL;
		goto out;
	}

	proc_t p = req->p;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq != NULL) {
		workq_lock_spin(wq);
		if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
			// Hackily enforce that the workqueue is still new (no requests or
			// threads)
			error = ENOTSUP;
		} else {
			wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
		}
		workq_unlock(wq);
	} else {
		/* This process has no workqueue; calling this sysctl makes no sense */
		return ENOTSUP;
	}

out:
	return error;
}

SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
    CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
    wq_limit_cooperative_threads_for_proc,
    "I", "Modify the max pool size of the cooperative pool");

#pragma mark p_wqptr

#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
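
/*
 * Sketch of the intended initialization protocol (inferred from the helpers
 * above, not a verbatim caller): a thread calls proc_init_wqptr_or_wait();
 * on true it owns initialization (p_wqptr == WQPTR_IS_INITING_VALUE),
 * allocates and initializes the workqueue, then publishes it with
 * proc_set_wqptr(), which also wakes any waiters. On false it either slept
 * until the initializer finished or saw an existing workqueue, and should
 * re-read proc_get_wqptr().
 */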

static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
}

#pragma mark wq_thactive

#if defined(__LP64__)
// Layout is:
//   127 - 115 : 13 bits of zeroes
//   114 - 112 : best QoS among all pending constrained requests
//   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
//   63 - 61 : best QoS among all pending constrained requests
//   60      : Manager bucket (0 or 1)
//   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");
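
/*
 * Illustrative read of one bucket's active count (a sketch; the helpers
 * below do the real accounting):
 *
 *	wq_thactive_t v = _wq_thactive(wq);
 *	uint32_t active = (uint32_t)(v >> (_wq_bucket(qos) *
 *	    WQ_THACTIVE_BUCKET_WIDTH)) & WQ_THACTIVE_BUCKET_MASK;
 */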

static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline uint8_t
_wq_bucket(thread_qos_t qos)
{
	// Map both BG and MT to the same bucket by over-shifting down and
	// clamping MT and BG together.
	switch (qos) {
	case THREAD_QOS_MAINTENANCE:
		return 0;
	default:
		return qos - 2;
	}
}
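
/*
 * Illustrative mapping (based on the thread_qos_t enum values): MT(1) and
 * BG(2) both land in bucket 0, UT(3) -> 1, DF/LEGACY(4) -> 2, IN(5) -> 3,
 * UI(6) -> 4, and the ABOVEUI pseudo-QoS takes the last bucket.
 */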

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
#endif
	}
}

static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	uint8_t bucket = _wq_bucket(qos);
	__builtin_assume(bucket < WORKQ_NUM_BUCKETS);
	return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}

static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	uint8_t i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
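
/*
 * Worked example (hypothetical counts): with qos = IN, two active UI threads
 * and one active IN thread encoded in `v`, the walk above returns 3. A
 * bucket whose wq_thscheduled_count exceeds its active count has blocked
 * threads; it contributes at most 1 to *busycount, and only when its
 * last-blocked timestamp is recent per workq_thread_is_busy().
 */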

/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
	assert(old_scheduled_count > 0);
}

/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
	assert(old_scheduled_count < UINT8_MAX);
}

#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return !wq || _wq_exiting(wq);
}


#pragma mark workqueue lock

static bool
workq_lock_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_ticket_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_lock_held(struct workqueue *wq)
{
	LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
	lck_ticket_unlock(&wq->wq_lock);
}

#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	(struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}
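
/*
 * Illustrative (hypothetical values): a thread with qos_req = UT that
 * receives a kevent max QoS of IN has workq_pri_bucket() == IN, so
 * workq_thread_update_bucket() below would move its thactive accounting
 * from the UT to the IN bucket; workq_pri_override() additionally folds in
 * qos_bucket, the bucket the thread is currently charged to.
 */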

static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately from policy changes, so
	 * ignore them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
		return true;
	}

#if CONFIG_PREADOPT_TG
	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
		/*
		 * Ideally, we'd add a check here to see if the thread's preadopt TG is
		 * the same as the thread request's thread group and short-circuit if
		 * that is the case. But in the interest of keeping the code clean and
		 * not taking the thread lock here, we're going to skip this. We will
		 * eventually short-circuit once we try to set the preadoption thread
		 * group on the thread.
		 */
		return true;
	}
#endif

	return false;
}

/* Input thread must be self. Called during self override, resetting overrides
 * or while processing kevents
 *
 * Called with workq lock held. Sometimes also the thread mutex
 */
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	assert(uth == current_uthread());

	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (old_pri.qos_override != new_pri.qos_override) {
		thread_set_workq_override(get_machthread(uth), new_pri.qos_override);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}

/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread. Removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}

/* Called with the workq lock held */
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (_pthread_priority_has_sched_pri(mgr_pri)) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

#if CONFIG_PREADOPT_TG
	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
		/*
		 * We cannot safely read and borrow the reference from the kqwl since it
		 * can disappear from under us at any time due to the max-ing logic in
		 * kqueue_set_preadopted_thread_group.
		 *
		 * As such, we do the following dance:
		 *
		 * 1) cmpxchg and steal the kqwl's preadopt thread group, leaving
		 *    (NULL + QoS) behind. At this point, we have the reference to
		 *    the thread group from the kqwl.
		 * 2) Have the thread set the preadoption thread group on itself.
		 * 3) cmpxchg from the (NULL + QoS) we set in (1) back to
		 *    thread_group + QoS, i.e. try to give the reference back to the
		 *    kqwl. If we fail, that's because a higher QoS thread group was
		 *    set on the kqwl in kqueue_set_preadopted_thread_group, in which
		 *    case we need to go back to (1).
		 */

		_Atomic(struct thread_group *) *tg_loc = kqr_preadopt_thread_group_addr(req);

		thread_group_qos_t old_tg, new_tg;
		int ret = 0;
again:
		ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
			if (!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) {
				os_atomic_rmw_loop_give_up(break);
			}

			/*
			 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
			 * only modify it if there is a higher QoS thread group to attach
			 */
			new_tg = (thread_group_qos_t)((uintptr_t)old_tg & KQWL_PREADOPT_TG_QOS_MASK);
		});

		if (ret) {
			/*
			 * We successfully took the ref from the kqwl so set it on the
			 * thread now
			 */
			thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));

			thread_group_qos_t thread_group_to_expect = new_tg;
			thread_group_qos_t thread_group_to_set = old_tg;

			os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
				if (old_tg != thread_group_to_expect) {
					/*
					 * There was an intervening write to the kqwl_preadopt_tg,
					 * and it has a higher QoS than what we are working with
					 * here. Abandon our current adopted thread group and redo
					 * the full dance
					 */
					thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
					os_atomic_rmw_loop_give_up(goto again);
				}

				new_tg = thread_group_to_set;
			});
		} else {
			/* Nothing valid on the kqwl, just clear what's on the thread */
			thread_set_preadopt_thread_group(th, NULL);
		}
	} else {
		/* Not even a kqwl, clear what's on the thread */
		thread_set_preadopt_thread_group(th, NULL);
	}
#endif
	thread_set_workq_pri(th, qos, priority, policy);
}

/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}

#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}

static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have fewer than wq_death_max_load threads, use a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * 5s down to 50ms.
	 */
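	/*
	 * Worked example with made-up numbers: if wq_max_constrained_threads
	 * were 16 and idle exceeded wq_death_max_load by 8, the delay would be
	 * window * (16 - 8) / 16, i.e. half the window; once the excess reaches
	 * 16 or more, the delay bottoms out at window / 16.
	 */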
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}

static inline bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}

static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}

/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, wq->wq_thidlecount, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
			workq_thread_wakeup(uth);
		}
		return;
	}

	workq_death_call_schedule(wq,
	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}

void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
		    wq, wq->wq_thidlecount, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got back under the thread limit, which may have prevented
		 * thread creation from happening; redrive if there are pending
		 * requests.
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(get_machthread(uth));
}

static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
	workq_unlock(wq);
}

static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;

	/* A thread is never woken up as part of the cooperative pool */
	assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);

	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}

/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}

/**
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(proc_task(p));

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(proc_task(p), workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_creations++;
	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}

static inline bool
workq_thread_is_overcommit(struct uthread *uth)
{
	return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
}

static inline bool
workq_thread_is_nonovercommit(struct uthread *uth)
{
	return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0;
}

static inline bool
workq_thread_is_cooperative(struct uthread *uth)
{
	return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
}

static inline void
workq_thread_set_type(struct uthread *uth, uint16_t flags)
{
	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	uth->uu_workq_flags |= flags;
}


#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(proc_task(p));

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}

bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
	return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}

static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		    flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		    TURNSTILE_INTERLOCK_HELD);
	});
}

static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_req;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}

#pragma mark thread requests

static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
}

static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0;
}

static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
}

#define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
#define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
#define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)

static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}

static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	assert(!workq_tr_is_cooperative(req->tr_flags));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (workq_tr_is_overcommit(req->tr_flags)) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}


/* Calculates the number of threads scheduled >= the input QoS */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos)
{
	workq_lock_held(wq);

	uint64_t num_cooperative_threads = 0;

	for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
		uint8_t bucket = _wq_bucket(cur_qos);
		num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
	}

	return num_cooperative_threads;
}

static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
{
	return workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN);
}

#if DEBUG || DEVELOPMENT
static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			return true;
		}
	}

	return false;
}
#endif

/*
 * Determines the next QoS bucket we should service in the cooperative
 * pool. This function will always return a QoS for the cooperative pool as
 * long as there are requests to be serviced.
 *
 * Unlike the other thread pools, for the cooperative thread pool the
 * scheduled counts of the various buckets in the pool affect its next best
 * request.
 *
 * This function is called in the following contexts:
 *
 * a) When determining the best thread QoS for the cooperative bucket for
 * the creator/thread reuse
 *
 * b) Once (a) has happened and a thread has bound to a thread request, when
 * figuring out whether the next best request for this pool has changed so
 * that the creator can be scheduled.
 *
 * Returns true if the cooperative queue's best QoS changed from its
 * previous value.
 */
static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
{
	workq_lock_held(wq);

	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;

	/* We determine the next best cooperative thread request based on the
	 * following:
	 *
	 * 1. Take the MAX of the following:
	 *    a) Highest qos with pending TRs such that number of scheduled
	 *       threads so far with >= qos is < wq_max_cooperative_threads
	 *    b) Highest qos bucket with pending TRs but no scheduled threads for
	 *       that bucket
	 *
	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
	 *    pending thread requests in the pool.
	 */
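	/*
	 * Worked example (hypothetical): pool max 4, 4 threads scheduled at UI,
	 * requests pending at UI and BG, no BG threads scheduled. (1a) matches
	 * nothing since the pool is saturated from UI down, but (1b) matches BG
	 * because its bucket is unserviced, so BG becomes the best request QoS
	 * even though UI is higher.
	 */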
	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;

	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;

	int scheduled_count_till_qos = 0;

	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
		scheduled_count_till_qos += scheduled_count_for_bucket;

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			if (qos > highest_qos_req) {
				highest_qos_req = qos;
			}
			/*
			 * The pool isn't saturated for threads at and above this QoS, and
			 * this qos bucket has pending requests
			 */
			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
				if (qos > highest_qos_req_with_width) {
					highest_qos_req_with_width = qos;
				}
			}

			/*
			 * There are no threads scheduled for this bucket but there
			 * is work pending, give it at least 1 thread
			 */
			if (scheduled_count_for_bucket == 0) {
				if (qos > highest_qos_with_no_scheduled) {
					highest_qos_with_no_scheduled = qos;
				}
			}
		}
	}

	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
	}

#if DEBUG || DEVELOPMENT
	/* Assert that if we report the next best req as UN, there actually is
	 * no thread request in the cooperative pool buckets */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		assert(!workq_has_cooperative_thread_requests(wq));
	}
#endif

	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
}

/*
 * Returns whether or not the input thread (or creator thread if uth is NULL)
 * should be allowed to work as part of the cooperative pool for the <input qos>
 * bucket.
 *
 * This function is called in a bunch of places:
 * a) Quantum expires for a thread and it is part of the cooperative pool
 * b) When trying to pick a thread request for the creator thread to
 *    represent.
 * c) When a thread is trying to pick a thread request to actually bind to
 *    and service.
 *
 * Called with workq lock held.
 */

#define WQ_COOPERATIVE_POOL_UNSATURATED 1
#define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
#define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3

static bool
workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
    bool may_start_timer)
{
	workq_lock_held(wq);

	bool exclude_thread_as_scheduled = false;
	bool passed_admissions = false;
	uint8_t bucket = _wq_bucket(qos);

	if (uth && workq_thread_is_cooperative(uth)) {
		exclude_thread_as_scheduled = true;
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
	}

	/*
	 * We have not saturated the pool yet, let this thread continue
	 */
	uint64_t total_cooperative_threads;
	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_POOL_UNSATURATED);
		goto out;
	}

	/*
	 * Without this thread, nothing is servicing the bucket which has pending
	 * work
	 */
	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
	if (bucket_scheduled == 0 &&
	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
		goto out;
	}

	/*
	 * If the number of threads scheduled at QoS buckets >= the input QoS
	 * exceeds the max we want for the pool, deny this thread
	 */
	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos);
	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);

	if (!passed_admissions && may_start_timer) {
		workq_schedule_delayed_thread_creation(wq, 0);
	}

out:
	if (exclude_thread_as_scheduled) {
		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
	}
	return passed_admissions;
}

/*
 * returns true if the best request for the pool changed as a result of
 * enqueuing this thread request.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}

	if (workq_threadreq_is_cooperative(req)) {
		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);

		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
		STAILQ_INSERT_TAIL(bucket, req, tr_link);

		return _wq_cooperative_queue_refresh_best_req_qos(wq);
	}

	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);

	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
	    workq_priority_for_req(req), false);

	if (priority_queue_insert(q, &req->tr_entry)) {
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}

/*
 * returns true if one of the following is true (so as to update the creator
 * if needed):
 *
 * (a) the next highest request of the pool we dequeued the request from changed
 * (b) the next highest requests of the pool the current thread used to be a
 * part of changed
 *
 * For the overcommit, special and constrained pools, the next highest QoS
 * for each pool is just a MAX of pending requests, so tracking (a) is
 * sufficient.
 *
 * But for the cooperative thread pool, the next highest QoS for the pool
 * also depends on the scheduled counts in the pool. So if the current thread
 * used to be cooperative in its previous logical run, i.e. (b), that can
 * also affect the cooperative pool's next best QoS request.
 */
1631 static bool
1632 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
1633 bool cooperative_sched_count_changed)
1634 {
1635 wq->wq_reqcount--;
1636
1637 bool next_highest_request_changed = false;
1638
1639 if (--req->tr_count == 0) {
1640 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1641 assert(wq->wq_event_manager_threadreq == req);
1642 assert(req->tr_count == 0);
1643 wq->wq_event_manager_threadreq = NULL;
1644
1645 /* If a cooperative thread was the one which picked up the manager
1646 * thread request, we need to reevaluate the cooperative pool
1647 * anyway.
1648 */
1649 if (cooperative_sched_count_changed) {
1650 _wq_cooperative_queue_refresh_best_req_qos(wq);
1651 }
1652 return true;
1653 }
1654
1655 if (workq_threadreq_is_cooperative(req)) {
1656 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1657 assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1658 /* Account for the fact that BG and MT are coalesced when
1659 * calculating best request for cooperative pool
1660 */
1661 assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos));
1662
1663 struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1664 __assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);
1665
1666 assert(head == req);
1667 STAILQ_REMOVE_HEAD(bucket, tr_link);
1668
1669 /*
1670 * If the request we're dequeueing is cooperative, then the sched
1671 * counts definitely changed.
1672 */
1673 assert(cooperative_sched_count_changed);
1674 }
1675
1676 /*
1677 * We want to do the cooperative pool refresh after dequeueing a
1678 * cooperative thread request if any (to combine both effects into 1
1679 * refresh operation)
1680 */
1681 if (cooperative_sched_count_changed) {
1682 next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
1683 }
1684
1685 if (!workq_threadreq_is_cooperative(req)) {
1686 /*
1687 * All other types of requests are enqueued in priority queues
1688 */
1689
1690 if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1691 &req->tr_entry)) {
1692 next_highest_request_changed |= true;
1693 if (workq_threadreq_is_nonovercommit(req)) {
1694 _wq_thactive_refresh_best_constrained_req_qos(wq);
1695 }
1696 }
1697 }
1698 }
1699
1700 return next_highest_request_changed;
1701 }
1702
1703 static void
1704 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1705 {
1706 req->tr_state = WORKQ_TR_STATE_CANCELED;
1707 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
1708 kqueue_threadreq_cancel(p, req);
1709 } else {
1710 zfree(workq_zone_threadreq, req);
1711 }
1712 }
1713
1714 #pragma mark workqueue thread creation thread calls
1715
1716 static inline bool
1717 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1718 uint32_t fail_mask)
1719 {
1720 uint32_t old_flags, new_flags;
1721
1722 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1723 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1724 os_atomic_rmw_loop_give_up(return false);
1725 }
1726 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1727 new_flags = old_flags | pend;
1728 } else {
1729 new_flags = old_flags | sched;
1730 }
1731 });
1732
1733 return (old_flags & WQ_PROC_SUSPENDED) == 0;
1734 }
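/*
 * In other words: workq_thread_call_prepost() returns true only when the
 * call was marked scheduled and the caller should actually arm the thread
 * call. If the process is suspended, the call is merely marked pending (to
 * be redriven by workq_proc_resumed()) and false is returned, as it is
 * when the workqueue is exiting or a conflicting call is already
 * scheduled or pending.
 */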
1735
1736 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1737
1738 static bool
1739 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1740 {
1741 assert(!preemption_enabled());
1742
1743 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1744 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1745 WQ_IMMEDIATE_CALL_SCHEDULED)) {
1746 return false;
1747 }
1748
1749 uint64_t now = mach_absolute_time();
1750
1751 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1752 /* do not change the window */
1753 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1754 wq->wq_timer_interval *= 2;
1755 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1756 wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
1757 }
1758 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1759 wq->wq_timer_interval /= 2;
1760 if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1761 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
1762 }
1763 }
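/*
 * Worked example (illustrative): with a stalled window of S, two runs of
 * the thread call less than one interval apart double the interval
 * (capped at wq_max_timer_interval), while a gap of more than twice the
 * interval halves it (floored back at S). The window thus backs off under
 * sustained redriving and recovers once the workqueue goes quiet.
 */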
1764
1765 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1766 _wq_flags(wq), wq->wq_timer_interval);
1767
1768 thread_call_t call = wq->wq_delayed_call;
1769 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1770 uint64_t deadline = now + wq->wq_timer_interval;
1771 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1772 panic("delayed_call was already enqueued");
1773 }
1774 return true;
1775 }
1776
1777 static void
1778 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1779 {
1780 assert(!preemption_enabled());
1781
1782 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1783 WQ_IMMEDIATE_CALL_PENDED, 0)) {
1784 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1785 _wq_flags(wq), 0);
1786
1787 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1788 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1789 panic("immediate_call was already enqueued");
1790 }
1791 }
1792 }
1793
1794 void
1795 workq_proc_suspended(struct proc *p)
1796 {
1797 struct workqueue *wq = proc_get_wqptr(p);
1798
1799 if (wq) {
1800 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1801 }
1802 }
1803
1804 void
1805 workq_proc_resumed(struct proc *p)
1806 {
1807 struct workqueue *wq = proc_get_wqptr(p);
1808 uint32_t wq_flags;
1809
1810 if (!wq) {
1811 return;
1812 }
1813
1814 wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
1815 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
1816 if ((wq_flags & WQ_EXITING) == 0) {
1817 disable_preemption();
1818 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1819 workq_schedule_immediate_thread_creation(wq);
1820 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1821 workq_schedule_delayed_thread_creation(wq,
1822 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1823 }
1824 enable_preemption();
1825 }
1826 }
1827
1828 /**
1829 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1830 */
1831 static bool
1832 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1833 {
1834 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1835 if (now <= lastblocked_ts) {
1836 /*
1837 * Because the update of the timestamp when a thread blocks
1838 * isn't serialized against us looking at it (i.e. we don't hold
1839 * the workq lock), it's possible to have a timestamp that matches
1840 * the current time or that even looks to be in the future relative
1841 * to when we grabbed the current time...
1842 *
1843 * Just treat this as a busy thread since it must have just blocked.
1844 */
1845 return true;
1846 }
1847 return (now - lastblocked_ts) < wq_stalled_window.abstime;
1848 }
1849
1850 static void
1851 workq_add_new_threads_call(void *_p, void *flags)
1852 {
1853 proc_t p = _p;
1854 struct workqueue *wq = proc_get_wqptr(p);
1855 uint32_t my_flag = (uint32_t)(uintptr_t)flags;
1856
1857 /*
1858 * workq_exit() will set the workqueue to NULL before
1859 * it cancels thread calls.
1860 */
1861 if (!wq) {
1862 return;
1863 }
1864
1865 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
1866 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));
1867
1868 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
1869 wq->wq_nthreads, wq->wq_thidlecount);
1870
1871 workq_lock_spin(wq);
1872
1873 wq->wq_thread_call_last_run = mach_absolute_time();
1874 os_atomic_andnot(&wq->wq_flags, my_flag, release);
1875
1876 /* This can drop the workqueue lock, and take it again */
1877 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
1878
1879 workq_unlock(wq);
1880
1881 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
1882 wq->wq_nthreads, wq->wq_thidlecount);
1883 }
1884
1885 #pragma mark thread state tracking
1886
1887 static void
1888 workq_sched_callback(int type, thread_t thread)
1889 {
1890 thread_ro_t tro = get_thread_ro(thread);
1891 struct uthread *uth = get_bsdthread_info(thread);
1892 struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
1893 thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
1894 wq_thactive_t old_thactive;
1895 bool start_timer = false;
1896
1897 if (qos == WORKQ_THREAD_QOS_MANAGER) {
1898 return;
1899 }
1900
1901 switch (type) {
1902 case SCHED_CALL_BLOCK:
1903 old_thactive = _wq_thactive_dec(wq, qos);
1904 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1905
1906 /*
1907 * Remember the timestamp of the last thread that blocked in this
1908 * bucket; it is used by admission checks to ignore one thread
1909 * being inactive if this timestamp is recent enough.
1910 *
1911 * If we collide with another thread trying to update the
1912 * last_blocked (really unlikely since another thread would have to
1913 * get scheduled and then block after we start down this path), it's
1914 * not a problem. Either timestamp is adequate, so no need to retry
1915 */
1916 os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
1917 thread_last_run_time(thread), relaxed);
1918
1919 if (req_qos == THREAD_QOS_UNSPECIFIED) {
1920 /*
1921 * No pending request at the moment we could unblock, move on.
1922 */
1923 } else if (qos < req_qos) {
1924 /*
1925 * The blocking thread is at a lower QoS than the highest currently
1926 * pending constrained request, nothing has to be redriven
1927 */
1928 } else {
1929 uint32_t max_busycount, old_req_count;
1930 old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
1931 req_qos, NULL, &max_busycount);
1932 /*
1933 * If it is possible that may_start_constrained_thread had refused
1934 * admission due to being over the max concurrency, we may need to
1935 * spin up a new thread.
1936 *
1937 * We take into account the maximum number of busy threads
1938 * that can affect may_start_constrained_thread as looking at the
1939 * actual number may_start_constrained_thread will see is racy.
1940 *
1941 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
1942 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
1943 */
1944 uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
1945 if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
1946 start_timer = workq_schedule_delayed_thread_creation(wq, 0);
1947 }
1948 }
1949 if (__improbable(kdebug_enable)) {
1950 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1951 old_thactive, qos, NULL, NULL);
1952 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
1953 old - 1, qos | (req_qos << 8),
1954 wq->wq_reqcount << 1 | start_timer);
1955 }
1956 break;
1957
1958 case SCHED_CALL_UNBLOCK:
1959 /*
1960 * we cannot take the workqueue_lock here...
1961 * an UNBLOCK can occur from a timer event which
1962 * is run from an interrupt context... if the workqueue_lock
1963 * is already held by this processor, we'll deadlock...
1964 * the thread lock for the thread being UNBLOCKED
1965 * is also held
1966 */
1967 old_thactive = _wq_thactive_inc(wq, qos);
1968 if (__improbable(kdebug_enable)) {
1969 __unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1970 old_thactive, qos, NULL, NULL);
1971 req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1972 WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
1973 old + 1, qos | (req_qos << 8),
1974 wq->wq_threads_scheduled);
1975 }
1976 break;
1977 }
1978 }
1979
1980 #pragma mark workq lifecycle
1981
1982 void
1983 workq_reference(struct workqueue *wq)
1984 {
1985 os_ref_retain(&wq->wq_refcnt);
1986 }
1987
1988 static void
1989 workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
1990 __assert_only mpsc_daemon_queue_t dq)
1991 {
1992 struct workqueue *wq;
1993 struct turnstile *ts;
1994
1995 wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
1996 assert(dq == &workq_deallocate_queue);
1997
1998 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
1999 assert(ts);
2000 turnstile_cleanup();
2001 turnstile_deallocate(ts);
2002
2003 lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
2004 zfree(workq_zone_workqueue, wq);
2005 }
2006
2007 static void
2008 workq_deallocate(struct workqueue *wq)
2009 {
2010 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
2011 workq_deallocate_queue_invoke(&wq->wq_destroy_link,
2012 &workq_deallocate_queue);
2013 }
2014 }
2015
2016 void
2017 workq_deallocate_safe(struct workqueue *wq)
2018 {
2019 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
2020 mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
2021 MPSC_QUEUE_DISABLE_PREEMPTION);
2022 }
2023 }
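/*
 * workq_deallocate() runs the turnstile and lock teardown inline and so
 * may only be used from contexts where that is safe. workq_deallocate_safe()
 * is for callers in constrained contexts (e.g. holding locks): the final
 * reference is handed off to the workq_deallocate_queue MPSC daemon, which
 * later runs workq_deallocate_queue_invoke() from a safe context.
 */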
2024
2025 /**
2026 * Setup per-process state for the workqueue.
2027 */
2028 int
2029 workq_open(struct proc *p, __unused struct workq_open_args *uap,
2030 __unused int32_t *retval)
2031 {
2032 struct workqueue *wq;
2033 int error = 0;
2034
2035 if ((p->p_lflag & P_LREGISTER) == 0) {
2036 return EINVAL;
2037 }
2038
2039 if (wq_init_constrained_limit) {
2040 uint32_t limit, num_cpus = ml_wait_max_cpus();
2041
2042 /*
2043 * set up the limit for the constrained pool;
2044 * this is a virtual pool in that we don't
2045 * maintain it on a separate idle and run list
2046 */
2047 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
2048
2049 if (limit > wq_max_constrained_threads) {
2050 wq_max_constrained_threads = limit;
2051 }
2052
2053 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
2054 wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
2055 }
2056 if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
2057 wq_max_threads = CONFIG_THREAD_MAX - 20;
2058 }
2059
2060 wq_death_max_load = (uint16_t)fls(num_cpus) + 1;
2061
2062 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
2063 wq_max_parallelism[_wq_bucket(qos)] =
2064 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
2065 }
2066
2067 wq_max_cooperative_threads = num_cpus;
2068
2069 wq_init_constrained_limit = 0;
2070 }
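/*
 * For example (illustrative): on a machine where ml_wait_max_cpus()
 * reports 8, the constrained pool limit becomes
 * 8 * WORKQUEUE_CONSTRAINED_FACTOR (unless wq_max_constrained_threads was
 * already tuned higher), and the cooperative pool is capped at 8 threads.
 */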
2071
2072 if (proc_get_wqptr(p) == NULL) {
2073 if (proc_init_wqptr_or_wait(p) == FALSE) {
2074 assert(proc_get_wqptr(p) != NULL);
2075 goto out;
2076 }
2077
2078 wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);
2079
2080 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);
2081
2082 // Start the event manager at the priority hinted at by the policy engine
2083 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
2084 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
2085 wq->wq_event_manager_priority = (uint32_t)pp;
2086 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
2087 wq->wq_proc = p;
2088 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
2089 TURNSTILE_WORKQS);
2090
2091 TAILQ_INIT(&wq->wq_thrunlist);
2092 TAILQ_INIT(&wq->wq_thnewlist);
2093 TAILQ_INIT(&wq->wq_thidlelist);
2094 priority_queue_init(&wq->wq_overcommit_queue);
2095 priority_queue_init(&wq->wq_constrained_queue);
2096 priority_queue_init(&wq->wq_special_queue);
2097 for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
2098 STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
2099 }
2100
2101 /* We are only using the delayed thread call for the constrained pool
2102 * which can't have work at >= UI QoS and so we can be fine with a
2103 * UI QoS thread call.
2104 */
2105 wq->wq_delayed_call = thread_call_allocate_with_qos(
2106 workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
2107 THREAD_CALL_OPTIONS_ONCE);
2108 wq->wq_immediate_call = thread_call_allocate_with_options(
2109 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
2110 THREAD_CALL_OPTIONS_ONCE);
2111 wq->wq_death_call = thread_call_allocate_with_options(
2112 workq_kill_old_threads_call, wq,
2113 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
2114
2115 lck_ticket_init(&wq->wq_lock, &workq_lck_grp);
2116
2117 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
2118 VM_KERNEL_ADDRHIDE(wq), 0, 0);
2119 proc_set_wqptr(p, wq);
2120 }
2121 out:
2122
2123 return error;
2124 }
2125
2126 /*
2127 * Routine: workq_mark_exiting
2128 *
2129 * Function: Mark the work queue such that new threads will not be added to the
2130 * work queue after we return.
2131 *
2132 * Conditions: Called against the current process.
2133 */
2134 void
2135 workq_mark_exiting(struct proc *p)
2136 {
2137 struct workqueue *wq = proc_get_wqptr(p);
2138 uint32_t wq_flags;
2139 workq_threadreq_t mgr_req;
2140
2141 if (!wq) {
2142 return;
2143 }
2144
2145 WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);
2146
2147 workq_lock_spin(wq);
2148
2149 wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
2150 if (__improbable(wq_flags & WQ_EXITING)) {
2151 panic("workq_mark_exiting called twice");
2152 }
2153
2154 /*
2155 * Opportunistically try to cancel thread calls that are likely in flight.
2156 * workq_exit() will do the proper cleanup.
2157 */
2158 if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
2159 thread_call_cancel(wq->wq_immediate_call);
2160 }
2161 if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
2162 thread_call_cancel(wq->wq_delayed_call);
2163 }
2164 if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
2165 thread_call_cancel(wq->wq_death_call);
2166 }
2167
2168 mgr_req = wq->wq_event_manager_threadreq;
2169 wq->wq_event_manager_threadreq = NULL;
2170 wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
2171 wq->wq_creator = NULL;
2172 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
2173
2174 workq_unlock(wq);
2175
2176 if (mgr_req) {
2177 kqueue_threadreq_cancel(p, mgr_req);
2178 }
2179 /*
2180 * No one touches the priority queues once WQ_EXITING is set.
2181 * It is hence safe to do the tear down without holding any lock.
2182 */
2183 priority_queue_destroy(&wq->wq_overcommit_queue,
2184 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2185 workq_threadreq_destroy(p, e);
2186 });
2187 priority_queue_destroy(&wq->wq_constrained_queue,
2188 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2189 workq_threadreq_destroy(p, e);
2190 });
2191 priority_queue_destroy(&wq->wq_special_queue,
2192 struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2193 workq_threadreq_destroy(p, e);
2194 });
2195
2196 WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
2197 }
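/*
 * Past this point the workqueue is in a draining state: wq_reqcount was
 * zeroed so workq_schedule_creator() will not look at the queues, and the
 * thread call cancellation above is only opportunistic; workq_exit()
 * performs the synchronous cancel-and-wait before anything is freed.
 */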
2198
2199 /*
2200 * Routine: workq_exit
2201 *
2202 * Function: clean up the work queue structure(s) now that there are no threads
2203 * left running inside the work queue (except possibly current_thread).
2204 *
2205 * Conditions: Called by the last thread in the process.
2206 * Called against current process.
2207 */
2208 void
2209 workq_exit(struct proc *p)
2210 {
2211 struct workqueue *wq;
2212 struct uthread *uth, *tmp;
2213
2214 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
2215 if (wq != NULL) {
2216 thread_t th = current_thread();
2217
2218 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);
2219
2220 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
2221 /*
2222 * <rdar://problem/40111515> Make sure we will no longer call the
2223 * sched call, if we ever block this thread, which the cancel_wait
2224 * below can do.
2225 */
2226 thread_sched_call(th, NULL);
2227 }
2228
2229 /*
2230 * Thread calls are always scheduled by the proc itself or under the
2231 * workqueue spinlock if WQ_EXITING is not yet set.
2232 *
2233 * Either way, when this runs, the proc has no threads left beside
2234 * the one running this very code, so we know no thread call can be
2235 * dispatched anymore.
2236 */
2237 thread_call_cancel_wait(wq->wq_delayed_call);
2238 thread_call_cancel_wait(wq->wq_immediate_call);
2239 thread_call_cancel_wait(wq->wq_death_call);
2240 thread_call_free(wq->wq_delayed_call);
2241 thread_call_free(wq->wq_immediate_call);
2242 thread_call_free(wq->wq_death_call);
2243
2244 /*
2245 * Clean up workqueue data structures for threads that exited and
2246 * didn't get a chance to clean up after themselves.
2247 *
2248 * idle/new threads should have been interrupted and died on their own
2249 */
2250 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
2251 thread_t mth = get_machthread(uth);
2252 thread_sched_call(mth, NULL);
2253 thread_deallocate(mth);
2254 }
2255 assert(TAILQ_EMPTY(&wq->wq_thnewlist));
2256 assert(TAILQ_EMPTY(&wq->wq_thidlelist));
2257
2258 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
2259 VM_KERNEL_ADDRHIDE(wq), 0, 0);
2260
2261 workq_deallocate(wq);
2262
2263 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
2264 }
2265 }
2266
2267
2268 #pragma mark bsd thread control
2269
2270 bool
2271 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2272 {
2273 return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2274 (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER);
2275 }
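/*
 * Cooperative and constrained (non-overcommit) workqueue threads are the
 * ones subject to the workqueue quantum machinery; overcommit threads and
 * the event manager are exempt, which is why the predicate above excludes
 * the manager bucket.
 */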
2276
2277 static bool
2278 _pthread_priority_to_policy(pthread_priority_t priority,
2279 thread_qos_policy_data_t *data)
2280 {
2281 data->qos_tier = _pthread_priority_thread_qos(priority);
2282 data->tier_importance = _pthread_priority_relpri(priority);
2283 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2284 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2285 return false;
2286 }
2287 return true;
2288 }
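/*
 * A pthread_priority_t encodes a QoS class and a relative priority; the
 * relpri must lie in [THREAD_QOS_MIN_TIER_IMPORTANCE, 0]. The helper above
 * rejects values with no QoS class or an out-of-range relpri, and
 * otherwise fills the thread_qos_policy_data_t consumed by
 * thread_policy_set_internal().
 */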
2289
2290 static int
2291 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
2292 mach_port_name_t voucher, enum workq_set_self_flags flags)
2293 {
2294 struct uthread *uth = get_bsdthread_info(th);
2295 struct workqueue *wq = proc_get_wqptr(p);
2296
2297 kern_return_t kr;
2298 int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
2299 bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);
2300
2301 assert(th == current_thread());
2302 if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
2303 if (!is_wq_thread) {
2304 unbind_rv = EINVAL;
2305 goto qos;
2306 }
2307
2308 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
2309 unbind_rv = EINVAL;
2310 goto qos;
2311 }
2312
2313 workq_threadreq_t kqr = uth->uu_kqr_bound;
2314 if (kqr == NULL) {
2315 unbind_rv = EALREADY;
2316 goto qos;
2317 }
2318
2319 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2320 unbind_rv = EINVAL;
2321 goto qos;
2322 }
2323
2324 kqueue_threadreq_unbind(p, kqr);
2325 }
2326
2327 qos:
2328 if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) {
2329 assert(flags & WORKQ_SET_SELF_QOS_FLAG);
2330
2331 thread_qos_policy_data_t new_policy;
2332 thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED;
2333
2334 if (!_pthread_priority_to_policy(priority, &new_policy)) {
2335 qos_rv = EINVAL;
2336 goto voucher;
2337 }
2338
2339 if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) {
2340 /*
2341 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely
2342 * should have an override QoS in the pthread_priority_t and we should
2343 * only come into this path for cooperative thread requests
2344 */
2345 if (!_pthread_priority_has_override_qos(priority) ||
2346 !_pthread_priority_is_cooperative(priority)) {
2347 qos_rv = EINVAL;
2348 goto voucher;
2349 }
2350 qos_override = _pthread_priority_thread_override_qos(priority);
2351 } else {
2352 /*
2353 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely
2354 * should not have an override QoS in the pthread_priority_t
2355 */
2356 if (_pthread_priority_has_override_qos(priority)) {
2357 qos_rv = EINVAL;
2358 goto voucher;
2359 }
2360 }
2361
2362 if (!is_wq_thread) {
2363 /*
2364 * Threads opted out of QoS can't change QoS
2365 */
2366 if (!thread_has_qos_policy(th)) {
2367 qos_rv = EPERM;
2368 goto voucher;
2369 }
2370 } else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
2371 uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
2372 /*
2373 * Workqueue manager threads or threads above UI can't change QoS
2374 */
2375 qos_rv = EINVAL;
2376 goto voucher;
2377 } else {
2378 /*
2379 * For workqueue threads, possibly adjust buckets and redrive thread
2380 * requests.
2381 *
2382 * Transitions allowed:
2383 *
2384 * overcommit --> non-overcommit
2385 * overcommit --> overcommit
2386 * non-overcommit --> non-overcommit
2387 * non-overcommit --> overcommit (to be deprecated later)
2388 * cooperative --> cooperative
2389 *
2390 * All other transitions aren't allowed so reject them.
2391 */
2392 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
2393 qos_rv = EINVAL;
2394 goto voucher;
2395 } else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
2396 qos_rv = EINVAL;
2397 goto voucher;
2398 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
2399 qos_rv = EINVAL;
2400 goto voucher;
2401 }
2402
2403 struct uu_workq_policy old_pri, new_pri;
2404 bool force_run = false;
2405
2406 if (qos_override) {
2407 /*
2408 * We're in the case of a thread clarifying that it is, e.g., not at IN
2409 * req QoS but rather at UT req QoS with an IN override. However, this can
2410 * race with a concurrent override happening to the thread via
2411 * workq_thread_add_dispatch_override so this needs to be
2412 * synchronized with the thread mutex.
2413 */
2414 thread_mtx_lock(th);
2415 }
2416
2417 workq_lock_spin(wq);
2418
2419 old_pri = new_pri = uth->uu_workq_pri;
2420 new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;
2421
2422 if (old_pri.qos_override < qos_override) {
2423 /*
2424 * Since this can race with a concurrent override via
2425 * workq_thread_add_dispatch_override, only adjust override value if we
2426 * are higher - this is a saturating function.
2427 *
2428 * We should not be changing the final override values; we should simply
2429 * be redistributing the current value with a different breakdown of req
2430 * vs override QoS - assert to that effect. Therefore, buckets should
2431 * not change.
2432 */
2433 new_pri.qos_override = qos_override;
2434 assert(workq_pri_override(new_pri) == workq_pri_override(old_pri));
2435 assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri));
2436 }
2437
2438 /* Adjust schedule counts for various types of transitions */
2439
2440 /* overcommit -> non-overcommit */
2441 if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
2442 workq_thread_set_type(uth, 0);
2443 wq->wq_constrained_threads_scheduled++;
2444
2445 /* non-overcommit -> overcommit */
2446 } else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
2447 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
2448 force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);
2449
2450 /* cooperative -> cooperative */
2451 } else if (workq_thread_is_cooperative(uth)) {
2452 _wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req);
2453 _wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req);
2454
2455 /* We're changing schedule counts within the cooperative pool, so we
2456 * need to refresh the best cooperative QoS logic again */
2457 force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
2458 }
2459
2460 /*
2461 * This will set up an override on the thread if any and will also call
2462 * schedule_creator if needed
2463 */
2464 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
2465 workq_unlock(wq);
2466
2467 if (qos_override) {
2468 thread_mtx_unlock(th);
2469 }
2470
2471 if (workq_thread_is_overcommit(uth)) {
2472 thread_disarm_workqueue_quantum(th);
2473 } else {
2474 /* If the thread changed QoS buckets, the quantum duration
2475 * may have changed too */
2476 thread_arm_workqueue_quantum(th);
2477 }
2478 }
2479
2480 kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
2481 (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
2482 if (kr != KERN_SUCCESS) {
2483 qos_rv = EINVAL;
2484 }
2485 }
2486
2487 voucher:
2488 if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
2489 kr = thread_set_voucher_name(voucher);
2490 if (kr != KERN_SUCCESS) {
2491 voucher_rv = ENOENT;
2492 goto fixedpri;
2493 }
2494 }
2495
2496 fixedpri:
2497 if (qos_rv) {
2498 goto done;
2499 }
2500 if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
2501 thread_extended_policy_data_t extpol = {.timeshare = 0};
2502
2503 if (is_wq_thread) {
2504 /* Not allowed on workqueue threads */
2505 fixedpri_rv = ENOTSUP;
2506 goto done;
2507 }
2508
2509 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
2510 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
2511 if (kr != KERN_SUCCESS) {
2512 fixedpri_rv = EINVAL;
2513 goto done;
2514 }
2515 } else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
2516 thread_extended_policy_data_t extpol = {.timeshare = 1};
2517
2518 if (is_wq_thread) {
2519 /* Not allowed on workqueue threads */
2520 fixedpri_rv = ENOTSUP;
2521 goto done;
2522 }
2523
2524 kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
2525 (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
2526 if (kr != KERN_SUCCESS) {
2527 fixedpri_rv = EINVAL;
2528 goto done;
2529 }
2530 }
2531
2532 done:
2533 if (qos_rv && voucher_rv) {
2534 /* Both failed, give that a unique error. */
2535 return EBADMSG;
2536 }
2537
2538 if (unbind_rv) {
2539 return unbind_rv;
2540 }
2541
2542 if (qos_rv) {
2543 return qos_rv;
2544 }
2545
2546 if (voucher_rv) {
2547 return voucher_rv;
2548 }
2549
2550 if (fixedpri_rv) {
2551 return fixedpri_rv;
2552 }
2553
2554
2555 return 0;
2556 }
2557
2558 static int
2559 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2560 pthread_priority_t pp, user_addr_t resource)
2561 {
2562 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2563 if (qos == THREAD_QOS_UNSPECIFIED) {
2564 return EINVAL;
2565 }
2566
2567 thread_t th = port_name_to_thread(kport,
2568 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2569 if (th == THREAD_NULL) {
2570 return ESRCH;
2571 }
2572
2573 int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE,
2574 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2575
2576 thread_deallocate(th);
2577 return rv;
2578 }
2579
2580 static int
2581 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2582 user_addr_t resource)
2583 {
2584 thread_t th = port_name_to_thread(kport,
2585 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2586 if (th == THREAD_NULL) {
2587 return ESRCH;
2588 }
2589
2590 int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource,
2591 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2592
2593 thread_deallocate(th);
2594 return rv;
2595 }
2596
2597 static int
2598 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
2599 pthread_priority_t pp, user_addr_t ulock_addr)
2600 {
2601 struct uu_workq_policy old_pri, new_pri;
2602 struct workqueue *wq = proc_get_wqptr(p);
2603
2604 thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
2605 if (qos_override == THREAD_QOS_UNSPECIFIED) {
2606 return EINVAL;
2607 }
2608
2609 thread_t thread = port_name_to_thread(kport,
2610 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2611 if (thread == THREAD_NULL) {
2612 return ESRCH;
2613 }
2614
2615 struct uthread *uth = get_bsdthread_info(thread);
2616 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2617 thread_deallocate(thread);
2618 return EPERM;
2619 }
2620
2621 WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
2622 wq, thread_tid(thread), 1, pp);
2623
2624 thread_mtx_lock(thread);
2625
2626 if (ulock_addr) {
2627 uint32_t val;
2628 int rc;
2629 /*
2630 * Workaround lack of explicit support for 'no-fault copyin'
2631 * <rdar://problem/24999882>, as disabling preemption prevents paging in
2632 */
2633 disable_preemption();
2634 rc = copyin_atomic32(ulock_addr, &val);
2635 enable_preemption();
2636 if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
2637 goto out;
2638 }
2639 }
2640
2641 workq_lock_spin(wq);
2642
2643 old_pri = uth->uu_workq_pri;
2644 if (old_pri.qos_override >= qos_override) {
2645 /* Nothing to do */
2646 } else if (thread == current_thread()) {
2647 new_pri = old_pri;
2648 new_pri.qos_override = qos_override;
2649 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2650 } else {
2651 uth->uu_workq_pri.qos_override = qos_override;
2652 if (qos_override > workq_pri_override(old_pri)) {
2653 thread_set_workq_override(thread, qos_override);
2654 }
2655 }
2656
2657 workq_unlock(wq);
2658
2659 out:
2660 thread_mtx_unlock(thread);
2661 thread_deallocate(thread);
2662 return 0;
2663 }
2664
2665 static int
2666 workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2667 {
2668 struct uu_workq_policy old_pri, new_pri;
2669 struct workqueue *wq = proc_get_wqptr(p);
2670 struct uthread *uth = get_bsdthread_info(thread);
2671
2672 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2673 return EPERM;
2674 }
2675
2676 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);
2677
2678 /*
2679 * workq_thread_add_dispatch_override takes the thread mutex before doing the
2680 * copyin to validate the drainer and apply the override. We need to do the
2681 * same here. See rdar://84472518
2682 */
2683 thread_mtx_lock(thread);
2684
2685 workq_lock_spin(wq);
2686 old_pri = new_pri = uth->uu_workq_pri;
2687 new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2688 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2689 workq_unlock(wq);
2690
2691 thread_mtx_unlock(thread);
2692 return 0;
2693 }
2694
2695 static int
2696 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2697 {
2698 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2699 // If the thread isn't a workqueue thread, don't set the
2700 // kill_allowed bit; however, we still need to return 0
2701 // instead of an error code since this code is executed
2702 // on the abort path which needs to not depend on the
2703 // pthread_t (returning an error depends on pthread_t via
2704 // cerror_nocancel)
2705 return 0;
2706 }
2707 struct uthread *uth = get_bsdthread_info(thread);
2708 uth->uu_workq_pthread_kill_allowed = enable;
2709 return 0;
2710 }
2711
2712 static int
2713 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2714 int *retval)
2715 {
2716 static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2717 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2718 static_assert(QOS_PARALLELISM_REALTIME ==
2719 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2720 static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2721 _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2722
2723 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2724 return EINVAL;
2725 }
2726
2727 /* No cluster shared resource units are present */
2728 if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2729 return ENOTSUP;
2730 }
2731
2732 if (flags & QOS_PARALLELISM_REALTIME) {
2733 if (qos) {
2734 return EINVAL;
2735 }
2736 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2737 return EINVAL;
2738 }
2739
2740 *retval = qos_max_parallelism(qos, flags);
2741 return 0;
2742 }
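/*
 * Rough sketch of the userspace view (hedged: pthread_qos_max_parallelism
 * is libpthread SPI and its name is shown here as an assumption, for
 * illustration only):
 *
 *     // how many threads can usefully run at user-initiated QoS
 *     int n = pthread_qos_max_parallelism(QOS_CLASS_USER_INITIATED, 0);
 *
 * libpthread funnels such calls into the bsdthread_ctl() syscall with
 * BSDTHREAD_CTL_QOS_MAX_PARALLELISM, which lands in the handler above
 * after flag validation.
 */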
2743
2744 static int
2745 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2746 unsigned long flags, uint64_t value1, __unused uint64_t value2)
2747 {
2748 uint32_t apply_worker_index;
2749 kern_return_t kr;
2750
2751 switch (flags) {
2752 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2753 apply_worker_index = (uint32_t)value1;
2754 kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2755 /*
2756 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2757 * cluster which it was not eligible to execute on.
2758 */
2759 return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2760 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2761 kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2762 return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2763 default:
2764 return EINVAL;
2765 }
2766 }
2767
2768 #define ENSURE_UNUSED(arg) \
2769 ({ if ((arg) != 0) { return EINVAL; } })
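/*
 * ENSURE_UNUSED relies on a GNU statement expression so that the embedded
 * return exits the enclosing function; the cases below use it to reject
 * commands whose spare arguments are expected to be zero.
 */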
2770
2771 int
2772 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
2773 {
2774 switch (uap->cmd) {
2775 case BSDTHREAD_CTL_QOS_OVERRIDE_START:
2776 return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
2777 (pthread_priority_t)uap->arg2, uap->arg3);
2778 case BSDTHREAD_CTL_QOS_OVERRIDE_END:
2779 ENSURE_UNUSED(uap->arg3);
2780 return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
2781 (user_addr_t)uap->arg2);
2782
2783 case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
2784 return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
2785 (pthread_priority_t)uap->arg2, uap->arg3);
2786 case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
2787 return workq_thread_reset_dispatch_override(p, current_thread());
2788
2789 case BSDTHREAD_CTL_SET_SELF:
2790 return bsdthread_set_self(p, current_thread(),
2791 (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
2792 (enum workq_set_self_flags)uap->arg3);
2793
2794 case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
2795 ENSURE_UNUSED(uap->arg3);
2796 return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
2797 (unsigned long)uap->arg2, retval);
2798 case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
2799 ENSURE_UNUSED(uap->arg2);
2800 ENSURE_UNUSED(uap->arg3);
2801 return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
2802 case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
2803 return bsdthread_dispatch_apply_attr(p, current_thread(),
2804 (unsigned long)uap->arg1, (uint64_t)uap->arg2,
2805 (uint64_t)uap->arg3);
2806 case BSDTHREAD_CTL_SET_QOS:
2807 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
2808 case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
2809 /* no longer supported */
2810 return ENOTSUP;
2811
2812 default:
2813 return EINVAL;
2814 }
2815 }
2816
2817 #pragma mark workqueue thread manipulation
2818
2819 static void __dead2
2820 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2821 struct uthread *uth, uint32_t setup_flags);
2822
2823 static void __dead2
2824 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2825 struct uthread *uth, uint32_t setup_flags);
2826
2827 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2828
2829 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2830 static inline uint64_t
2831 workq_trace_req_id(workq_threadreq_t req)
2832 {
2833 struct kqworkloop *kqwl;
2834 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2835 kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2836 return kqwl->kqwl_dynamicid;
2837 }
2838
2839 return VM_KERNEL_ADDRHIDE(req);
2840 }
2841 #endif
2842
2843 /**
2844 * Entry point for libdispatch to ask for threads
2845 */
2846 static int
2847 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
2848 {
2849 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2850 struct workqueue *wq = proc_get_wqptr(p);
2851 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2852 int ret = 0;
2853
2854 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2855 qos == THREAD_QOS_UNSPECIFIED) {
2856 ret = EINVAL;
2857 goto exit;
2858 }
2859
2860 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2861 wq, reqcount, pp, cooperative);
2862
2863 workq_threadreq_t req = zalloc(workq_zone_threadreq);
2864 priority_queue_entry_init(&req->tr_entry);
2865 req->tr_state = WORKQ_TR_STATE_NEW;
2866 req->tr_qos = qos;
2867 workq_tr_flags_t tr_flags = 0;
2868
2869 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2870 tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
2871 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2872 }
2873
2874 if (cooperative) {
2875 tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
2876 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
2877
2878 if (reqcount > 1) {
2879 ret = ENOTSUP;
2880 goto free_and_exit;
2881 }
2882 }
2883
2884 /* A thread request cannot be both overcommit and cooperative */
2885 if (workq_tr_is_cooperative(tr_flags) &&
2886 workq_tr_is_overcommit(tr_flags)) {
2887 ret = EINVAL;
2888 goto free_and_exit;
2889 }
2890 req->tr_flags = tr_flags;
2891
2892 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2893 wq, workq_trace_req_id(req), req->tr_qos, reqcount);
2894
2895 workq_lock_spin(wq);
2896 do {
2897 if (_wq_exiting(wq)) {
2898 goto unlock_and_exit;
2899 }
2900
2901 /*
2902 * When userspace is asking for parallelism, wake up as many as
2903 * (reqcount - 1) threads without pacing, to inform the scheduler of that workload.
2904 *
2905 * The last requests, or the ones that failed the admission checks, are
2906 * enqueued and go through the regular creator codepath.
2907 *
2908 * If there aren't enough threads, add one, but re-evaluate everything
2909 * as conditions may now have changed.
2910 */
2911 unpaced = reqcount - 1;
2912
2913 if (reqcount > 1) {
2914 /* We don't handle asking for parallelism on the cooperative
2915 * workqueue just yet */
2916 assert(!workq_threadreq_is_cooperative(req));
2917
2918 if (workq_threadreq_is_nonovercommit(req)) {
2919 unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2920 if (unpaced >= reqcount - 1) {
2921 unpaced = reqcount - 1;
2922 }
2923 }
2924 }
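/*
 * Example (illustrative): a constrained request for 4 threads when the
 * admission check only allows 2 wakes at most 2 idle threads unpaced in
 * the loop below; the remainder stays accounted in the request, which is
 * enqueued and later fulfilled through the regular creator codepath.
 */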
2925
2926 /*
2927 * This path does not currently handle custom workloop parameters
2928 * when creating threads for parallelism.
2929 */
2930 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));
2931
2932 /*
2933 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2934 */
2935 while (unpaced > 0 && wq->wq_thidlecount) {
2936 struct uthread *uth;
2937 bool needs_wakeup;
2938 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;
2939
2940 if (workq_tr_is_overcommit(req->tr_flags)) {
2941 uu_flags |= UT_WORKQ_OVERCOMMIT;
2942 }
2943
2944 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);
2945
2946 _wq_thactive_inc(wq, qos);
2947 wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2948 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
2949 wq->wq_fulfilled++;
2950
2951 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2952 uth->uu_save.uus_workq_park_data.thread_request = req;
2953 if (needs_wakeup) {
2954 workq_thread_wakeup(uth);
2955 }
2956 unpaced--;
2957 reqcount--;
2958 }
2959 } while (unpaced && wq->wq_nthreads < wq_max_threads &&
2960 workq_add_new_idle_thread(p, wq));
2961
2962 if (_wq_exiting(wq)) {
2963 goto unlock_and_exit;
2964 }
2965
2966 req->tr_count = (uint16_t)reqcount;
2967 if (workq_threadreq_enqueue(wq, req)) {
2968 /* This can drop the workqueue lock, and take it again */
2969 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2970 }
2971 workq_unlock(wq);
2972 return 0;
2973
2974 unlock_and_exit:
2975 workq_unlock(wq);
2976 free_and_exit:
2977 zfree(workq_zone_threadreq, req);
2978 exit:
2979 return ret;
2980 }
2981
2982 bool
2983 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
2984 struct turnstile *workloop_ts, thread_qos_t qos,
2985 workq_kern_threadreq_flags_t flags)
2986 {
2987 struct workqueue *wq = proc_get_wqptr_fast(p);
2988 struct uthread *uth = NULL;
2989
2990 assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));
2991
2992 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
2993 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
2994 qos = thread_workq_qos_for_pri(trp.trp_pri);
2995 if (qos == THREAD_QOS_UNSPECIFIED) {
2996 qos = WORKQ_THREAD_QOS_ABOVEUI;
2997 }
2998 }
2999
3000 assert(req->tr_state == WORKQ_TR_STATE_IDLE);
3001 priority_queue_entry_init(&req->tr_entry);
3002 req->tr_count = 1;
3003 req->tr_state = WORKQ_TR_STATE_NEW;
3004 req->tr_qos = qos;
3005
3006 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
3007 workq_trace_req_id(req), qos, 1);
3008
3009 if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
3010 /*
3011 * we're called back synchronously from the context of
3012 * kqueue_threadreq_unbind from within workq_thread_return()
3013 * we can try to match up this thread with this request!
3014 */
3015 uth = current_uthread();
3016 assert(uth->uu_kqr_bound == NULL);
3017 }
3018
3019 workq_lock_spin(wq);
3020 if (_wq_exiting(wq)) {
3021 req->tr_state = WORKQ_TR_STATE_IDLE;
3022 workq_unlock(wq);
3023 return false;
3024 }
3025
3026 if (uth && workq_threadreq_admissible(wq, uth, req)) {
3027 /* This is the case of the rebind - we were about to park and unbind
3028 * when more events came so keep the binding.
3029 */
3030 assert(uth != wq->wq_creator);
3031
3032 if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
3033 _wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
3034 workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
3035 }
3036 /*
3037 * We're called from workq_kern_threadreq_initiate()
3038 * due to an unbind, with the kq req held.
3039 */
3040 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3041 workq_trace_req_id(req), req->tr_flags, 0);
3042 wq->wq_fulfilled++;
3043
3044 kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
3045 } else {
3046 if (workloop_ts) {
3047 workq_perform_turnstile_operation_locked(wq, ^{
3048 turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
3049 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
3050 turnstile_update_inheritor_complete(workloop_ts,
3051 TURNSTILE_INTERLOCK_HELD);
3052 });
3053 }
3054
3055 bool reevaluate_creator_thread_group = false;
3056 #if CONFIG_PREADOPT_TG
3057 reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3058 #endif
3059 /* We enqueued the highest priority item, or we may need to reevaluate if
3060 * the creator needs a thread group pre-adoption */
3061 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
3062 workq_schedule_creator(p, wq, flags);
3063 }
3064 }
3065
3066 workq_unlock(wq);
3067
3068 return true;
3069 }
3070
3071 void
3072 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
3073 thread_qos_t qos, workq_kern_threadreq_flags_t flags)
3074 {
3075 struct workqueue *wq = proc_get_wqptr_fast(p);
3076 bool make_overcommit = false;
3077
3078 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3079 /* Requests outside-of-QoS shouldn't accept modify operations */
3080 return;
3081 }
3082
3083 workq_lock_spin(wq);
3084
3085 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3086 assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));
3087
3088 if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3089 kqueue_threadreq_bind(p, req, req->tr_thread, 0);
3090 workq_unlock(wq);
3091 return;
3092 }
3093
3094 if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
3095 /* TODO (rokhinip): We come into this code path for kqwl thread
3096 * requests. kqwl requests cannot be cooperative.
3097 */
3098 assert(!workq_threadreq_is_cooperative(req));
3099
3100 make_overcommit = workq_threadreq_is_nonovercommit(req);
3101 }
3102
3103 if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
3104 workq_unlock(wq);
3105 return;
3106 }
3107
3108 assert(req->tr_count == 1);
3109 if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3110 panic("Invalid thread request (%p) state %d", req, req->tr_state);
3111 }
3112
3113 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
3114 workq_trace_req_id(req), qos, 0);
3115
3116 struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
3117 workq_threadreq_t req_max;
3118
3119 /*
3120 * Stage 1: Dequeue the request from its priority queue.
3121 *
3122 * If we dequeue the root item of the constrained priority queue,
3123 * maintain the best constrained request qos invariant.
3124 */
3125 if (priority_queue_remove(pq, &req->tr_entry)) {
3126 if (workq_threadreq_is_nonovercommit(req)) {
3127 _wq_thactive_refresh_best_constrained_req_qos(wq);
3128 }
3129 }
3130
3131 /*
3132 * Stage 2: Apply changes to the thread request
3133 *
3134 * If the item will not become the root of the priority queue it belongs to,
3135 * then we need to wait in line, just enqueue and return quickly.
3136 */
3137 if (__improbable(make_overcommit)) {
3138 req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
3139 pq = workq_priority_queue_for_req(wq, req);
3140 }
3141 req->tr_qos = qos;
3142
3143 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
3144 if (req_max && req_max->tr_qos >= qos) {
3145 priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
3146 workq_priority_for_req(req), false);
3147 priority_queue_insert(pq, &req->tr_entry);
3148 workq_unlock(wq);
3149 return;
3150 }
3151
3152 /*
3153 * Stage 3: Reevaluate whether we should run the thread request.
3154 *
3155 * Pretend the thread request is new again:
3156 * - adjust wq_reqcount to not count it anymore.
3157 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
3158 * properly attempts a synchronous bind)
3159 */
3160 wq->wq_reqcount--;
3161 req->tr_state = WORKQ_TR_STATE_NEW;
3162
3163 /* We enqueued the highest priority item, or we may need to reevaluate if
3164 * the creator needs a thread group pre-adoption because the request got a new TG */
3165 bool reevaluate_creator_tg = false;
3166
3167 #if CONFIG_PREADOPT_TG
3168 reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3169 #endif
3170
3171 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
3172 workq_schedule_creator(p, wq, flags);
3173 }
3174 workq_unlock(wq);
3175 }
3176
3177 void
3178 workq_kern_threadreq_lock(struct proc *p)
3179 {
3180 workq_lock_spin(proc_get_wqptr_fast(p));
3181 }
3182
3183 void
3184 workq_kern_threadreq_unlock(struct proc *p)
3185 {
3186 workq_unlock(proc_get_wqptr_fast(p));
3187 }
3188
3189 void
3190 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
3191 thread_t owner, struct turnstile *wl_ts,
3192 turnstile_update_flags_t flags)
3193 {
3194 struct workqueue *wq = proc_get_wqptr_fast(p);
3195 turnstile_inheritor_t inheritor;
3196
3197 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3198 assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
3199 workq_lock_held(wq);
3200
3201 if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3202 kqueue_threadreq_bind(p, req, req->tr_thread,
3203 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
3204 return;
3205 }
3206
3207 if (_wq_exiting(wq)) {
3208 inheritor = TURNSTILE_INHERITOR_NULL;
3209 } else {
3210 if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3211 panic("Invalid thread request (%p) state %d", req, req->tr_state);
3212 }
3213
3214 if (owner) {
3215 inheritor = owner;
3216 flags |= TURNSTILE_INHERITOR_THREAD;
3217 } else {
3218 inheritor = wq->wq_turnstile;
3219 flags |= TURNSTILE_INHERITOR_TURNSTILE;
3220 }
3221 }
3222
3223 workq_perform_turnstile_operation_locked(wq, ^{
3224 turnstile_update_inheritor(wl_ts, inheritor, flags);
3225 });
3226 }
3227
3228 void
3229 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3230 {
3231 struct workqueue *wq = proc_get_wqptr_fast(p);
3232
3233 workq_lock_spin(wq);
3234 workq_schedule_creator(p, wq, flags);
3235 workq_unlock(wq);
3236 }
3237
3238 /*
3239 * Always called at AST by the thread on itself
3240 *
3241 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
3242 * on what the thread should do next. The TSD value is always set by the thread
3243 * on itself in the kernel and cleared either by userspace when it acks the TSD
3244 * value and takes action, or by the thread in the kernel when the quantum
3245 * expires again.
3246 */
3247 void
3248 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
3249 {
3250 struct uthread *uth = get_bsdthread_info(thread);
3251
3252 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3253 return;
3254 }
3255
3256 if (!thread_supports_cooperative_workqueue(thread)) {
3257 panic("Quantum expired for thread that doesn't support cooperative workqueue");
3258 }
3259
3260 thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
3261 if (qos == THREAD_QOS_UNSPECIFIED) {
3262 panic("Thread should not have workq bucket of QoS UN");
3263 }
3264
3265 assert(thread_has_expired_workqueue_quantum(thread, false));
3266
3267 struct workqueue *wq = proc_get_wqptr(proc);
3268 assert(wq != NULL);
3269
3270 /*
3271 * For starters, we're just going to evaluate and see if we need to narrow
3272 * the pool and tell this thread to park if needed. In the future, we'll
3273 * evaluate and convey other workqueue state information like needing to
3274 * pump kevents, etc.
3275 */
3276 uint64_t flags = 0;
3277
3278 workq_lock_spin(wq);
3279
3280 if (workq_thread_is_cooperative(uth)) {
3281 if (!workq_cooperative_allowance(wq, qos, uth, false)) {
3282 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3283 } else {
3284 /* In the future, when we have kevent hookups for the cooperative
3285 * pool, we need fancier logic for what userspace should do. But
3286 * right now, only userspace thread requests exist - so we'll just
3287 * tell userspace to shuffle work items */
3288 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
3289 }
3290 } else if (workq_thread_is_nonovercommit(uth)) {
3291 if (!workq_constrained_allowance(wq, qos, uth, false)) {
3292 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3293 }
3294 }
3295 workq_unlock(wq);
3296
3297 WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);
3298
3299 kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);
3300
3301 /* We have conveyed to userspace about what it needs to do upon quantum
3302 * expiry, now rearm the workqueue quantum again */
3303 thread_arm_workqueue_quantum(get_machthread(uth));
3304 }
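/*
 * A minimal userspace-side sketch of how a cooperative-pool worker might
 * consume the quantum expiry flags published above. Only the
 * PTHREAD_WQ_QUANTUM_EXPIRY_* semantics come from this file; the TSD
 * accessor and the worker hooks are hypothetical names, not libpthread API.
 */
#if 0 /* illustrative sketch, not compiled */
static void
worker_check_quantum_tsd(void)
{
	/* hypothetical: read and acknowledge the TSD value set by the kernel */
	uint64_t flags = worker_read_and_ack_quantum_tsd();

	if (flags & PTHREAD_WQ_QUANTUM_EXPIRY_NARROW) {
		/* pool is over its allowance: finish the current work item,
		 * then park instead of picking up another one */
		worker_park_after_current_item();	/* hypothetical */
	} else if (flags & PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE) {
		/* pool width is fine: rotate to another work item so one
		 * long-running item cannot monopolize the quantum */
		worker_yield_to_next_item();		/* hypothetical */
	}
}
#endif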
3305
3306 void
3307 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3308 {
3309 if (locked) {
3310 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3311 } else {
3312 workq_schedule_immediate_thread_creation(wq);
3313 }
3314 }
3315
3316 static int
3317 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
3318 struct workqueue *wq)
3319 {
3320 thread_t th = current_thread();
3321 struct uthread *uth = get_bsdthread_info(th);
3322 workq_threadreq_t kqr = uth->uu_kqr_bound;
3323 workq_threadreq_param_t trp = { };
3324 int nevents = uap->affinity, error;
3325 user_addr_t eventlist = uap->item;
3326
3327 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3328 (uth->uu_workq_flags & UT_WORKQ_DYING)) {
3329 return EINVAL;
3330 }
3331
3332 if (eventlist && nevents && kqr == NULL) {
3333 return EINVAL;
3334 }
3335
3336 /* reset signal mask on the workqueue thread to default state */
3337 if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
3338 proc_lock(p);
3339 uth->uu_sigmask = ~workq_threadmask;
3340 proc_unlock(p);
3341 }
3342
3343 if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
3344 /*
3345 * Ensure we store the threadreq param before unbinding
3346 * the kqr from this thread.
3347 */
3348 trp = kqueue_threadreq_workloop_param(kqr);
3349 }
3350
3351 /*
3352 * Freeze the base pri while we decide the fate of this thread.
3353 *
3354 * Either:
3355 * - we return to user and kevent_cleanup will have unfrozen the base pri,
3356 * - or we proceed to workq_select_threadreq_or_park_and_unlock() which will.
3357 */
3358 thread_freeze_base_pri(th);
3359
3360 if (kqr) {
3361 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
3362 if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
3363 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
3364 } else {
3365 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
3366 }
3367 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
3368 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
3369 } else {
3370 if (workq_thread_is_overcommit(uth)) {
3371 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
3372 }
3373 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
3374 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
3375 } else {
3376 upcall_flags |= uth->uu_workq_pri.qos_req |
3377 WQ_FLAG_THREAD_PRIO_QOS;
3378 }
3379 }
3380 error = pthread_functions->workq_handle_stack_events(p, th,
3381 get_task_map(proc_task(p)), uth->uu_workq_stackaddr,
3382 uth->uu_workq_thport, eventlist, nevents, upcall_flags);
3383 if (error) {
3384 assert(uth->uu_kqr_bound == kqr);
3385 return error;
3386 }
3387
3388 // pthread is supposed to pass KEVENT_FLAG_PARKING here
3389 // which should cause the above call to either:
3390 // - not return
3391 // - return an error
3392 // - return 0 and have unbound properly
3393 assert(uth->uu_kqr_bound == NULL);
3394 }
3395
3396 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);
3397
3398 thread_sched_call(th, NULL);
3399 thread_will_park_or_terminate(th);
3400 #if CONFIG_WORKLOOP_DEBUG
3401 UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
3402 #endif
3403
3404 workq_lock_spin(wq);
3405 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3406 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
3407 workq_select_threadreq_or_park_and_unlock(p, wq, uth,
3408 WQ_SETUP_CLEAR_VOUCHER);
3409 __builtin_unreachable();
3410 }
3411
3412 /**
3413 * Multiplexed call to interact with the workqueue mechanism
3414 */
3415 int
3416 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
3417 {
3418 int options = uap->options;
3419 int arg2 = uap->affinity;
3420 int arg3 = uap->prio;
3421 struct workqueue *wq = proc_get_wqptr(p);
3422 int error = 0;
3423
3424 if ((p->p_lflag & P_LREGISTER) == 0) {
3425 return EINVAL;
3426 }
3427
3428 switch (options) {
3429 case WQOPS_QUEUE_NEWSPISUPP: {
3430 /*
3431 * arg2 = offset of serialno into dispatch queue
3432 * arg3 = kevent support
3433 */
3434 int offset = arg2;
3435 if (arg3 & 0x01) {
3436 // If we get here, then userspace has indicated support for kevent delivery.
3437 }
3438
3439 p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
3440 break;
3441 }
3442 case WQOPS_QUEUE_REQTHREADS: {
3443 /*
3444 * arg2 = number of threads to start
3445 * arg3 = priority
3446 */
3447 error = workq_reqthreads(p, arg2, arg3, false);
3448 break;
3449 }
3450 /* For requesting threads for the cooperative pool */
3451 case WQOPS_QUEUE_REQTHREADS2: {
3452 /*
3453 * arg2 = number of threads to start
3454 * arg3 = priority
3455 */
3456 error = workq_reqthreads(p, arg2, arg3, true);
3457 break;
3458 }
3459 case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
3460 /*
3461 * arg2 = priority for the manager thread
3462 *
3463 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
3464 * the low bits of the value contains a scheduling priority
3465 * instead of a QOS value
3466 */
3467 pthread_priority_t pri = arg2;
3468
3469 if (wq == NULL) {
3470 error = EINVAL;
3471 break;
3472 }
3473
3474 /*
3475 * Normalize the incoming priority so that it is ordered numerically.
3476 */
3477 if (_pthread_priority_has_sched_pri(pri)) {
3478 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
3479 _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
3480 } else {
3481 thread_qos_t qos = _pthread_priority_thread_qos(pri);
3482 int relpri = _pthread_priority_relpri(pri);
3483 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
3484 qos == THREAD_QOS_UNSPECIFIED) {
3485 error = EINVAL;
3486 break;
3487 }
3488 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3489 }
3490
3491 /*
3492 * If userspace passes a scheduling priority, that wins over any QoS.
3493 * Userspace should take care not to lower the priority this way.
3494 */
3495 workq_lock_spin(wq);
3496 if (wq->wq_event_manager_priority < (uint32_t)pri) {
3497 wq->wq_event_manager_priority = (uint32_t)pri;
3498 }
3499 workq_unlock(wq);
3500 break;
3501 }
3502 case WQOPS_THREAD_KEVENT_RETURN:
3503 case WQOPS_THREAD_WORKLOOP_RETURN:
3504 case WQOPS_THREAD_RETURN: {
3505 error = workq_thread_return(p, uap, wq);
3506 break;
3507 }
3508
3509 case WQOPS_SHOULD_NARROW: {
3510 /*
3511 * arg2 = priority to test
3512 * arg3 = unused
3513 */
3514 thread_t th = current_thread();
3515 struct uthread *uth = get_bsdthread_info(th);
3516 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3517 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
3518 error = EINVAL;
3519 break;
3520 }
3521
3522 thread_qos_t qos = _pthread_priority_thread_qos(arg2);
3523 if (qos == THREAD_QOS_UNSPECIFIED) {
3524 error = EINVAL;
3525 break;
3526 }
3527 workq_lock_spin(wq);
3528 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
3529 workq_unlock(wq);
3530
3531 *retval = should_narrow;
3532 break;
3533 }
3534 case WQOPS_SETUP_DISPATCH: {
3535 /*
3536 * item = pointer to workq_dispatch_config structure
3537 * arg2 = sizeof(item)
3538 */
3539 struct workq_dispatch_config cfg;
3540 bzero(&cfg, sizeof(cfg));
3541
3542 error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
3543 if (error) {
3544 break;
3545 }
3546
3547 if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
3548 cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
3549 error = ENOTSUP;
3550 break;
3551 }
3552
3553 /* Load fields from version 1 */
3554 p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;
3555
3556 /* Load fields from version 2 */
3557 if (cfg.wdc_version >= 2) {
3558 p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
3559 }
3560
3561 break;
3562 }
3563 default:
3564 error = EINVAL;
3565 break;
3566 }
3567
3568 return error;
3569 }
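/*
 * A minimal sketch of how userspace drives this multiplexed entry point,
 * assuming libpthread's private __workq_kernreturn() stub with the
 * (options, item, affinity, prio) argument layout decoded above; treat the
 * stub declaration as an assumption, not ABI documentation.
 */
#if 0 /* illustrative sketch, not compiled */
extern int __workq_kernreturn(int options, void *item, int affinity, int prio);

static void
request_two_constrained_threads(int priority)
{
	/* WQOPS_QUEUE_REQTHREADS: arg2 = thread count, arg3 = priority */
	(void)__workq_kernreturn(WQOPS_QUEUE_REQTHREADS, NULL, 2, priority);
}

static int
pool_should_narrow(int priority)
{
	/* WQOPS_SHOULD_NARROW: *retval carries the boolean, which the
	 * syscall stub surfaces as its return value on success (-1 on error) */
	return __workq_kernreturn(WQOPS_SHOULD_NARROW, NULL, priority, 0) > 0;
}
#endif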
3570
3571 /*
3572 * We have no work to do, park ourselves on the idle list.
3573 *
3574 * Consumes the workqueue lock and does not return.
3575 */
3576 __attribute__((noreturn, noinline))
3577 static void
3578 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
3579 uint32_t setup_flags)
3580 {
3581 assert(uth == current_uthread());
3582 assert(uth->uu_kqr_bound == NULL);
3583 workq_push_idle_thread(p, wq, uth, setup_flags); // may not return
3584
3585 workq_thread_reset_cpupercent(NULL, uth);
3586
3587 #if CONFIG_PREADOPT_TG
3588 /* Clear the preadoption thread group on the thread.
3589 *
3590 * Case 1:
3591 * Creator thread which never picked up a thread request. We set a
3592 * preadoption thread group on creator threads but if it never picked
3593 * up a thread request and didn't go to userspace, then the thread will
3594 * park with a preadoption thread group but no explicitly adopted
3595 * voucher or work interval.
3596 *
3597 * We drop the preadoption thread group here before proceeding to park.
3598 * Note - we may get preempted when we drop the workq lock below.
3599 *
3600 * Case 2:
3601 * Thread picked up a thread request and bound to it and returned back
3602 * from userspace and is parking. At this point, preadoption thread
3603 * group should be NULL since the thread has unbound from the thread
3604 * request. So this operation should be a no-op.
3605 */
3606 thread_set_preadopt_thread_group(get_machthread(uth), NULL);
3607 #endif
3608
3609 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
3610 !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
3611 workq_unlock(wq);
3612
3613 /*
3614 * workq_push_idle_thread() will unset `has_stack`
3615 * if it wants us to free the stack before parking.
3616 */
3617 if (!uth->uu_save.uus_workq_park_data.has_stack) {
3618 pthread_functions->workq_markfree_threadstack(p,
3619 get_machthread(uth), get_task_map(proc_task(p)),
3620 uth->uu_workq_stackaddr);
3621 }
3622
3623 /*
3624 * When we remove the voucher from the thread, we may lose our importance
3625 * causing us to get preempted, so we do this after putting the thread on
3626 * the idle list. Then, when we get our importance back we'll be able to
3627 * use this thread from e.g. the kevent call out to deliver a boosting
3628 * message.
3629 *
3630 * Note that setting the voucher to NULL will not clear the preadoption
3631 * thread since this thread could have become the creator again and
3632 * perhaps acquired a preadoption thread group.
3633 */
3634 __assert_only kern_return_t kr;
3635 kr = thread_set_voucher_name(MACH_PORT_NULL);
3636 assert(kr == KERN_SUCCESS);
3637
3638 workq_lock_spin(wq);
3639 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
3640 setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
3641 }
3642
3643 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3644
3645 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
3646 /*
3647 * While we'd dropped the lock to unset our voucher, someone came
3648 * around and made us runnable. But because we weren't waiting on the
3649 * event, their thread_wakeup() was ineffectual. To correct for that,
3650 * we just run the continuation ourselves.
3651 */
3652 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
3653 __builtin_unreachable();
3654 }
3655
3656 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3657 workq_unpark_for_death_and_unlock(p, wq, uth,
3658 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
3659 __builtin_unreachable();
3660 }
3661
3662 /* Disarm the workqueue quantum since the thread is now idle */
3663 thread_disarm_workqueue_quantum(get_machthread(uth));
3664
3665 thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
3666 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
3667 workq_unlock(wq);
3668 thread_block(workq_unpark_continue);
3669 __builtin_unreachable();
3670 }
3671
3672 static inline bool
3673 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3674 {
3675 /*
3676 * There's an event manager request and either:
3677 * - no event manager currently running
3678 * - we are re-using the event manager
3679 */
3680 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3681 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3682 }
3683
3684 static uint32_t
3685 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3686 struct uthread *uth, bool may_start_timer)
3687 {
3688 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3689 uint32_t count = 0;
3690
3691 uint32_t max_count = wq->wq_constrained_threads_scheduled;
3692 if (uth && workq_thread_is_nonovercommit(uth)) {
3693 /*
3694 * don't count the current thread as scheduled
3695 */
3696 assert(max_count > 0);
3697 max_count--;
3698 }
3699 if (max_count >= wq_max_constrained_threads) {
3700 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3701 wq->wq_constrained_threads_scheduled,
3702 wq_max_constrained_threads);
3703 /*
3704 * we need 1 or more constrained threads to return to the kernel before
3705 * we can dispatch additional work
3706 */
3707 return 0;
3708 }
3709 max_count = wq_max_constrained_threads - max_count; /* remaining headroom under the constrained limit */
3710
3711 /*
3712 * Compute a metric for how many threads are active. We find the
3713 * highest priority request outstanding and then add up the number of active
3714 * threads in that and all higher-priority buckets. We'll also add any
3715 * "busy" threads which are not currently active but blocked recently enough
3716 * that we can't be sure that they won't be unblocked soon and start
3717 * being active again.
3718 *
3719 * We'll then compare this metric to our max concurrency to decide whether
3720 * to add a new thread.
3721 */
3722
3723 uint32_t busycount, thactive_count;
3724
3725 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3726 at_qos, &busycount, NULL);
3727
3728 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3729 at_qos <= uth->uu_workq_pri.qos_bucket) {
3730 /*
3731 * Don't count this thread as currently active, but only if it's not
3732 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3733 * managers.
3734 */
3735 assert(thactive_count > 0);
3736 thactive_count--;
3737 }
3738
3739 count = wq_max_parallelism[_wq_bucket(at_qos)];
3740 if (count > thactive_count + busycount) {
3741 count -= thactive_count + busycount;
3742 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
3743 thactive_count, busycount);
3744 return MIN(count, max_count);
3745 } else {
3746 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
3747 thactive_count, busycount);
3748 }
3749
3750 if (may_start_timer) {
3751 /*
3752 * If this is called from the add timer, we won't have another timer
3753 * fire when the thread exits the "busy" state, so rearm the timer.
3754 */
3755 workq_schedule_delayed_thread_creation(wq, 0);
3756 }
3757
3758 return 0;
3759 }
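/*
 * A self-contained walk-through of the admission arithmetic above, with
 * made-up numbers; in the kernel the inputs come from the live workqueue
 * state (wq_constrained_threads_scheduled, wq_max_parallelism, thactive).
 */
#if 0 /* illustrative sketch, not compiled */
#include <stdint.h>
#include <stdio.h>

static uint32_t
toy_constrained_allowance(uint32_t scheduled, uint32_t limit,
    uint32_t parallelism, uint32_t thactive, uint32_t busy)
{
	if (scheduled >= limit) {
		return 0;		/* constrained pool is saturated */
	}
	uint32_t headroom = limit - scheduled;
	if (parallelism > thactive + busy) {
		uint32_t count = parallelism - (thactive + busy);
		return count < headroom ? count : headroom;
	}
	return 0;			/* no spare concurrency at this QoS */
}

int
main(void)
{
	/* limit of 64 constrained threads, 60 scheduled, an 8-wide machine,
	 * 5 active + 2 recently-blocked "busy" threads => allowance of 1 */
	printf("%u\n", toy_constrained_allowance(60, 64, 8, 5, 2));
	return 0;
}
#endif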
3760
3761 static bool
3762 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
3763 workq_threadreq_t req)
3764 {
3765 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
3766 return workq_may_start_event_mgr_thread(wq, uth);
3767 }
3768 if (workq_threadreq_is_cooperative(req)) {
3769 return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
3770 }
3771 if (workq_threadreq_is_nonovercommit(req)) {
3772 return workq_constrained_allowance(wq, req->tr_qos, uth, true);
3773 }
3774
3775 return true;
3776 }
3777
3778 /*
3779 * Called from the context of selecting thread requests for threads returning
3780 * from userspace, or for the creator thread
3781 */
3782 static workq_threadreq_t
3783 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
3784 {
3785 workq_lock_held(wq);
3786
3787 /*
3788 * If the current thread is cooperative, we need to exclude it from the
3789 * cooperative schedule count since this thread is looking for a new
3790 * request. A change in the schedule count for the cooperative pool
3791 * therefore requires us to reevaluate the next best request for it.
3792 */
3793 if (uth && workq_thread_is_cooperative(uth)) {
3794 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
3795
3796 (void) _wq_cooperative_queue_refresh_best_req_qos(wq);
3797
3798 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
3799 } else {
3800 /*
3801 * The precomputed old value should be safe to use - assert that the
3802 * best req QoS doesn't change in this case.
3804 */
3805 assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
3806 }
3807
3808 thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;
3809
3810 /* There are no eligible requests in the cooperative pool */
3811 if (qos == THREAD_QOS_UNSPECIFIED) {
3812 return NULL;
3813 }
3814 assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
3815 assert(qos != WORKQ_THREAD_QOS_MANAGER);
3816
3817 uint8_t bucket = _wq_bucket(qos);
3818 assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));
3819
3820 return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
3821 }
3822
3823 static workq_threadreq_t
3824 workq_threadreq_select_for_creator(struct workqueue *wq)
3825 {
3826 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
3827 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
3828 uint8_t pri = 0;
3829
3830 /*
3831 * Compute the best priority request, and ignore the turnstile for now
3832 */
3833
3834 req_pri = priority_queue_max(&wq->wq_special_queue,
3835 struct workq_threadreq_s, tr_entry);
3836 if (req_pri) {
3837 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
3838 &req_pri->tr_entry);
3839 }
3840
3841 /*
3842 * Handle the manager thread request. The special queue might yield
3843 * a higher priority, but the manager always beats the QoS world.
3844 */
3845
3846 req_mgr = wq->wq_event_manager_threadreq;
3847 if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
3848 uint32_t mgr_pri = wq->wq_event_manager_priority;
3849
3850 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
3851 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
3852 } else {
3853 mgr_pri = thread_workq_pri_for_qos(
3854 _pthread_priority_thread_qos(mgr_pri));
3855 }
3856
3857 return mgr_pri >= pri ? req_mgr : req_pri;
3858 }
3859
3860 /*
3861 * Compute the best QoS Request, and check whether it beats the "pri" one
3862 *
3863 * Start by comparing the overcommit and the cooperative pool
3864 */
3865 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
3866 struct workq_threadreq_s, tr_entry);
3867 if (req_qos) {
3868 qos = req_qos->tr_qos;
3869 }
3870
3871 req_tmp = workq_cooperative_queue_best_req(wq, NULL);
3872 if (req_tmp && qos <= req_tmp->tr_qos) {
3873 /*
3874 * The cooperative TR is the better of overcommit and cooperative. Note
3875 * that if the QoS is the same between overcommit and cooperative, we
3876 * choose cooperative.
3877 *
3878 * Pick cooperative pool if it passes the admissions check
3879 */
3880 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3881 req_qos = req_tmp;
3882 qos = req_qos->tr_qos;
3883 }
3884 }
3885
3886 /*
3887 * Compare the best QoS so far - either from overcommit or from cooperative
3888 * pool - and compare it with the constrained pool
3889 */
3890 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
3891 struct workq_threadreq_s, tr_entry);
3892
3893 if (req_tmp && qos < req_tmp->tr_qos) {
3894 /*
3895 * The constrained pool is best in QoS among overcommit, cooperative
3896 * and constrained. Now check how it fares against the priority case
3897 */
3898 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
3899 return req_pri;
3900 }
3901
3902 if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3903 /*
3904 * If the constrained thread request is the best one and passes
3905 * the admission check, pick it.
3906 */
3907 return req_tmp;
3908 }
3909 }
3910
3911 /*
3912 * Compare the best of the QoS world with the priority
3913 */
3914 if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3915 return req_pri;
3916 }
3917
3918 if (req_qos) {
3919 return req_qos;
3920 }
3921
3922 /*
3923 * If we had no eligible request but we have a turnstile push,
3924 * it must be a non overcommit thread request that failed
3925 * the admission check.
3926 *
3927 * Just fake a BG thread request so that if the push stops, the creator
3928 * priority just drops to 4.
3929 */
3930 if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
3931 static struct workq_threadreq_s workq_sync_push_fake_req = {
3932 .tr_qos = THREAD_QOS_BACKGROUND,
3933 };
3934
3935 return &workq_sync_push_fake_req;
3936 }
3937
3938 return NULL;
3939 }
3940
3941 /*
3942 * Returns true if this caused a change in the schedule counts of the
3943 * cooperative pool
3944 */
3945 static bool
3946 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
3947 struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
3948 {
3949 workq_lock_held(wq);
3950
3951 /*
3952 * Row: thread type
3953 * Column: Request type
3954 *
3955 * overcommit non-overcommit cooperative
3956 * overcommit X case 1 case 2
3957 * cooperative case 3 case 4 case 5
3958 * non-overcommit case 6 X case 7
3959 *
3960 * Move the thread to the right bucket depending on the state it currently
3961 * has and the state the thread req it picks is going to have.
3962 *
3963 * Note that the creator thread is an overcommit thread.
3964 */
3965 thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req;
3966
3967 /*
3968 * Anytime a cooperative bucket's schedule count changes, we need to
3969 * potentially refresh the next best QoS for that pool when we determine
3970 * the next request for the creator
3971 */
3972 bool cooperative_pool_sched_count_changed = false;
3973
3974 if (workq_thread_is_overcommit(uth)) {
3975 if (workq_tr_is_nonovercommit(tr_flags)) {
3976 // Case 1: thread is overcommit, req is non-overcommit
3977 wq->wq_constrained_threads_scheduled++;
3978 } else if (workq_tr_is_cooperative(tr_flags)) {
3979 // Case 2: thread is overcommit, req is cooperative
3980 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3981 cooperative_pool_sched_count_changed = true;
3982 }
3983 } else if (workq_thread_is_cooperative(uth)) {
3984 if (workq_tr_is_overcommit(tr_flags)) {
3985 // Case 3: thread is cooperative, req is overcommit
3986 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3987 } else if (workq_tr_is_nonovercommit(tr_flags)) {
3988 // Case 4: thread is cooperative, req is non-overcommit
3989 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3990 wq->wq_constrained_threads_scheduled++;
3991 } else {
3992 // Case 5: thread is cooperative, req is also cooperative
3993 assert(workq_tr_is_cooperative(tr_flags));
3994 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3995 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3996 }
3997 cooperative_pool_sched_count_changed = true;
3998 } else {
3999 if (workq_tr_is_overcommit(tr_flags)) {
4000 // Case 6: Thread is non-overcommit, req is overcommit
4001 wq->wq_constrained_threads_scheduled--;
4002 } else if (workq_tr_is_cooperative(tr_flags)) {
4003 // Case 7: Thread is non-overcommit, req is cooperative
4004 wq->wq_constrained_threads_scheduled--;
4005 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
4006 cooperative_pool_sched_count_changed = true;
4007 }
4008 }
4009
4010 return cooperative_pool_sched_count_changed;
4011 }
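/*
 * A toy rendering of the transition table above: leaving a pool undoes its
 * accounting, entering a pool applies it, and overcommit needs no counter.
 * The types here are simplified stand-ins for the real workqueue state.
 */
#if 0 /* illustrative sketch, not compiled */
enum toy_pool { TOY_OVERCOMMIT, TOY_COOPERATIVE, TOY_CONSTRAINED };

struct toy_wq {
	int constrained;	/* wq_constrained_threads_scheduled */
	int cooperative[8];	/* per-QoS cooperative schedule counts */
};

static void
toy_move_thread(struct toy_wq *wq, enum toy_pool from, int old_qos,
    enum toy_pool to, int new_qos)
{
	if (from == TOY_COOPERATIVE) {
		wq->cooperative[old_qos]--;	/* cases 3, 4, 5 */
	} else if (from == TOY_CONSTRAINED) {
		wq->constrained--;		/* cases 6, 7 */
	}
	if (to == TOY_COOPERATIVE) {
		wq->cooperative[new_qos]++;	/* cases 2, 5, 7 */
	} else if (to == TOY_CONSTRAINED) {
		wq->constrained++;		/* cases 1, 4 */
	}
}
#endif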
4012
4013 static workq_threadreq_t
4014 workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
4015 {
4016 workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
4017 uintptr_t proprietor;
4018 thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
4019 uint8_t pri = 0;
4020
4021 if (uth == wq->wq_creator) {
4022 uth = NULL;
4023 }
4024
4025 /*
4026 * Compute the best priority request (special or turnstile)
4027 */
4028
4029 pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
4030 &proprietor);
4031 if (pri) {
4032 struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
4033 req_pri = &kqwl->kqwl_request;
4034 if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
4035 panic("Invalid thread request (%p) state %d",
4036 req_pri, req_pri->tr_state);
4037 }
4038 } else {
4039 req_pri = NULL;
4040 }
4041
4042 req_tmp = priority_queue_max(&wq->wq_special_queue,
4043 struct workq_threadreq_s, tr_entry);
4044 if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
4045 &req_tmp->tr_entry)) {
4046 req_pri = req_tmp;
4047 pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
4048 &req_tmp->tr_entry);
4049 }
4050
4051 /*
4052 * Handle the manager thread request. The special queue might yield
4053 * a higher priority, but the manager always beats the QoS world.
4054 */
4055
4056 req_mgr = wq->wq_event_manager_threadreq;
4057 if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
4058 uint32_t mgr_pri = wq->wq_event_manager_priority;
4059
4060 if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
4061 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
4062 } else {
4063 mgr_pri = thread_workq_pri_for_qos(
4064 _pthread_priority_thread_qos(mgr_pri));
4065 }
4066
4067 return mgr_pri >= pri ? req_mgr : req_pri;
4068 }
4069
4070 /*
4071 * Compute the best QoS Request, and check whether it beats the "pri" one
4072 */
4073
4074 req_qos = priority_queue_max(&wq->wq_overcommit_queue,
4075 struct workq_threadreq_s, tr_entry);
4076 if (req_qos) {
4077 qos = req_qos->tr_qos;
4078 }
4079
4080 req_tmp = workq_cooperative_queue_best_req(wq, uth);
4081 if (req_tmp && qos <= req_tmp->tr_qos) {
4082 /*
4083 * The cooperative TR is the better of overcommit and cooperative. Note
4084 * that if the QoS is the same between overcommit and cooperative, we
4085 * choose cooperative.
4086 *
4087 * Pick cooperative pool if it passes the admissions check
4088 */
4089 if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
4090 req_qos = req_tmp;
4091 qos = req_qos->tr_qos;
4092 }
4093 }
4094
4095 /*
4096 * Compare the best QoS so far - either from overcommit or from cooperative
4097 * pool - and compare it with the constrained pool
4098 */
4099 req_tmp = priority_queue_max(&wq->wq_constrained_queue,
4100 struct workq_threadreq_s, tr_entry);
4101
4102 if (req_tmp && qos < req_tmp->tr_qos) {
4103 /*
4104 * The constrained pool is best in QoS among overcommit, cooperative
4105 * and constrained. Now check how it fares against the priority case
4106 */
4107 if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
4108 return req_pri;
4109 }
4110
4111 if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
4112 /*
4113 * If the constrained thread request is the best one and passes
4114 * the admission check, pick it.
4115 */
4116 return req_tmp;
4117 }
4118 }
4119
4120 if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
4121 return req_pri;
4122 }
4123
4124 return req_qos;
4125 }
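/*
 * A condensed model of the final comparison above: the priority world
 * (special queue / turnstile proprietor) wins whenever its sched pri is at
 * least that of the best QoS request. toy_req and qos_to_sched_pri() are
 * hypothetical stand-ins for the real types and thread_workq_pri_for_qos().
 */
#if 0 /* illustrative sketch, not compiled */
#include <stdint.h>

struct toy_req;

static const struct toy_req *
toy_select(uint8_t pri, const struct toy_req *pri_req,
    int qos, const struct toy_req *qos_req,
    uint8_t (*qos_to_sched_pri)(int))
{
	if (pri_req && (!qos || pri >= qos_to_sched_pri(qos))) {
		return pri_req;		/* priority world wins ties */
	}
	return qos_req;			/* else best QoS request, may be NULL */
}
#endif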
4126
4127 /*
4128 * The creator is an anonymous thread that is counted as scheduled but is
4129 * otherwise neither set up with a scheduler callback nor tracked as
4130 * active; it is used to make other threads.
4131 *
4132 * When more requests are added or an existing one is hurried along, a
4133 * creator is elected and set up, or the existing one is overridden.
4134 *
4135 * While this creator is in flight, because no request has been dequeued,
4136 * already running threads have a chance at stealing thread requests,
4137 * avoiding useless context switches; the creator, once scheduled, may
4138 * find no work left to do and will then just park again.
4139 *
4140 * The creator serves the dual purpose of informing the scheduler of work
4141 * that hasn't been materialized as threads yet, and of acting as a natural
4142 * pacing mechanism for thread creation.
4143 *
4144 * Because the creator is anonymous (and not bound to anything), thread
4145 * requests can be stolen from it by threads already on core, yielding
4146 * more efficient scheduling and fewer context switches.
4147 */
4148 static void
4149 workq_schedule_creator(proc_t p, struct workqueue *wq,
4150 workq_kern_threadreq_flags_t flags)
4151 {
4152 workq_threadreq_t req;
4153 struct uthread *uth;
4154 bool needs_wakeup;
4155
4156 workq_lock_held(wq);
4157 assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);
4158
4159 again:
4160 uth = wq->wq_creator;
4161
4162 if (!wq->wq_reqcount) {
4163 /*
4164 * There is no thread request left.
4165 *
4166 * If there is a creator, leave everything in place, so that it cleans
4167 * up itself in workq_push_idle_thread().
4168 *
4169 * Else, make sure the turnstile state is reset to no inheritor.
4170 */
4171 if (uth == NULL) {
4172 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4173 }
4174 return;
4175 }
4176
4177 req = workq_threadreq_select_for_creator(wq);
4178 if (req == NULL) {
4179 /*
4180 * There isn't a thread request that passes the admission check.
4181 *
4182 * If there is a creator, do not touch anything, the creator will sort
4183 * it out when it runs.
4184 *
4185 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
4186 * code calls us if anything changes.
4187 */
4188 if (uth == NULL) {
4189 workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
4190 }
4191 return;
4192 }
4193
4194
4195 if (uth) {
4196 /*
4197 * We may need to override the creator we already have
4198 */
4199 if (workq_thread_needs_priority_change(req, uth)) {
4200 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4201 wq, 1, uthread_tid(uth), req->tr_qos);
4202 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4203 }
4204 assert(wq->wq_inheritor == get_machthread(uth));
4205 } else if (wq->wq_thidlecount) {
4206 /*
4207 * We need to unpark a creator thread
4208 */
4209 wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
4210 &needs_wakeup);
4211 /* Always reset the priorities on the newly chosen creator */
4212 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4213 workq_turnstile_update_inheritor(wq, get_machthread(uth),
4214 TURNSTILE_INHERITOR_THREAD);
4215 WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4216 wq, 2, uthread_tid(uth), req->tr_qos);
4217 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4218 uth->uu_save.uus_workq_park_data.yields = 0;
4219 if (needs_wakeup) {
4220 workq_thread_wakeup(uth);
4221 }
4222 } else {
4223 /*
4224 * We need to allocate a thread...
4225 */
4226 if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
4227 /* out of threads, just go away */
4228 flags = WORKQ_THREADREQ_NONE;
4229 } else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
4230 act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
4231 } else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
4232 /* This can drop the workqueue lock, and take it again */
4233 workq_schedule_immediate_thread_creation(wq);
4234 } else if (workq_add_new_idle_thread(p, wq)) {
4235 goto again;
4236 } else {
4237 workq_schedule_delayed_thread_creation(wq, 0);
4238 }
4239
4240 /*
4241 * If the current thread is the inheritor:
4242 *
4243 * If we set the AST, then the thread will stay the inheritor until
4244 * either the AST calls workq_kern_threadreq_redrive(), or it parks
4245 * and calls workq_push_idle_thread().
4246 *
4247 * Else, the responsibility of the thread creation is with a thread-call
4248 * and we need to clear the inheritor.
4249 */
4250 if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
4251 wq->wq_inheritor == current_thread()) {
4252 workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4253 }
4254 }
4255 }
4256
4257 /**
4258 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
4259 * but do not allow early binds.
4260 *
4261 * Called with the base pri frozen, will unfreeze it.
4262 */
4263 __attribute__((noreturn, noinline))
4264 static void
4265 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4266 struct uthread *uth, uint32_t setup_flags)
4267 {
4268 workq_threadreq_t req = NULL;
4269 bool is_creator = (wq->wq_creator == uth);
4270 bool schedule_creator = false;
4271
4272 if (__improbable(_wq_exiting(wq))) {
4273 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
4274 goto park;
4275 }
4276
4277 if (wq->wq_reqcount == 0) {
4278 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
4279 goto park;
4280 }
4281
4282 req = workq_threadreq_select(wq, uth);
4283 if (__improbable(req == NULL)) {
4284 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
4285 goto park;
4286 }
4287
4288 struct uu_workq_policy old_pri = uth->uu_workq_pri;
4289 uint8_t tr_flags = req->tr_flags;
4290 struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);
4291
4292 /*
4293 * Attempt to setup ourselves as the new thing to run, moving all priority
4294 * pushes to ourselves.
4295 *
4296 * If the current thread is the creator, then the fact that we are presently
4297 * running is proof that we'll do something useful, so keep going.
4298 *
4299 * For other cases, peek at the AST to know whether the scheduler wants
4300 * to preempt us; if yes, park instead, and move the thread request
4301 * turnstile back to the workqueue.
4302 */
4303 if (req_ts) {
4304 workq_perform_turnstile_operation_locked(wq, ^{
4305 turnstile_update_inheritor(req_ts, get_machthread(uth),
4306 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
4307 turnstile_update_inheritor_complete(req_ts,
4308 TURNSTILE_INTERLOCK_HELD);
4309 });
4310 }
4311
4312 /* Accounting changes of the aggregate thscheduled_count and thactive; this
4313 * has to be paired with the workq_thread_reset_pri below so that
4314 * uth->uu_workq_pri matches thactive.
4315 *
4316 * This is undone when the thread parks */
4317 if (is_creator) {
4318 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
4319 uth->uu_save.uus_workq_park_data.yields);
4320 wq->wq_creator = NULL;
4321 _wq_thactive_inc(wq, req->tr_qos);
4322 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
4323 } else if (old_pri.qos_bucket != req->tr_qos) {
4324 _wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos);
4325 }
4326 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4327
4328 /*
4329 * Make relevant accounting changes for pool specific counts.
4330 *
4331 * The schedule counts changing can affect what the next best request
4332 * for cooperative thread pool is if this request is dequeued.
4333 */
4334 bool cooperative_sched_count_changed =
4335 workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
4336 old_pri.qos_req, tr_flags);
4337
4338 if (workq_tr_is_overcommit(tr_flags)) {
4339 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
4340 } else if (workq_tr_is_cooperative(tr_flags)) {
4341 workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
4342 } else {
4343 workq_thread_set_type(uth, 0);
4344 }
4345
4346 if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
4347 if (req_ts) {
4348 workq_perform_turnstile_operation_locked(wq, ^{
4349 turnstile_update_inheritor(req_ts, wq->wq_turnstile,
4350 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
4351 turnstile_update_inheritor_complete(req_ts,
4352 TURNSTILE_INTERLOCK_HELD);
4353 });
4354 }
4355 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
4356 goto park_thawed;
4357 }
4358
4359 /*
4360 * We passed all checks, dequeue the request, bind to it, and set it up
4361 * to return to user.
4362 */
4363 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4364 workq_trace_req_id(req), tr_flags, 0);
4365 wq->wq_fulfilled++;
4366 schedule_creator = workq_threadreq_dequeue(wq, req,
4367 cooperative_sched_count_changed);
4368
4369 workq_thread_reset_cpupercent(req, uth);
4370
4371 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4372 kqueue_threadreq_bind_prepost(p, req, uth);
4373 req = NULL;
4374 } else if (req->tr_count > 0) {
4375 req = NULL;
4376 }
4377
4378 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4379 uth->uu_workq_flags ^= UT_WORKQ_NEW;
4380 setup_flags |= WQ_SETUP_FIRST_USE;
4381 }
4382
4383 /* If one of the following is true, call workq_schedule_creator (which also
4384 * adjusts priority of existing creator):
4385 *
4386 * - We are the creator currently so the wq may need a new creator
4387 * - The request we're binding to is the highest priority one, so the
4388 * existing creator's priority might need to be adjusted to reflect the
4389 * next highest TR
4390 */
4391 if (is_creator || schedule_creator) {
4392 /* This can drop the workqueue lock, and take it again */
4393 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
4394 }
4395
4396 workq_unlock(wq);
4397
4398 if (req) {
4399 zfree(workq_zone_threadreq, req);
4400 }
4401
4402 /*
4403 * Run Thread, Run!
4404 */
4405 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
4406 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
4407 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
4408 } else if (workq_tr_is_overcommit(tr_flags)) {
4409 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
4410 } else if (workq_tr_is_cooperative(tr_flags)) {
4411 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
4412 }
4413 if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
4414 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
4415 assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
4416 }
4417
4418 if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
4419 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
4420 }
4421 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
4422
4423 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4424 kqueue_threadreq_bind_commit(p, get_machthread(uth));
4425 } else {
4426 #if CONFIG_PREADOPT_TG
4427 /*
4428 * The thread may have a preadopt thread group on it already because it
4429 * got tagged with it as a creator thread. So we need to make sure to
4430 * clear that since we don't have preadoption for anonymous thread
4431 * requests
4432 */
4433 thread_set_preadopt_thread_group(get_machthread(uth), NULL);
4434 #endif
4435 }
4436
4437 workq_setup_and_run(p, uth, setup_flags);
4438 __builtin_unreachable();
4439
4440 park:
4441 thread_unfreeze_base_pri(get_machthread(uth));
4442 park_thawed:
4443 workq_park_and_unlock(p, wq, uth, setup_flags);
4444 }
4445
4446 /**
4447 * Runs a thread request on a thread
4448 *
4449 * - if thread is THREAD_NULL, will find a thread and run the request there.
4450 * Otherwise, the thread must be the current thread.
4451 *
4452 * - if req is NULL, will find the highest priority request and run that. If
4453 * it is not NULL, it must be a threadreq object in state NEW. If it can not
4454 * be run immediately, it will be enqueued and moved to state QUEUED.
4455 *
4456 * Either way, the thread request object serviced will be moved to state
4457 * BINDING and attached to the uthread.
4458 *
4459 * Should be called with the workqueue lock held. Will drop it.
4460 * Should be called with the base pri not frozen.
4461 */
4462 __attribute__((noreturn, noinline))
4463 static void
4464 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4465 struct uthread *uth, uint32_t setup_flags)
4466 {
4467 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
4468 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4469 setup_flags |= WQ_SETUP_FIRST_USE;
4470 }
4471 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
4472 /*
4473 * This pointer is possibly freed and only used for tracing purposes.
4474 */
4475 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
4476 workq_unlock(wq);
4477 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4478 VM_KERNEL_ADDRHIDE(req), 0, 0);
4479 (void)req;
4480
4481 workq_setup_and_run(p, uth, setup_flags);
4482 __builtin_unreachable();
4483 }
4484
4485 thread_freeze_base_pri(get_machthread(uth));
4486 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
4487 }
4488
4489 static bool
4490 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4491 {
4492 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4493
4494 if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4495 return false;
4496 }
4497
4498 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4499 if (wq->wq_fulfilled == snapshot) {
4500 return false;
4501 }
4502
4503 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4504 if (wq->wq_fulfilled - snapshot > conc) {
4505 /* we fulfilled more than NCPU requests since being dispatched */
4506 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4507 wq->wq_fulfilled, snapshot);
4508 return true;
4509 }
4510
4511 for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4512 cnt += wq->wq_thscheduled_count[i];
4513 }
4514 if (conc <= cnt) {
4515 /* We fulfilled requests and have more than NCPU scheduled threads */
4516 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4517 wq->wq_fulfilled, snapshot);
4518 return true;
4519 }
4520
4521 return false;
4522 }
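/*
 * The yield heuristic above, restated as a self-contained predicate with
 * the THREAD_QOS_USER_INTERACTIVE early-out omitted; the inputs here are
 * stand-ins for wq_fulfilled, the park-data snapshot, wq_max_parallelism
 * and the summed wq_thscheduled_count buckets.
 */
#if 0 /* illustrative sketch, not compiled */
#include <stdbool.h>
#include <stdint.h>

static bool
toy_creator_should_yield(uint32_t fulfilled, uint32_t snapshot,
    uint32_t max_parallelism, uint32_t scheduled_at_or_above)
{
	if (fulfilled == snapshot) {
		return false;	/* nothing fulfilled since dispatch */
	}
	if (fulfilled - snapshot > max_parallelism) {
		return true;	/* fulfilled more than NCPU requests */
	}
	/* fulfilled some requests and the CPUs are already saturated */
	return scheduled_at_or_above >= max_parallelism;
}
#endif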
4523
4524 /**
4525 * parked thread wakes up
4526 */
4527 __attribute__((noreturn, noinline))
4528 static void
4529 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
4530 {
4531 thread_t th = current_thread();
4532 struct uthread *uth = get_bsdthread_info(th);
4533 proc_t p = current_proc();
4534 struct workqueue *wq = proc_get_wqptr_fast(p);
4535
4536 workq_lock_spin(wq);
4537
4538 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
4539 /*
4540 * If the threads we have out are able to keep up with the
4541 * demand, then we should avoid sending this creator thread to
4542 * userspace.
4543 */
4544 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4545 uth->uu_save.uus_workq_park_data.yields++;
4546 workq_unlock(wq);
4547 thread_yield_with_continuation(workq_unpark_continue, NULL);
4548 __builtin_unreachable();
4549 }
4550
4551 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
4552 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
4553 __builtin_unreachable();
4554 }
4555
4556 if (__probable(wr == THREAD_AWAKENED)) {
4557 /*
4558 * We were set running, but for the purposes of dying.
4559 */
4560 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
4561 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
4562 } else {
4563 /*
4564 * workaround for <rdar://problem/38647347>,
4565 * in case we do hit userspace, make sure calling
4566 * workq_thread_terminate() does the right thing here,
4567 * and if we never call it, that workq_exit() will too because it sees
4568 * this thread on the runlist.
4569 */
4570 assert(wr == THREAD_INTERRUPTED);
4571 wq->wq_thdying_count++;
4572 uth->uu_workq_flags |= UT_WORKQ_DYING;
4573 }
4574
4575 workq_unpark_for_death_and_unlock(p, wq, uth,
4576 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
4577 __builtin_unreachable();
4578 }
4579
4580 __attribute__((noreturn, noinline))
4581 static void
4582 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
4583 {
4584 thread_t th = get_machthread(uth);
4585 vm_map_t vmap = get_task_map(proc_task(p));
4586
4587 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
4588 /*
4589 * For preemption reasons, we want to reset the voucher as late as
4590 * possible, so we do it in two places:
4591 * - Just before parking (i.e. in workq_park_and_unlock())
4592 * - Prior to doing the setup for the next workitem (i.e. here)
4593 *
4594 * Those two places are sufficient to ensure we always reset it before
4595 * it goes back out to user space, but be careful to not break that
4596 * guarantee.
4597 *
4598 * Note that setting the voucher to NULL will not clear the preadoption
4599 * thread group on this thread
4600 */
4601 __assert_only kern_return_t kr;
4602 kr = thread_set_voucher_name(MACH_PORT_NULL);
4603 assert(kr == KERN_SUCCESS);
4604 }
4605
4606 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
4607 if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
4608 upcall_flags |= WQ_FLAG_THREAD_REUSE;
4609 }
4610
4611 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
4612 /*
4613 * For threads that have an outside-of-QoS thread priority, indicate
4614 * to userspace that setting QoS should only affect the TSD and not
4615 * change QOS in the kernel.
4616 */
4617 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
4618 } else {
4619 /*
4620 * Put the QoS class value into the lower bits of the reuse_thread
4621 * register; this is where the thread priority used to be stored
4622 * anyway.
4623 */
4624 upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
4625 WQ_FLAG_THREAD_PRIO_QOS;
4626 }
4627
4628 if (uth->uu_workq_thport == MACH_PORT_NULL) {
4629 /* convert_thread_to_port_pinned() consumes a reference */
4630 thread_reference(th);
4631 /* Convert to immovable/pinned thread port, but port is not pinned yet */
4632 ipc_port_t port = convert_thread_to_port_pinned(th);
4633 /* Atomically, pin and copy out the port */
4634 uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p)));
4635 }
4636
4637 /* Thread has been set up to run, arm its next workqueue quantum or disarm
4638 * if it is no longer supporting that */
4639 if (thread_supports_cooperative_workqueue(th)) {
4640 thread_arm_workqueue_quantum(th);
4641 } else {
4642 thread_disarm_workqueue_quantum(th);
4643 }
4644
4645 /*
4646 * Call out to pthread, this sets up the thread, pulls in kevent structs
4647 * onto the stack, sets up the thread state and then returns to userspace.
4648 */
4649 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
4650 proc_get_wqptr_fast(p), 0, 0, 0);
4651
4652 if (workq_thread_is_cooperative(uth)) {
4653 thread_sched_call(th, NULL);
4654 } else {
4655 thread_sched_call(th, workq_sched_callback);
4656 }
4657
4658 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
4659 uth->uu_workq_thport, 0, setup_flags, upcall_flags);
4660
4661 __builtin_unreachable();
4662 }
4663
4664 #pragma mark misc
4665
4666 int
4667 fill_procworkqueue(proc_t p, struct proc_workqueueinfo *pwqinfo)
4668 {
4669 struct workqueue *wq = proc_get_wqptr(p);
4670 int error = 0;
4671 int activecount;
4672
4673 if (wq == NULL) {
4674 return EINVAL;
4675 }
4676
4677 /*
4678 * This is sometimes called from interrupt context by the kperf sampler.
4679 * In that case, it's not safe to spin trying to take the lock since we
4680 * might already hold it. So, we just try-lock it and error out if it's
4681 * already held. Since this is just a debugging aid, and all our callers
4682 * are able to handle an error, that's fine.
4683 */
4684 bool locked = workq_lock_try(wq);
4685 if (!locked) {
4686 return EBUSY;
4687 }
4688
4689 wq_thactive_t act = _wq_thactive(wq);
4690 activecount = _wq_thactive_aggregate_downto_qos(wq, act,
4691 WORKQ_THREAD_QOS_MIN, NULL, NULL);
4692 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
4693 activecount++;
4694 }
4695 pwqinfo->pwq_nthreads = wq->wq_nthreads;
4696 pwqinfo->pwq_runthreads = activecount;
4697 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
4698 pwqinfo->pwq_state = 0;
4699
4700 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4701 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4702 }
4703
4704 if (wq->wq_nthreads >= wq_max_threads) {
4705 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4706 }
4707
4708 workq_unlock(wq);
4709 return error;
4710 }
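/*
 * fill_procworkqueue() backs the PROC_PIDWORKQUEUEINFO flavor of
 * proc_pidinfo(). A minimal macOS userspace query might look like the
 * sketch below (error handling elided).
 */
#if 0 /* illustrative sketch, not compiled as part of the kernel */
#include <libproc.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	struct proc_workqueueinfo pwqinfo;
	int ret = proc_pidinfo(getpid(), PROC_PIDWORKQUEUEINFO, 0,
	    &pwqinfo, (int)sizeof(pwqinfo));

	if (ret == (int)sizeof(pwqinfo)) {
		printf("threads=%u running=%u blocked=%u state=0x%x\n",
		    pwqinfo.pwq_nthreads, pwqinfo.pwq_runthreads,
		    pwqinfo.pwq_blockedthreads, pwqinfo.pwq_state);
	}
	return 0;
}
#endif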
4711
4712 boolean_t
4713 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
4714 boolean_t *exceeded_constrained)
4715 {
4716 proc_t p = v;
4717 struct proc_workqueueinfo pwqinfo;
4718 int err;
4719
4720 assert(p != NULL);
4721 assert(exceeded_total != NULL);
4722 assert(exceeded_constrained != NULL);
4723
4724 err = fill_procworkqueue(p, &pwqinfo);
4725 if (err) {
4726 return FALSE;
4727 }
4728 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
4729 return FALSE;
4730 }
4731
4732 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
4733 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
4734
4735 return TRUE;
4736 }
4737
4738 uint32_t
4739 workqueue_get_pwq_state_kdp(void *v)
4740 {
4741 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
4742 kTaskWqExceededConstrainedThreadLimit);
4743 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
4744 kTaskWqExceededTotalThreadLimit);
4745 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
4746 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
4747 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
4748
4749 if (v == NULL) {
4750 return 0;
4751 }
4752
4753 proc_t p = v;
4754 struct workqueue *wq = proc_get_wqptr(p);
4755
4756 if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
4757 return 0;
4758 }
4759
4760 uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
4761
4762 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4763 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4764 }
4765
4766 if (wq->wq_nthreads >= wq_max_threads) {
4767 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4768 }
4769
4770 return pwq_state;
4771 }
4772
4773 void
4774 workq_init(void)
4775 {
4776 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
4777 NSEC_PER_USEC, &wq_stalled_window.abstime);
4778 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
4779 NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
4780 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
4781 NSEC_PER_USEC, &wq_max_timer_interval.abstime);
4782
4783 thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
4784 workq_deallocate_queue_invoke);
4785 }
4786