1 /*
2 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29
30 #include <sys/cdefs.h>
31
32 #include <kern/assert.h>
33 #include <kern/ast.h>
34 #include <kern/clock.h>
35 #include <kern/cpu_data.h>
36 #include <kern/kern_types.h>
37 #include <kern/policy_internal.h>
38 #include <kern/processor.h>
39 #include <kern/sched_prim.h> /* for thread_exception_return */
40 #include <kern/task.h>
41 #include <kern/thread.h>
42 #include <kern/thread_group.h>
43 #include <kern/zalloc.h>
44 #include <mach/kern_return.h>
45 #include <mach/mach_param.h>
46 #include <mach/mach_port.h>
47 #include <mach/mach_types.h>
48 #include <mach/mach_vm.h>
49 #include <mach/sync_policy.h>
50 #include <mach/task.h>
51 #include <mach/thread_act.h> /* for thread_resume */
52 #include <mach/thread_policy.h>
53 #include <mach/thread_status.h>
54 #include <mach/vm_prot.h>
55 #include <mach/vm_statistics.h>
56 #include <machine/atomic.h>
57 #include <machine/machine_routines.h>
58 #include <machine/smp.h>
59 #include <vm/vm_map.h>
60 #include <vm/vm_protos.h>
61
62 #include <sys/eventvar.h>
63 #include <sys/kdebug.h>
64 #include <sys/kernel.h>
65 #include <sys/lock.h>
66 #include <sys/param.h>
67 #include <sys/proc_info.h> /* for fill_procworkqueue */
68 #include <sys/proc_internal.h>
69 #include <sys/pthread_shims.h>
70 #include <sys/resourcevar.h>
71 #include <sys/signalvar.h>
72 #include <sys/sysctl.h>
73 #include <sys/sysproto.h>
74 #include <sys/systm.h>
75 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
76
77 #include <pthread/bsdthread_private.h>
78 #include <pthread/workqueue_syscalls.h>
79 #include <pthread/workqueue_internal.h>
80 #include <pthread/workqueue_trace.h>
81
82 #include <os/log.h>
83
/*
 * Forward declarations for file-local routines that are used before their
 * definitions appear further down in this file.
 */

/* Declared __dead2: it never returns to its caller. */
static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

/* Needed by WORKQ_SYSCTL_USECS, which references it before its definition. */
static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);

/* Lock helpers, declared early for use by the sysctl handlers. */
static inline void
workq_lock_spin(struct workqueue *wq);

static inline void
workq_unlock(struct workqueue *wq);
109
110 #pragma mark globals
111
/*
 * A time tunable expressed in microseconds, paired with the same interval
 * converted to absolute time units (`abstime`).
 */
struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

/*
 * Defines a static struct workq_usec_var named `var`, initialized to `init`
 * microseconds, and publishes it as the read/write integer sysctl
 * kern.<var>_usecs served by workq_sysctl_handle_usecs. That handler
 * recomputes `abstime` whenever a new value is written.
 *
 * NOTE(review): only `usecs` is initialized here; `abstime` is presumably
 * computed at startup elsewhere in this file — confirm.
 */
#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	    workq_sysctl_handle_usecs, "I", "")

/* Lock group that every workqueue's ticket lock is accounted to. */
static LCK_GRP_DECLARE(workq_lck_grp, "workq");
/* Reference-count group named "workq". */
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

/* Allocation zones for struct workqueue and struct workq_threadreq_s. */
static ZONE_DECLARE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DECLARE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

static struct mpsc_daemon_queue workq_deallocate_queue;

/*
 * Sysctl-tunable time windows. wq_reduce_pool_window drives the idle-thread
 * kill delay and the death call leeway.
 */
WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
/* Thread limit that each workqueue's wq_nthreads is compared against. */
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
/* Constrained-thread limit; defaults to an eighth of WORKQUEUE_MAXTHREADS. */
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
/*
 * Number of idle threads that keep the full reduce-pool kill delay before
 * that delay starts to decay.
 */
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

/*
 * This is not a hard limit but the size we aim for across the entire
 * cooperative pool. Non-cooperative workers can cause the pool to be
 * oversubscribed; the most it can be oversubscribed by is a total of
 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
 */
static uint32_t wq_max_cooperative_threads;
149
150 static inline uint32_t
wq_cooperative_queue_max_size(struct workqueue * wq)151 wq_cooperative_queue_max_size(struct workqueue *wq)
152 {
153 return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
154 }
155
156 #pragma mark sysctls
157
158 static int
159 workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
160 {
161 #pragma unused(arg2)
162 struct workq_usec_var *v = arg1;
163 int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
164 if (error || !req->newptr) {
165 return error;
166 }
167 clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
168 &v->abstime);
169 return 0;
170 }
171
/* kern.wq_max_threads: read/write view of the wq_max_threads limit. */
SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

/*
 * kern.wq_max_constrained_threads: read/write view of
 * wq_max_constrained_threads. It is also used to scale the idle-thread kill
 * delay (workq_kill_delay_for_idle_thread).
 */
SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");
177
178 static int
179 wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
180 {
181 #pragma unused(arg1, arg2, oidp)
182 int input_pool_size = 0;
183 int changed;
184 int error = 0;
185
186 error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
187 if (error || !changed) {
188 return error;
189 }
190
191 #define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
192 #define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
193 /* Not available currently, but sysctl interface is designed to allow these
194 * extra parameters:
195 * WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all bucket)
196 * WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
197 */
198
199 if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
200 && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
201 error = EINVAL;
202 goto out;
203 }
204
205 proc_t p = req->p;
206 struct workqueue *wq = proc_get_wqptr(p);
207
208 if (wq != NULL) {
209 workq_lock_spin(wq);
210 if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
211 // Hackily enforce that the workqueue is still new (no requests or
212 // threads)
213 error = ENOTSUP;
214 } else {
215 wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
216 }
217 workq_unlock(wq);
218 } else {
219 /* This process has no workqueue, calling this syctl makes no sense */
220 return ENOTSUP;
221 }
222
223 out:
224 return error;
225 }
226
/*
 * kern.wq_limit_cooperative_threads: write-only (CTLFLAG_WR) integer sysctl
 * that any user may set (CTLFLAG_ANYBODY). It configures the calling
 * process's own cooperative pool through
 * wq_limit_cooperative_threads_for_proc.
 */
SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
    CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
    wq_limit_cooperative_threads_for_proc,
    "I", "Modify the max pool size of the cooperative pool");
231
232 #pragma mark p_wqptr
233
234 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
235
236 static struct workqueue *
proc_get_wqptr_fast(struct proc * p)237 proc_get_wqptr_fast(struct proc *p)
238 {
239 return os_atomic_load(&p->p_wqptr, relaxed);
240 }
241
242 struct workqueue *
proc_get_wqptr(struct proc * p)243 proc_get_wqptr(struct proc *p)
244 {
245 struct workqueue *wq = proc_get_wqptr_fast(p);
246 return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
247 }
248
249 static void
proc_set_wqptr(struct proc * p,struct workqueue * wq)250 proc_set_wqptr(struct proc *p, struct workqueue *wq)
251 {
252 wq = os_atomic_xchg(&p->p_wqptr, wq, release);
253 if (wq == WQPTR_IS_INITING_VALUE) {
254 proc_lock(p);
255 thread_wakeup(&p->p_wqptr);
256 proc_unlock(p);
257 }
258 }
259
/*
 * Claims the right to initialize p->p_wqptr, or waits for whoever holds it.
 *
 * Returns true if p_wqptr was NULL. It now holds WQPTR_IS_INITING_VALUE and
 * the caller owns initialization. Replacing the sentinel via
 * proc_set_wqptr() wakes any waiters.
 *
 * Returns false if a workqueue was already installed, or after blocking once
 * (uninterruptibly) while another thread was initializing it.
 * NOTE(review): callers presumably re-read p_wqptr after a false return —
 * confirm.
 */
static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		/* First caller: mark initialization as in progress. */
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		/*
		 * Another thread is initializing. assert_wait() happens before the
		 * proc lock is dropped. proc_set_wqptr() issues its wakeup under that
		 * same lock, so the wakeup cannot be missed.
		 */
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
283
284 static inline event_t
workq_parked_wait_event(struct uthread * uth)285 workq_parked_wait_event(struct uthread *uth)
286 {
287 return (event_t)&uth->uu_workq_stackaddr;
288 }
289
290 static inline void
workq_thread_wakeup(struct uthread * uth)291 workq_thread_wakeup(struct uthread *uth)
292 {
293 thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
294 }
295
296 #pragma mark wq_thactive
297
/*
 * wq_thactive packs into one atomically updated integer two things: the
 * number of active threads in each QoS bucket, and the best QoS among
 * pending constrained requests.
 */
#if defined(__LP64__)
// Layout is:
// 127 - 115 : 13 bits of zeroes
// 114 - 112 : best QoS among all pending constrained requests
// 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
// 63 - 61 : best QoS among all pending constrained requests
// 60 : Manager bucket (0 or 1)
// 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
/* Extracts one bucket's count once it has been shifted down to bit 0. */
#define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
/*
 * Half of a bucket's range. NOTE(review): unused in this part of the file;
 * presumably a threshold used further down — confirm.
 */
#define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

/* The QoS field above the buckets needs at least 3 bits. */
static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");
318
319 static inline wq_thactive_t
_wq_thactive(struct workqueue * wq)320 _wq_thactive(struct workqueue *wq)
321 {
322 return os_atomic_load_wide(&wq->wq_thactive, relaxed);
323 }
324
325 static inline int
_wq_bucket(thread_qos_t qos)326 _wq_bucket(thread_qos_t qos)
327 {
328 // Map both BG and MT to the same bucket by over-shifting down and
329 // clamping MT and BG together.
330 switch (qos) {
331 case THREAD_QOS_MAINTENANCE:
332 return 0;
333 default:
334 return qos - 2;
335 }
336 }
337
338 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
339 ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))
340
341 static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue * wq)342 _wq_thactive_best_constrained_req_qos(struct workqueue *wq)
343 {
344 // Avoid expensive atomic operations: the three bits we're loading are in
345 // a single byte, and always updated under the workqueue lock
346 wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
347 return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
348 }
349
350 static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue * wq)351 _wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
352 {
353 thread_qos_t old_qos, new_qos;
354 workq_threadreq_t req;
355
356 req = priority_queue_max(&wq->wq_constrained_queue,
357 struct workq_threadreq_s, tr_entry);
358 new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
359 old_qos = _wq_thactive_best_constrained_req_qos(wq);
360 if (old_qos != new_qos) {
361 long delta = (long)new_qos - (long)old_qos;
362 wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
363 /*
364 * We can do an atomic add relative to the initial load because updates
365 * to this qos are always serialized under the workqueue lock.
366 */
367 v = os_atomic_add(&wq->wq_thactive, v, relaxed);
368 #ifdef __LP64__
369 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
370 (uint64_t)(v >> 64), 0);
371 #else
372 WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
373 #endif
374 }
375 }
376
377 static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)378 _wq_thactive_offset_for_qos(thread_qos_t qos)
379 {
380 return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
381 }
382
383 static inline wq_thactive_t
_wq_thactive_inc(struct workqueue * wq,thread_qos_t qos)384 _wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
385 {
386 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
387 return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
388 }
389
390 static inline wq_thactive_t
_wq_thactive_dec(struct workqueue * wq,thread_qos_t qos)391 _wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
392 {
393 wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
394 return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
395 }
396
397 static inline void
_wq_thactive_move(struct workqueue * wq,thread_qos_t old_qos,thread_qos_t new_qos)398 _wq_thactive_move(struct workqueue *wq,
399 thread_qos_t old_qos, thread_qos_t new_qos)
400 {
401 wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
402 _wq_thactive_offset_for_qos(old_qos);
403 os_atomic_add(&wq->wq_thactive, v, relaxed);
404 wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
405 wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
406 }
407
408 static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue * wq,wq_thactive_t v,thread_qos_t qos,uint32_t * busycount,uint32_t * max_busycount)409 _wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
410 thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
411 {
412 uint32_t count = 0, active;
413 uint64_t curtime;
414
415 assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);
416
417 if (busycount) {
418 curtime = mach_absolute_time();
419 *busycount = 0;
420 }
421 if (max_busycount) {
422 *max_busycount = THREAD_QOS_LAST - qos;
423 }
424
425 int i = _wq_bucket(qos);
426 v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
427 for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
428 active = v & WQ_THACTIVE_BUCKET_MASK;
429 count += active;
430
431 if (busycount && wq->wq_thscheduled_count[i] > active) {
432 if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
433 /*
434 * We only consider the last blocked thread for a given bucket
435 * as busy because we don't want to take the list lock in each
436 * sched callback. However this is an approximation that could
437 * contribute to thread creation storms.
438 */
439 (*busycount)++;
440 }
441 }
442 }
443
444 return count;
445 }
446
447 static inline void
_wq_cooperative_queue_scheduled_count_dec(struct workqueue * wq,thread_qos_t qos)448 _wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
449 {
450 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
451 assert(old_scheduled_count > 0);
452 }
453
454 static inline void
_wq_cooperative_queue_scheduled_count_inc(struct workqueue * wq,thread_qos_t qos)455 _wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
456 {
457 __assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
458 assert(old_scheduled_count < UINT8_MAX);
459 }
460
461 #pragma mark wq_flags
462
463 static inline uint32_t
_wq_flags(struct workqueue * wq)464 _wq_flags(struct workqueue *wq)
465 {
466 return os_atomic_load(&wq->wq_flags, relaxed);
467 }
468
469 static inline bool
_wq_exiting(struct workqueue * wq)470 _wq_exiting(struct workqueue *wq)
471 {
472 return _wq_flags(wq) & WQ_EXITING;
473 }
474
475 bool
workq_is_exiting(struct proc * p)476 workq_is_exiting(struct proc *p)
477 {
478 struct workqueue *wq = proc_get_wqptr(p);
479 return !wq || _wq_exiting(wq);
480 }
481
482
483 #pragma mark workqueue lock
484
485 static bool
workq_lock_is_acquired_kdp(struct workqueue * wq)486 workq_lock_is_acquired_kdp(struct workqueue *wq)
487 {
488 return kdp_lck_ticket_is_acquired(&wq->wq_lock);
489 }
490
491 static inline void
workq_lock_spin(struct workqueue * wq)492 workq_lock_spin(struct workqueue *wq)
493 {
494 lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
495 }
496
497 static inline void
workq_lock_held(struct workqueue * wq)498 workq_lock_held(struct workqueue *wq)
499 {
500 LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
501 }
502
503 static inline bool
workq_lock_try(struct workqueue * wq)504 workq_lock_try(struct workqueue *wq)
505 {
506 return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
507 }
508
509 static inline void
workq_unlock(struct workqueue * wq)510 workq_unlock(struct workqueue *wq)
511 {
512 lck_ticket_unlock(&wq->wq_lock);
513 }
514
515 #pragma mark idle thread lists
516
517 #define WORKQ_POLICY_INIT(qos) \
518 (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
519
520 static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)521 workq_pri_bucket(struct uu_workq_policy req)
522 {
523 return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
524 }
525
526 static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)527 workq_pri_override(struct uu_workq_policy req)
528 {
529 return MAX(workq_pri_bucket(req), req.qos_bucket);
530 }
531
532 static inline bool
workq_thread_needs_params_change(workq_threadreq_t req,struct uthread * uth)533 workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
534 {
535 workq_threadreq_param_t cur_trp, req_trp = { };
536
537 cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
538 if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
539 req_trp = kqueue_threadreq_workloop_param(req);
540 }
541
542 /*
543 * CPU percent flags are handled separately to policy changes, so ignore
544 * them for all of these checks.
545 */
546 uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
547 uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);
548
549 if (!req_flags && !cur_flags) {
550 return false;
551 }
552
553 if (req_flags != cur_flags) {
554 return true;
555 }
556
557 if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
558 return true;
559 }
560
561 if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
562 return true;
563 }
564
565 return false;
566 }
567
568 static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req,struct uthread * uth)569 workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
570 {
571 if (workq_thread_needs_params_change(req, uth)) {
572 return true;
573 }
574
575 if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
576 return true;
577 }
578
579 #if CONFIG_PREADOPT_TG
580 thread_group_qos_t tg = kqr_preadopt_thread_group(req);
581 if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
582 /*
583 * Ideally, we'd add check here to see if thread's preadopt TG is same
584 * as the thread requests's thread group and short circuit if that is
585 * the case. But in the interest of keeping the code clean and not
586 * taking the thread lock here, we're going to skip this. We will
587 * eventually shortcircuit once we try to set the preadoption thread
588 * group on the thread.
589 */
590 return true;
591 }
592 #endif
593
594 return false;
595 }
596
597 static void
workq_thread_update_bucket(proc_t p,struct workqueue * wq,struct uthread * uth,struct uu_workq_policy old_pri,struct uu_workq_policy new_pri,bool force_run)598 workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
599 struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
600 bool force_run)
601 {
602 thread_qos_t old_bucket = old_pri.qos_bucket;
603 thread_qos_t new_bucket = workq_pri_bucket(new_pri);
604
605 if (old_bucket != new_bucket) {
606 _wq_thactive_move(wq, old_bucket, new_bucket);
607 }
608
609 new_pri.qos_bucket = new_bucket;
610 uth->uu_workq_pri = new_pri;
611
612 if (workq_pri_override(old_pri) != new_bucket) {
613 thread_set_workq_override(get_machthread(uth), new_bucket);
614 }
615
616 if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
617 int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
618 if (old_bucket > new_bucket) {
619 /*
620 * When lowering our bucket, we may unblock a thread request,
621 * but we can't drop our priority before we have evaluated
622 * whether this is the case, and if we ever drop the workqueue lock
623 * that would cause a priority inversion.
624 *
625 * We hence have to disallow thread creation in that case.
626 */
627 flags = 0;
628 }
629 workq_schedule_creator(p, wq, flags);
630 }
631 }
632
633 /*
634 * Sets/resets the cpu percent limits on the current thread. We can't set
635 * these limits from outside of the current thread, so this function needs
636 * to be called when we're executing on the intended
637 */
638 static void
workq_thread_reset_cpupercent(workq_threadreq_t req,struct uthread * uth)639 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
640 {
641 assert(uth == current_uthread());
642 workq_threadreq_param_t trp = { };
643
644 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
645 trp = kqueue_threadreq_workloop_param(req);
646 }
647
648 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
649 /*
650 * Going through disable when we have an existing CPU percent limit
651 * set will force the ledger to refill the token bucket of the current
652 * thread. Removing any penalty applied by previous thread use.
653 */
654 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
655 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
656 }
657
658 if (trp.trp_flags & TRP_CPUPERCENT) {
659 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
660 (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
661 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
662 }
663 }
664
/*
 * Resets the priority and policy of `uth` so that it can service `req`.
 * With CONFIG_PREADOPT_TG, it also resets the thread's preadoption thread
 * group.
 *
 * Called with the workq lock held.
 *
 * @param wq      workqueue owning `uth`; its wq_event_manager_priority is
 *                used for manager requests.
 * @param uth     thread being reset.
 * @param req     request to service, or NULL to reset the thread to
 *                WORKQ_THREAD_QOS_CLEANUP.
 * @param unpark  when true, the workloop parameters and the QoS are also
 *                saved in uu_save.uus_workq_park_data, which is where the
 *                QoS reported to userspace comes from.
 */
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	/* Defaults; TRP_PRIORITY / TRP_POLICY workloop params override them below. */
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	/* Start over from a clean policy at the request QoS. */
	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			/*
			 * The manager is configured with an explicit scheduler priority
			 * rather than a QoS. This path returns early, so the preadoption
			 * thread group handling below is skipped.
			 */
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		/* Otherwise the manager runs at the QoS encoded in mgr_pri. */
		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			/* An explicit workloop priority moves the thread outside of QoS. */
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

#if CONFIG_PREADOPT_TG
	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
		/*
		 * We cannot safely read and borrow the reference from the kqwl since it
		 * can disappear from under us at any time due to the max-ing logic in
		 * kqueue_set_preadopted_thread_group.
		 *
		 * As such, we do the following dance:
		 *
		 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave
		 * behind with (NULL + QoS). At this point, we have the reference
		 * to the thread group from the kqwl.
		 * 2) Have the thread set the preadoption thread group on itself.
		 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to
		 * thread_group + QoS. ie we try to give the reference back to the kqwl.
		 * If we fail, that's because a higher QoS thread group was set on the
		 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to
		 * go back to (1).
		 */

		_Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req);

		thread_group_qos_t old_tg, new_tg;
		int ret = 0;
again:
		/*
		 * Step (1). ret is false when the loop gives up because the kqwl
		 * holds no valid thread group.
		 */
		ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
			if (!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) {
				os_atomic_rmw_loop_give_up(break);
			}

			/*
			 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
			 * only modify it if there is a higher QoS thread group to attach
			 */
			new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK);
		});

		if (ret) {
			/*
			 * We successfully took the ref from the kqwl so set it on the
			 * thread now
			 */
			/* Step (2). */
			thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));

			/* Step (3): put the stolen group back where we found (NULL + QoS). */
			thread_group_qos_t thread_group_to_expect = new_tg;
			thread_group_qos_t thread_group_to_set = old_tg;

			os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
				if (old_tg != thread_group_to_expect) {
					/*
					 * There was an intervening write to the kqwl_preadopt_tg,
					 * and it has a higher QoS than what we are working with
					 * here. Abandon our current adopted thread group and redo
					 * the full dance
					 */
					thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
					os_atomic_rmw_loop_give_up(goto again);
				}

				new_tg = thread_group_to_set;
			});
		} else {
			/* Nothing valid on the kqwl, just clear what's on the thread */
			thread_set_preadopt_thread_group(th, NULL);
		}
	} else {
		/* Not even a kqwl, clear what's on the thread */
		thread_set_preadopt_thread_group(th, NULL);
	}
#endif
	thread_set_workq_pri(th, qos, priority, policy);
}
785
786 /*
787 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
788 * every time a servicer is being told about a new max QoS.
789 */
790 void
workq_thread_set_max_qos(struct proc * p,workq_threadreq_t kqr)791 workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
792 {
793 struct uu_workq_policy old_pri, new_pri;
794 struct uthread *uth = current_uthread();
795 struct workqueue *wq = proc_get_wqptr_fast(p);
796 thread_qos_t qos = kqr->tr_kq_qos_index;
797
798 if (uth->uu_workq_pri.qos_max == qos) {
799 return;
800 }
801
802 workq_lock_spin(wq);
803 old_pri = new_pri = uth->uu_workq_pri;
804 new_pri.qos_max = qos;
805 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
806 workq_unlock(wq);
807 }
808
809 #pragma mark idle threads accounting and handling
810
811 static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue * wq)812 workq_oldest_killable_idle_thread(struct workqueue *wq)
813 {
814 struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);
815
816 if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
817 uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
818 if (uth) {
819 assert(uth->uu_save.uus_workq_park_data.has_stack);
820 }
821 }
822 return uth;
823 }
824
825 static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue * wq)826 workq_kill_delay_for_idle_thread(struct workqueue *wq)
827 {
828 uint64_t delay = wq_reduce_pool_window.abstime;
829 uint16_t idle = wq->wq_thidlecount;
830
831 /*
832 * If we have less than wq_death_max_load threads, have a 5s timer.
833 *
834 * For the next wq_max_constrained_threads ones, decay linearly from
835 * from 5s to 50ms.
836 */
837 if (idle <= wq_death_max_load) {
838 return delay;
839 }
840
841 if (wq_max_constrained_threads > idle - wq_death_max_load) {
842 delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
843 }
844 return delay / wq_max_constrained_threads;
845 }
846
847 static inline bool
workq_should_kill_idle_thread(struct workqueue * wq,struct uthread * uth,uint64_t now)848 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
849 uint64_t now)
850 {
851 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
852 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
853 }
854
855 static void
workq_death_call_schedule(struct workqueue * wq,uint64_t deadline)856 workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
857 {
858 uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);
859
860 if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
861 return;
862 }
863 os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
864
865 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);
866
867 /*
868 * <rdar://problem/13139182> Due to how long term timers work, the leeway
869 * can't be too short, so use 500ms which is long enough that we will not
870 * wake up the CPU for killing threads, but short enough that it doesn't
871 * fall into long-term timer list shenanigans.
872 */
873 thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
874 wq_reduce_pool_window.abstime / 10,
875 THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
876 }
877
878 /*
879 * `decrement` is set to the number of threads that are no longer dying:
880 * - because they have been resuscitated just in time (workq_pop_idle_thread)
881 * - or have been killed (workq_thread_terminate).
882 */
883 static void
workq_death_policy_evaluate(struct workqueue * wq,uint16_t decrement)884 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
885 {
886 struct uthread *uth;
887
888 assert(wq->wq_thdying_count >= decrement);
889 if ((wq->wq_thdying_count -= decrement) > 0) {
890 return;
891 }
892
893 if (wq->wq_thidlecount <= 1) {
894 return;
895 }
896
897 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
898 return;
899 }
900
901 uint64_t now = mach_absolute_time();
902 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
903
904 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
905 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
906 wq, wq->wq_thidlecount, 0, 0);
907 wq->wq_thdying_count++;
908 uth->uu_workq_flags |= UT_WORKQ_DYING;
909 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
910 workq_thread_wakeup(uth);
911 }
912 return;
913 }
914
915 workq_death_call_schedule(wq,
916 uth->uu_save.uus_workq_park_data.idle_stamp + delay);
917 }
918
919 void
workq_thread_terminate(struct proc * p,struct uthread * uth)920 workq_thread_terminate(struct proc *p, struct uthread *uth)
921 {
922 struct workqueue *wq = proc_get_wqptr_fast(p);
923
924 workq_lock_spin(wq);
925 TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
926 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
927 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
928 wq, wq->wq_thidlecount, 0, 0);
929 workq_death_policy_evaluate(wq, 1);
930 }
931 if (wq->wq_nthreads-- == wq_max_threads) {
932 /*
933 * We got under the thread limit again, which may have prevented
934 * thread creation from happening, redrive if there are pending requests
935 */
936 if (wq->wq_reqcount) {
937 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
938 }
939 }
940 workq_unlock(wq);
941
942 thread_deallocate(get_machthread(uth));
943 }
944
945 static void
workq_kill_old_threads_call(void * param0,void * param1 __unused)946 workq_kill_old_threads_call(void *param0, void *param1 __unused)
947 {
948 struct workqueue *wq = param0;
949
950 workq_lock_spin(wq);
951 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
952 os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
953 workq_death_policy_evaluate(wq, 0);
954 WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
955 workq_unlock(wq);
956 }
957
958 static struct uthread *
workq_pop_idle_thread(struct workqueue * wq,uint16_t uu_flags,bool * needs_wakeup)959 workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
960 bool *needs_wakeup)
961 {
962 struct uthread *uth;
963
964 if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
965 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
966 } else {
967 uth = TAILQ_FIRST(&wq->wq_thnewlist);
968 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
969 }
970 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
971
972 assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
973 uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;
974
975 /* A thread is never woken up as part of the cooperative pool */
976 assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);
977
978 if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
979 wq->wq_constrained_threads_scheduled++;
980 }
981 wq->wq_threads_scheduled++;
982 wq->wq_thidlecount--;
983
984 if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
985 uth->uu_workq_flags ^= UT_WORKQ_DYING;
986 workq_death_policy_evaluate(wq, 1);
987 *needs_wakeup = false;
988 } else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
989 *needs_wakeup = false;
990 } else {
991 *needs_wakeup = true;
992 }
993 return uth;
994 }
995
996 /*
997 * Called by thread_create_workq_waiting() during thread initialization, before
998 * assert_wait, before the thread has been started.
999 */
1000 event_t
workq_thread_init_and_wq_lock(task_t task,thread_t th)1001 workq_thread_init_and_wq_lock(task_t task, thread_t th)
1002 {
1003 struct uthread *uth = get_bsdthread_info(th);
1004
1005 uth->uu_workq_flags = UT_WORKQ_NEW;
1006 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
1007 uth->uu_workq_thport = MACH_PORT_NULL;
1008 uth->uu_workq_stackaddr = 0;
1009 uth->uu_workq_pthread_kill_allowed = 0;
1010
1011 thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
1012 thread_reset_workq_qos(th, THREAD_QOS_LEGACY);
1013
1014 workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
1015 return workq_parked_wait_event(uth);
1016 }
1017
1018 /**
1019 * Try to add a new workqueue thread.
1020 *
1021 * - called with workq lock held
1022 * - dropped and retaken around thread creation
1023 * - return with workq lock held
1024 */
1025 static bool
workq_add_new_idle_thread(proc_t p,struct workqueue * wq)1026 workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
1027 {
1028 mach_vm_offset_t th_stackaddr;
1029 kern_return_t kret;
1030 thread_t th;
1031
1032 wq->wq_nthreads++;
1033
1034 workq_unlock(wq);
1035
1036 vm_map_t vmap = get_task_map(p->task);
1037
1038 kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
1039 if (kret != KERN_SUCCESS) {
1040 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
1041 kret, 1, 0);
1042 goto out;
1043 }
1044
1045 kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
1046 if (kret != KERN_SUCCESS) {
1047 WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
1048 kret, 0, 0);
1049 pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
1050 goto out;
1051 }
1052
1053 // thread_create_workq_waiting() will return with the wq lock held
1054 // on success, because it calls workq_thread_init_and_wq_lock() above
1055
1056 struct uthread *uth = get_bsdthread_info(th);
1057
1058 wq->wq_creations++;
1059 wq->wq_thidlecount++;
1060 uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
1061 TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
1062
1063 WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
1064 return true;
1065
1066 out:
1067 workq_lock_spin(wq);
1068 /*
1069 * Do not redrive here if we went under wq_max_threads again,
1070 * it is the responsibility of the callers of this function
1071 * to do so when it fails.
1072 */
1073 wq->wq_nthreads--;
1074 return false;
1075 }
1076
1077 static inline bool
workq_thread_is_overcommit(struct uthread * uth)1078 workq_thread_is_overcommit(struct uthread *uth)
1079 {
1080 return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
1081 }
1082
1083 static inline bool
workq_thread_is_nonovercommit(struct uthread * uth)1084 workq_thread_is_nonovercommit(struct uthread *uth)
1085 {
1086 return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0;
1087 }
1088
1089 static inline bool
workq_thread_is_cooperative(struct uthread * uth)1090 workq_thread_is_cooperative(struct uthread *uth)
1091 {
1092 return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
1093 }
1094
1095 static inline void
workq_thread_set_type(struct uthread * uth,uint16_t flags)1096 workq_thread_set_type(struct uthread *uth, uint16_t flags)
1097 {
1098 uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
1099 uth->uu_workq_flags |= flags;
1100 }
1101
1102
1103 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
1104
1105 __attribute__((noreturn, noinline))
1106 static void
workq_unpark_for_death_and_unlock(proc_t p,struct workqueue * wq,struct uthread * uth,uint32_t death_flags,uint32_t setup_flags)1107 workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
1108 struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
1109 {
1110 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
1111 bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;
1112
1113 if (qos > WORKQ_THREAD_QOS_CLEANUP) {
1114 workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
1115 qos = WORKQ_THREAD_QOS_CLEANUP;
1116 }
1117
1118 workq_thread_reset_cpupercent(NULL, uth);
1119
1120 if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
1121 wq->wq_thidlecount--;
1122 if (first_use) {
1123 TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
1124 } else {
1125 TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
1126 }
1127 }
1128 TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);
1129
1130 workq_unlock(wq);
1131
1132 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
1133 __assert_only kern_return_t kr;
1134 kr = thread_set_voucher_name(MACH_PORT_NULL);
1135 assert(kr == KERN_SUCCESS);
1136 }
1137
1138 uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
1139 thread_t th = get_machthread(uth);
1140 vm_map_t vmap = get_task_map(p->task);
1141
1142 if (!first_use) {
1143 flags |= WQ_FLAG_THREAD_REUSE;
1144 }
1145
1146 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
1147 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
1148 __builtin_unreachable();
1149 }
1150
1151 bool
workq_is_current_thread_updating_turnstile(struct workqueue * wq)1152 workq_is_current_thread_updating_turnstile(struct workqueue *wq)
1153 {
1154 return wq->wq_turnstile_updater == current_thread();
1155 }
1156
/*
 * Run `operation` while recording the calling thread in
 * wq->wq_turnstile_updater, so that
 * workq_is_current_thread_updating_turnstile() returns true from inside it.
 * The updater is reset to THREAD_NULL once `operation` returns.
 *
 * The workq lock must already be held (presumably asserted by
 * workq_lock_held()).
 */
__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}
1167
1168 static void
workq_turnstile_update_inheritor(struct workqueue * wq,turnstile_inheritor_t inheritor,turnstile_update_flags_t flags)1169 workq_turnstile_update_inheritor(struct workqueue *wq,
1170 turnstile_inheritor_t inheritor,
1171 turnstile_update_flags_t flags)
1172 {
1173 if (wq->wq_inheritor == inheritor) {
1174 return;
1175 }
1176 wq->wq_inheritor = inheritor;
1177 workq_perform_turnstile_operation_locked(wq, ^{
1178 turnstile_update_inheritor(wq->wq_turnstile, inheritor,
1179 flags | TURNSTILE_IMMEDIATE_UPDATE);
1180 turnstile_update_inheritor_complete(wq->wq_turnstile,
1181 TURNSTILE_INTERLOCK_HELD);
1182 });
1183 }
1184
/*
 * Park `uth`, which has no more work, back into the idle pool.
 *
 * Undoes the thread's scheduled accounting:
 * - the cooperative or constrained count;
 * - wq_threads_scheduled;
 * - for non-creators, the active and per-bucket scheduled counts.
 * It also takes the thread off the run list, clears its running and
 * pool-type flags, and moves the turnstile inheritor away from it if the
 * thread held it.
 *
 * Never-used (UT_WORKQ_NEW) threads simply go back on wq_thnewlist.
 * Otherwise the thread is stamped with the current time and then either:
 * - killed right away, when wq_max_constrained_threads threads are already
 *   idle or the oldest killable thread is due to be reaped. The workq lock
 *   is then dropped and this call does not return.
 * - or inserted into wq_thidlelist. It is appended with has_stack = false
 *   when at least wq_death_max_load threads are idle and the current tail
 *   has_stack; otherwise it goes at the head with has_stack = true.
 * When the idle list was empty, the death call is scheduled.
 */
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_bucket;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	/*
	 * If this thread was the turnstile inheritor, hand inheritance back to
	 * the workqueue while requests are pending, or clear it otherwise.
	 */
	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	/* A never-used thread only goes back on the new list (no idle stamp). */
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	/*
	 * Non-creators leave the active/scheduled counts here. The
	 * UT_WORKQ_IDLE_CLEANUP flag makes workq_pop_idle_thread() and
	 * workq_death_policy_evaluate() skip waking this thread.
	 */
	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	/*
	 * Past wq_death_max_load idle threads, append without has_stack while
	 * the tail still has it; otherwise insert at the head with has_stack.
	 */
	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	/* First idle thread: arm the death call for when it may be reaped. */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}
1293
1294 #pragma mark thread requests
1295
1296 static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)1297 workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
1298 {
1299 return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
1300 }
1301
1302 static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)1303 workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
1304 {
1305 return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0;
1306 }
1307
1308 static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)1309 workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
1310 {
1311 return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
1312 }
1313
/* Classify a thread request by its tr_flags (see workq_tr_is_*). */
#define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
#define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
#define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)
1317
1318 static inline int
workq_priority_for_req(workq_threadreq_t req)1319 workq_priority_for_req(workq_threadreq_t req)
1320 {
1321 thread_qos_t qos = req->tr_qos;
1322
1323 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1324 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
1325 assert(trp.trp_flags & TRP_PRIORITY);
1326 return trp.trp_pri;
1327 }
1328 return thread_workq_pri_for_qos(qos);
1329 }
1330
1331 static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue * wq,workq_threadreq_t req)1332 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1333 {
1334 assert(!workq_tr_is_cooperative(req->tr_flags));
1335
1336 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1337 return &wq->wq_special_queue;
1338 } else if (workq_tr_is_overcommit(req->tr_flags)) {
1339 return &wq->wq_overcommit_queue;
1340 } else {
1341 return &wq->wq_constrained_queue;
1342 }
1343 }
1344
1345
1346 /* Calculates the number of threads scheduled >= the input QoS */
1347 static uint64_t
workq_num_cooperative_threads_scheduled_to_qos(struct workqueue * wq,thread_qos_t qos)1348 workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos)
1349 {
1350 workq_lock_held(wq);
1351
1352 uint64_t num_cooperative_threads = 0;
1353
1354 for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
1355 int bucket = _wq_bucket(cur_qos);
1356 num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
1357 }
1358
1359 return num_cooperative_threads;
1360 }
1361
1362 static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue * wq)1363 workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
1364 {
1365 return workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN);
1366 }
1367
#if DEBUG || DEVELOPMENT
/*
 * Debug helper: true if any cooperative QoS bucket has a queued thread
 * request.
 */
static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	thread_qos_t qos = WORKQ_THREAD_QOS_MAX;

	while (qos >= WORKQ_THREAD_QOS_MIN) {
		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[_wq_bucket(qos)])) {
			return true;
		}
		qos--;
	}

	return false;
}
#endif
1382
1383 /*
1384 * Determines the next QoS bucket we should service next in the cooperative
1385 * pool. This function will always return a QoS for cooperative pool as long as
1386 * there are requests to be serviced.
1387 *
1388 * Unlike the other thread pools, for the cooperative thread pool the schedule
1389 * counts for the various buckets in the pool affect the next best request for
1390 * it.
1391 *
1392 * This function is called in the following contexts:
1393 *
1394 * a) When determining the best thread QoS for cooperative bucket for the
1395 * creator/thread reuse
1396 *
1397 * b) Once (a) has happened and thread has bound to a thread request, figuring
1398 * out whether the next best request for this pool has changed so that creator
1399 * can be scheduled.
1400 *
1401 * Returns true if the cooperative queue's best qos changed from previous
1402 * value.
1403 */
1404 static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue * wq)1405 _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
1406 {
1407 workq_lock_held(wq);
1408
1409 thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;
1410
1411 /* We determine the next best cooperative thread request based on the
1412 * following:
1413 *
1414 * 1. Take the MAX of the following:
1415 * a) Highest qos with pending TRs such that number of scheduled
1416 * threads so far with >= qos is < wq_max_cooperative_threads
1417 * b) Highest qos bucket with pending TRs but no scheduled threads for that bucket
1418 *
1419 * 2. If the result of (1) is UN, then we pick the highest priority amongst
1420 * pending thread requests in the pool.
1421 *
1422 */
1423 thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
1424 thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;
1425
1426 thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;
1427
1428 int scheduled_count_till_qos = 0;
1429
1430 for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
1431 int bucket = _wq_bucket(qos);
1432 uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
1433 scheduled_count_till_qos += scheduled_count_for_bucket;
1434
1435 if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
1436 if (qos > highest_qos_req) {
1437 highest_qos_req = qos;
1438 }
1439 /*
1440 * The pool isn't saturated for threads at and above this QoS, and
1441 * this qos bucket has pending requests
1442 */
1443 if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
1444 if (qos > highest_qos_req_with_width) {
1445 highest_qos_req_with_width = qos;
1446 }
1447 }
1448
1449 /*
1450 * There are no threads scheduled for this bucket but there
1451 * is work pending, give it at least 1 thread
1452 */
1453 if (scheduled_count_for_bucket == 0) {
1454 if (qos > highest_qos_with_no_scheduled) {
1455 highest_qos_with_no_scheduled = qos;
1456 }
1457 }
1458 }
1459 }
1460
1461 wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
1462 if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
1463 wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
1464 }
1465
1466 #if DEBUG || DEVELOPMENT
1467 /* Assert that if we are showing up the next best req as UN, then there
1468 * actually is no thread request in the cooperative pool buckets */
1469 if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
1470 assert(!workq_has_cooperative_thread_requests(wq));
1471 }
1472 #endif
1473
1474 return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
1475 }
1476
1477 /*
1478 * Returns whether or not the input thread (or creator thread if uth is NULL)
1479 * should be allowed to work as part of the cooperative pool for the <input qos>
1480 * bucket.
1481 *
1482 * This function is called in a bunch of places:
1483 * a) Quantum expires for a thread and it is part of the cooperative pool
1484 * b) When trying to pick a thread request for the creator thread to
1485 * represent.
1486 * c) When a thread is trying to pick a thread request to actually bind to
1487 * and service.
1488 *
1489 * Called with workq lock held.
1490 */
1491
1492 #define WQ_COOPERATIVE_POOL_UNSATURATED 1
1493 #define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
1494 #define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3
1495
1496 static bool
workq_cooperative_allowance(struct workqueue * wq,thread_qos_t qos,struct uthread * uth,bool may_start_timer)1497 workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
1498 bool may_start_timer)
1499 {
1500 workq_lock_held(wq);
1501
1502 bool exclude_thread_as_scheduled = false;
1503 bool passed_admissions = false;
1504 int bucket = _wq_bucket(qos);
1505
1506 if (uth && workq_thread_is_cooperative(uth)) {
1507 exclude_thread_as_scheduled = true;
1508 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_bucket);
1509 }
1510
1511 /*
1512 * We have not saturated the pool yet, let this thread continue
1513 */
1514 uint64_t total_cooperative_threads;
1515 total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
1516 if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
1517 passed_admissions = true;
1518 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
1519 total_cooperative_threads, qos, passed_admissions,
1520 WQ_COOPERATIVE_POOL_UNSATURATED);
1521 goto out;
1522 }
1523
1524 /*
1525 * Without this thread, nothing is servicing the bucket which has pending
1526 * work
1527 */
1528 uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
1529 if (bucket_scheduled == 0 &&
1530 !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
1531 passed_admissions = true;
1532 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
1533 total_cooperative_threads, qos, passed_admissions,
1534 WQ_COOPERATIVE_BUCKET_UNSERVICED);
1535 goto out;
1536 }
1537
1538 /*
1539 * If number of threads at the QoS bucket >= input QoS exceeds the max we want
1540 * for the pool, deny this thread
1541 */
1542 uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos);
1543 passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
1544 WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
1545 qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);
1546
1547 if (!passed_admissions && may_start_timer) {
1548 workq_schedule_delayed_thread_creation(wq, 0);
1549 }
1550
1551 out:
1552 if (exclude_thread_as_scheduled) {
1553 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_bucket);
1554 }
1555 return passed_admissions;
1556 }
1557
1558 /*
1559 * returns true if the best request for the pool changed as a result of
1560 * enqueuing this thread request.
1561 */
1562 static bool
workq_threadreq_enqueue(struct workqueue * wq,workq_threadreq_t req)1563 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
1564 {
1565 assert(req->tr_state == WORKQ_TR_STATE_NEW);
1566
1567 req->tr_state = WORKQ_TR_STATE_QUEUED;
1568 wq->wq_reqcount += req->tr_count;
1569
1570 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1571 assert(wq->wq_event_manager_threadreq == NULL);
1572 assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
1573 assert(req->tr_count == 1);
1574 wq->wq_event_manager_threadreq = req;
1575 return true;
1576 }
1577
1578 if (workq_threadreq_is_cooperative(req)) {
1579 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1580 assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1581
1582 struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1583 STAILQ_INSERT_TAIL(bucket, req, tr_link);
1584
1585 return _wq_cooperative_queue_refresh_best_req_qos(wq);
1586 }
1587
1588 struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);
1589
1590 priority_queue_entry_set_sched_pri(q, &req->tr_entry,
1591 workq_priority_for_req(req), false);
1592
1593 if (priority_queue_insert(q, &req->tr_entry)) {
1594 if (workq_threadreq_is_nonovercommit(req)) {
1595 _wq_thactive_refresh_best_constrained_req_qos(wq);
1596 }
1597 return true;
1598 }
1599 return false;
1600 }
1601
1602 /*
1603 * returns true if one of the following is true (so as to update creator if
1604 * needed):
1605 *
1606 * (a) the next highest request of the pool we dequeued the request from changed
1607 * (b) the next highest requests of the pool the current thread used to be a
1608 * part of, changed
1609 *
1610 * For overcommit, special and constrained pools, the next highest QoS for each
1611 * pool just a MAX of pending requests so tracking (a) is sufficient.
1612 *
1613 * But for cooperative thread pool, the next highest QoS for the pool depends on
1614 * schedule counts in the pool as well. So if the current thread used to be
1615 * cooperative in it's previous logical run ie (b), then that can also affect
1616 * cooperative pool's next best QoS requests.
1617 */
1618 static bool
workq_threadreq_dequeue(struct workqueue * wq,workq_threadreq_t req,bool cooperative_sched_count_changed)1619 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
1620 bool cooperative_sched_count_changed)
1621 {
1622 wq->wq_reqcount--;
1623
1624 bool next_highest_request_changed = false;
1625
1626 if (--req->tr_count == 0) {
1627 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1628 assert(wq->wq_event_manager_threadreq == req);
1629 assert(req->tr_count == 0);
1630 wq->wq_event_manager_threadreq = NULL;
1631
1632 /* If a cooperative thread was the one which picked up the manager
1633 * thread request, we need to reevaluate the cooperative pool
1634 * anyways.
1635 */
1636 if (cooperative_sched_count_changed) {
1637 _wq_cooperative_queue_refresh_best_req_qos(wq);
1638 }
1639 return true;
1640 }
1641
1642 if (workq_threadreq_is_cooperative(req)) {
1643 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1644 assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1645 assert(req->tr_qos == wq->wq_cooperative_queue_best_req_qos);
1646
1647 struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1648 __assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);
1649
1650 assert(head == req);
1651 STAILQ_REMOVE_HEAD(bucket, tr_link);
1652
1653 /*
1654 * If the request we're dequeueing is cooperative, then the sched
1655 * counts definitely changed.
1656 */
1657 assert(cooperative_sched_count_changed);
1658 }
1659
1660 /*
1661 * We want to do the cooperative pool refresh after dequeueing a
1662 * cooperative thread request if any (to combine both effects into 1
1663 * refresh operation)
1664 */
1665 if (cooperative_sched_count_changed) {
1666 next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
1667 }
1668
1669 if (!workq_threadreq_is_cooperative(req)) {
1670 /*
1671 * All other types of requests are enqueued in priority queues
1672 */
1673
1674 if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1675 &req->tr_entry)) {
1676 next_highest_request_changed |= true;
1677 if (workq_threadreq_is_nonovercommit(req)) {
1678 _wq_thactive_refresh_best_constrained_req_qos(wq);
1679 }
1680 }
1681 }
1682 }
1683
1684 return next_highest_request_changed;
1685 }
1686
1687 static void
workq_threadreq_destroy(proc_t p,workq_threadreq_t req)1688 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1689 {
1690 req->tr_state = WORKQ_TR_STATE_CANCELED;
1691 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
1692 kqueue_threadreq_cancel(p, req);
1693 } else {
1694 zfree(workq_zone_threadreq, req);
1695 }
1696 }
1697
1698 #pragma mark workqueue thread creation thread calls
1699
1700 static inline bool
workq_thread_call_prepost(struct workqueue * wq,uint32_t sched,uint32_t pend,uint32_t fail_mask)1701 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1702 uint32_t fail_mask)
1703 {
1704 uint32_t old_flags, new_flags;
1705
1706 os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1707 if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1708 os_atomic_rmw_loop_give_up(return false);
1709 }
1710 if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1711 new_flags = old_flags | pend;
1712 } else {
1713 new_flags = old_flags | sched;
1714 }
1715 });
1716
1717 return (old_flags & WQ_PROC_SUSPENDED) == 0;
1718 }
1719
1720 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1721
1722 static bool
workq_schedule_delayed_thread_creation(struct workqueue * wq,int flags)1723 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1724 {
1725 assert(!preemption_enabled());
1726
1727 if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1728 WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1729 WQ_IMMEDIATE_CALL_SCHEDULED)) {
1730 return false;
1731 }
1732
1733 uint64_t now = mach_absolute_time();
1734
1735 if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1736 /* do not change the window */
1737 } else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1738 wq->wq_timer_interval *= 2;
1739 if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1740 wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
1741 }
1742 } else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1743 wq->wq_timer_interval /= 2;
1744 if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1745 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
1746 }
1747 }
1748
1749 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1750 _wq_flags(wq), wq->wq_timer_interval);
1751
1752 thread_call_t call = wq->wq_delayed_call;
1753 uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1754 uint64_t deadline = now + wq->wq_timer_interval;
1755 if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1756 panic("delayed_call was already enqueued");
1757 }
1758 return true;
1759 }
1760
1761 static void
workq_schedule_immediate_thread_creation(struct workqueue * wq)1762 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1763 {
1764 assert(!preemption_enabled());
1765
1766 if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1767 WQ_IMMEDIATE_CALL_PENDED, 0)) {
1768 WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1769 _wq_flags(wq), 0);
1770
1771 uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1772 if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1773 panic("immediate_call was already enqueued");
1774 }
1775 }
1776 }
1777
1778 void
workq_proc_suspended(struct proc * p)1779 workq_proc_suspended(struct proc *p)
1780 {
1781 struct workqueue *wq = proc_get_wqptr(p);
1782
1783 if (wq) {
1784 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1785 }
1786 }
1787
1788 void
workq_proc_resumed(struct proc * p)1789 workq_proc_resumed(struct proc *p)
1790 {
1791 struct workqueue *wq = proc_get_wqptr(p);
1792 uint32_t wq_flags;
1793
1794 if (!wq) {
1795 return;
1796 }
1797
1798 wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
1799 WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
1800 if ((wq_flags & WQ_EXITING) == 0) {
1801 disable_preemption();
1802 if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1803 workq_schedule_immediate_thread_creation(wq);
1804 } else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1805 workq_schedule_delayed_thread_creation(wq,
1806 WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1807 }
1808 enable_preemption();
1809 }
1810 }
1811
1812 /**
1813 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1814 */
1815 static bool
workq_thread_is_busy(uint64_t now,_Atomic uint64_t * lastblocked_tsp)1816 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1817 {
1818 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1819 if (now <= lastblocked_ts) {
1820 /*
1821 * Because the update of the timestamp when a thread blocks
1822 * isn't serialized against us looking at it (i.e. we don't hold
1823 * the workq lock), it's possible to have a timestamp that matches
1824 * the current time or that even looks to be in the future relative
1825 * to when we grabbed the current time...
1826 *
1827 * Just treat this as a busy thread since it must have just blocked.
1828 */
1829 return true;
1830 }
1831 return (now - lastblocked_ts) < wq_stalled_window.abstime;
1832 }
1833
1834 static void
workq_add_new_threads_call(void * _p,void * flags)1835 workq_add_new_threads_call(void *_p, void *flags)
1836 {
1837 proc_t p = _p;
1838 struct workqueue *wq = proc_get_wqptr(p);
1839 uint32_t my_flag = (uint32_t)(uintptr_t)flags;
1840
1841 /*
1842 * workq_exit() will set the workqueue to NULL before
1843 * it cancels thread calls.
1844 */
1845 if (!wq) {
1846 return;
1847 }
1848
1849 assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
1850 (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));
1851
1852 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
1853 wq->wq_nthreads, wq->wq_thidlecount);
1854
1855 workq_lock_spin(wq);
1856
1857 wq->wq_thread_call_last_run = mach_absolute_time();
1858 os_atomic_andnot(&wq->wq_flags, my_flag, release);
1859
1860 /* This can drop the workqueue lock, and take it again */
1861 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
1862
1863 workq_unlock(wq);
1864
1865 WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
1866 wq->wq_nthreads, wq->wq_thidlecount);
1867 }
1868
1869 #pragma mark thread state tracking
1870
/*
 * Scheduler callback for workqueue threads; `type` is SCHED_CALL_BLOCK or
 * SCHED_CALL_UNBLOCK.
 *
 * Updates the workqueue's active-thread accounting for the thread's QoS
 * bucket (_wq_thactive_dec() on block, _wq_thactive_inc() on unblock). On
 * block it also records the bucket's last-blocked timestamp and may arm
 * the delayed thread-creation call so a pending constrained request can be
 * serviced. Threads in the manager bucket are ignored. The workqueue lock
 * is never taken here (see the UNBLOCK case).
 */
static void
workq_sched_callback(int type, thread_t thread)
{
	thread_ro_t tro = get_thread_ro(thread);
	struct uthread *uth = get_bsdthread_info(thread);
	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	/* Manager-bucket threads are not tracked by this callback. */
	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket; it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem. Either timestamp is adequate, so no need to retry.
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		/* Tracing only: aggregate count before this thread blocked. */
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		/* Tracing only: nothing else needs to happen on unblock. */
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled);
		}
		break;
	}
}
1963
1964 #pragma mark workq lifecycle
1965
1966 void
workq_reference(struct workqueue * wq)1967 workq_reference(struct workqueue *wq)
1968 {
1969 os_ref_retain(&wq->wq_refcnt);
1970 }
1971
1972 static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,__assert_only mpsc_daemon_queue_t dq)1973 workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
1974 __assert_only mpsc_daemon_queue_t dq)
1975 {
1976 struct workqueue *wq;
1977 struct turnstile *ts;
1978
1979 wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
1980 assert(dq == &workq_deallocate_queue);
1981
1982 turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
1983 assert(ts);
1984 turnstile_cleanup();
1985 turnstile_deallocate(ts);
1986
1987 lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
1988 zfree(workq_zone_workqueue, wq);
1989 }
1990
1991 static void
workq_deallocate(struct workqueue * wq)1992 workq_deallocate(struct workqueue *wq)
1993 {
1994 if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
1995 workq_deallocate_queue_invoke(&wq->wq_destroy_link,
1996 &workq_deallocate_queue);
1997 }
1998 }
1999
2000 void
workq_deallocate_safe(struct workqueue * wq)2001 workq_deallocate_safe(struct workqueue *wq)
2002 {
2003 if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
2004 mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
2005 MPSC_QUEUE_DISABLE_PREEMPTION);
2006 }
2007 }
2008
2009 /**
2010 * Setup per-process state for the workqueue.
2011 */
2012 int
workq_open(struct proc * p,__unused struct workq_open_args * uap,__unused int32_t * retval)2013 workq_open(struct proc *p, __unused struct workq_open_args *uap,
2014 __unused int32_t *retval)
2015 {
2016 struct workqueue *wq;
2017 int error = 0;
2018
2019 if ((p->p_lflag & P_LREGISTER) == 0) {
2020 return EINVAL;
2021 }
2022
2023 if (wq_init_constrained_limit) {
2024 uint32_t limit, num_cpus = ml_wait_max_cpus();
2025
2026 /*
2027 * set up the limit for the constrained pool
2028 * this is a virtual pool in that we don't
2029 * maintain it on a separate idle and run list
2030 */
2031 limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
2032
2033 if (limit > wq_max_constrained_threads) {
2034 wq_max_constrained_threads = limit;
2035 }
2036
2037 if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
2038 wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
2039 }
2040 if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
2041 wq_max_threads = CONFIG_THREAD_MAX - 20;
2042 }
2043
2044 wq_death_max_load = (uint16_t)fls(num_cpus) + 1;
2045
2046 for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
2047 wq_max_parallelism[_wq_bucket(qos)] =
2048 qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
2049 }
2050
2051 wq_max_cooperative_threads = num_cpus;
2052
2053 wq_init_constrained_limit = 0;
2054 }
2055
2056 if (proc_get_wqptr(p) == NULL) {
2057 if (proc_init_wqptr_or_wait(p) == FALSE) {
2058 assert(proc_get_wqptr(p) != NULL);
2059 goto out;
2060 }
2061
2062 wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);
2063
2064 os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);
2065
2066 // Start the event manager at the priority hinted at by the policy engine
2067 thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
2068 pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
2069 wq->wq_event_manager_priority = (uint32_t)pp;
2070 wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
2071 wq->wq_proc = p;
2072 turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
2073 TURNSTILE_WORKQS);
2074
2075 TAILQ_INIT(&wq->wq_thrunlist);
2076 TAILQ_INIT(&wq->wq_thnewlist);
2077 TAILQ_INIT(&wq->wq_thidlelist);
2078 priority_queue_init(&wq->wq_overcommit_queue);
2079 priority_queue_init(&wq->wq_constrained_queue);
2080 priority_queue_init(&wq->wq_special_queue);
2081 for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
2082 STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
2083 }
2084
2085 /* We are only using the delayed thread call for the constrained pool
2086 * which can't have work at >= UI QoS and so we can be fine with a
2087 * UI QoS thread call.
2088 */
2089 wq->wq_delayed_call = thread_call_allocate_with_qos(
2090 workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
2091 THREAD_CALL_OPTIONS_ONCE);
2092 wq->wq_immediate_call = thread_call_allocate_with_options(
2093 workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
2094 THREAD_CALL_OPTIONS_ONCE);
2095 wq->wq_death_call = thread_call_allocate_with_options(
2096 workq_kill_old_threads_call, wq,
2097 THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
2098
2099 lck_ticket_init(&wq->wq_lock, &workq_lck_grp);
2100
2101 WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
2102 VM_KERNEL_ADDRHIDE(wq), 0, 0);
2103 proc_set_wqptr(p, wq);
2104 }
2105 out:
2106
2107 return error;
2108 }
2109
/*
 * Routine: workq_mark_exiting
 *
 * Function: Mark the work queue such that new threads will not be added to the
 * work queue after we return.
 *
 * Sets WQ_EXITING (panicking if it was already set), opportunistically
 * cancels the thread calls flagged as scheduled, detaches the event manager
 * request and the creator, then destroys every queued thread request.
 *
 * Conditions: Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);

	workq_lock_spin(wq);

	/* Set WQ_EXITING and look at the flags as they were before. */
	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	/*
	 * Detach the event manager request (cancelled below, once the lock is
	 * dropped), reset the request count and creator, and clear the
	 * turnstile inheritor.
	 */
	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
}
2182
2183 /*
2184 * Routine: workq_exit
2185 *
2186 * Function: clean up the work queue structure(s) now that there are no threads
2187 * left running inside the work queue (except possibly current_thread).
2188 *
2189 * Conditions: Called by the last thread in the process.
2190 * Called against current process.
2191 */
2192 void
workq_exit(struct proc * p)2193 workq_exit(struct proc *p)
2194 {
2195 struct workqueue *wq;
2196 struct uthread *uth, *tmp;
2197
2198 wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
2199 if (wq != NULL) {
2200 thread_t th = current_thread();
2201
2202 WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);
2203
2204 if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
2205 /*
2206 * <rdar://problem/40111515> Make sure we will no longer call the
2207 * sched call, if we ever block this thread, which the cancel_wait
2208 * below can do.
2209 */
2210 thread_sched_call(th, NULL);
2211 }
2212
2213 /*
2214 * Thread calls are always scheduled by the proc itself or under the
2215 * workqueue spinlock if WQ_EXITING is not yet set.
2216 *
2217 * Either way, when this runs, the proc has no threads left beside
2218 * the one running this very code, so we know no thread call can be
2219 * dispatched anymore.
2220 */
2221 thread_call_cancel_wait(wq->wq_delayed_call);
2222 thread_call_cancel_wait(wq->wq_immediate_call);
2223 thread_call_cancel_wait(wq->wq_death_call);
2224 thread_call_free(wq->wq_delayed_call);
2225 thread_call_free(wq->wq_immediate_call);
2226 thread_call_free(wq->wq_death_call);
2227
2228 /*
2229 * Clean up workqueue data structures for threads that exited and
2230 * didn't get a chance to clean up after themselves.
2231 *
2232 * idle/new threads should have been interrupted and died on their own
2233 */
2234 TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
2235 thread_t mth = get_machthread(uth);
2236 thread_sched_call(mth, NULL);
2237 thread_deallocate(mth);
2238 }
2239 assert(TAILQ_EMPTY(&wq->wq_thnewlist));
2240 assert(TAILQ_EMPTY(&wq->wq_thidlelist));
2241
2242 WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
2243 VM_KERNEL_ADDRHIDE(wq), 0, 0);
2244
2245 workq_deallocate(wq);
2246
2247 WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
2248 }
2249 }
2250
2251
2252 #pragma mark bsd thread control
2253
2254 bool
bsdthread_part_of_cooperative_workqueue(struct uthread * uth)2255 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2256 {
2257 return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2258 (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER);
2259 }
2260
2261 static bool
_pthread_priority_to_policy(pthread_priority_t priority,thread_qos_policy_data_t * data)2262 _pthread_priority_to_policy(pthread_priority_t priority,
2263 thread_qos_policy_data_t *data)
2264 {
2265 data->qos_tier = _pthread_priority_thread_qos(priority);
2266 data->tier_importance = _pthread_priority_relpri(priority);
2267 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2268 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2269 return false;
2270 }
2271 return true;
2272 }
2273
/*
 * BSDTHREAD_CTL_SET_SELF: apply the settings selected by `flags` to `th`
 * (bsdthread_ctl() passes the calling thread).
 *
 * Steps, in order:
 *  - WORKQ_SET_SELF_WQ_KEVENT_UNBIND: unbind the bound, non-workloop kqueue
 *    request of a non-manager workqueue thread;
 *  - WORKQ_SET_SELF_QOS_FLAG: change the thread's QoS, with bucket and pool
 *    accounting for workqueue threads;
 *  - WORKQ_SET_SELF_VOUCHER_FLAG: adopt the voucher named by `voucher`;
 *  - WORKQ_SET_SELF_FIXEDPRIORITY_FLAG / WORKQ_SET_SELF_TIMESHARE_FLAG:
 *    switch a non-workqueue thread to fixed-priority or timeshare; this
 *    step is skipped when the QoS step failed.
 *
 * @return 0 on success; EBADMSG if both the QoS and voucher steps failed;
 *         otherwise the first non-zero error among the unbind, QoS,
 *         voucher and fixed-priority steps (checked in that order).
 */
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	/* Step 1: kevent unbind. */
	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		/* Workloop bindings cannot be undone through this path. */
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	/* Step 2: QoS change. */
	if (flags & WORKQ_SET_SELF_QOS_FLAG) {
		thread_qos_policy_data_t new_policy;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 *
			 * Transitions allowed:
			 *
			 * overcommit --> non-overcommit
			 * overcommit --> overcommit
			 * non-overcommit --> non-overcommit
			 * non-overcommit --> overcommit (to be deprecated later)
			 * cooperative --> cooperative
			 *
			 * All other transitions aren't allowed so reject them.
			 */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}

			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			workq_lock_spin(wq);

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;

			/* Adjust schedule counts for various types of transitions */

			/* overcommit -> non-overcommit */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
				workq_thread_set_type(uth, 0);
				wq->wq_constrained_threads_scheduled++;

				/* non-overcommit -> overcommit */
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
				workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
				/*
				 * force_run is set when the constrained scheduled count was
				 * exactly at wq_max_constrained_threads before this decrement.
				 */
				force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);

				/* cooperative -> cooperative */
			} else if (workq_thread_is_cooperative(uth)) {
				_wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_bucket);
				_wq_cooperative_queue_scheduled_count_inc(wq, workq_pri_bucket(new_pri));

				/* We're changing schedule counts within cooperative pool, we
				 * need to refresh best cooperative QoS logic again */
				force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
			}

			/* This will also call schedule_creator if needed */
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);

			if (workq_thread_is_overcommit(uth)) {
				thread_disarm_workqueue_quantum(th);
			} else {
				/* If the thread changed QoS buckets, the quantum duration
				 * may have changed too */
				thread_arm_workqueue_quantum(th);
			}
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	/* Step 3: voucher adoption. */
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	/* Step 4: scheduling policy; skipped if the QoS step failed. */
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	/* Pick the error to report; see the function header for precedence. */
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	if (unbind_rv) {
		return unbind_rv;
	}

	if (qos_rv) {
		return qos_rv;
	}

	if (voucher_rv) {
		return voucher_rv;
	}

	if (fixedpri_rv) {
		return fixedpri_rv;
	}


	return 0;
}
2480
2481 static int
bsdthread_add_explicit_override(proc_t p,mach_port_name_t kport,pthread_priority_t pp,user_addr_t resource)2482 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2483 pthread_priority_t pp, user_addr_t resource)
2484 {
2485 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2486 if (qos == THREAD_QOS_UNSPECIFIED) {
2487 return EINVAL;
2488 }
2489
2490 thread_t th = port_name_to_thread(kport,
2491 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2492 if (th == THREAD_NULL) {
2493 return ESRCH;
2494 }
2495
2496 int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
2497 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2498
2499 thread_deallocate(th);
2500 return rv;
2501 }
2502
2503 static int
bsdthread_remove_explicit_override(proc_t p,mach_port_name_t kport,user_addr_t resource)2504 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2505 user_addr_t resource)
2506 {
2507 thread_t th = port_name_to_thread(kport,
2508 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2509 if (th == THREAD_NULL) {
2510 return ESRCH;
2511 }
2512
2513 int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
2514 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2515
2516 thread_deallocate(th);
2517 return rv;
2518 }
2519
/*
 * BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH: raise the QoS override of the
 * workqueue thread named by `kport` to the QoS class of `pp`.
 *
 * If `ulock_addr` is non-zero, the override is skipped when the ulock owner
 * read from that address is not `kport` (a failed read does not prevent the
 * override). The override only ever increases.
 *
 * @return EINVAL if `pp` carries no QoS class, ESRCH if `kport` does not
 *         name a thread of the current task, EPERM if that thread is not a
 *         workqueue thread, 0 otherwise (including when nothing changed).
 */
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	/* Returns a thread reference, dropped with thread_deallocate() below. */
	thread_t thread = port_name_to_thread(kport,
	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp);

	/* The thread mutex is held across the ulock check and the update. */
	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
		 */
		disable_preemption();
		rc = copyin_atomic32(ulock_addr, &val);
		enable_preemption();
		/* Only bail out when the read succeeded and names another owner. */
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		/* Self: go through the full bucket update. */
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		/*
		 * Another thread: record the override, and only push it to the
		 * thread when it exceeds the previous effective override.
		 */
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}
2587
2588 static int
workq_thread_reset_dispatch_override(proc_t p,thread_t thread)2589 workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2590 {
2591 struct uu_workq_policy old_pri, new_pri;
2592 struct workqueue *wq = proc_get_wqptr(p);
2593 struct uthread *uth = get_bsdthread_info(thread);
2594
2595 if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2596 return EPERM;
2597 }
2598
2599 WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);
2600
2601 workq_lock_spin(wq);
2602 old_pri = new_pri = uth->uu_workq_pri;
2603 new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2604 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2605 workq_unlock(wq);
2606 return 0;
2607 }
2608
2609 static int
workq_thread_allow_kill(__unused proc_t p,thread_t thread,bool enable)2610 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2611 {
2612 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2613 // If the thread isn't a workqueue thread, don't set the
2614 // kill_allowed bit; however, we still need to return 0
2615 // instead of an error code since this code is executed
2616 // on the abort path which needs to not depend on the
2617 // pthread_t (returning an error depends on pthread_t via
2618 // cerror_nocancel)
2619 return 0;
2620 }
2621 struct uthread *uth = get_bsdthread_info(thread);
2622 uth->uu_workq_pthread_kill_allowed = enable;
2623 return 0;
2624 }
2625
2626 static int
bsdthread_get_max_parallelism(thread_qos_t qos,unsigned long flags,int * retval)2627 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2628 int *retval)
2629 {
2630 static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2631 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2632 static_assert(QOS_PARALLELISM_REALTIME ==
2633 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2634 static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2635 _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2636
2637 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2638 return EINVAL;
2639 }
2640
2641 /* No units are present */
2642 if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2643 return ENOTSUP;
2644 }
2645
2646 if (flags & QOS_PARALLELISM_REALTIME) {
2647 if (qos) {
2648 return EINVAL;
2649 }
2650 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2651 return EINVAL;
2652 }
2653
2654 *retval = qos_max_parallelism(qos, flags);
2655 return 0;
2656 }
2657
2658 static int
bsdthread_dispatch_apply_attr(__unused struct proc * p,thread_t thread,unsigned long flags,uint64_t value1,__unused uint64_t value2)2659 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2660 unsigned long flags, uint64_t value1, __unused uint64_t value2)
2661 {
2662 uint32_t apply_worker_index;
2663 kern_return_t kr;
2664
2665 switch (flags) {
2666 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2667 apply_worker_index = (uint32_t)value1;
2668 kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2669 /*
2670 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2671 * cluster which it was not eligible to execute on.
2672 */
2673 return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2674 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2675 kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2676 return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2677 default:
2678 return EINVAL;
2679 }
2680 }
2681
/*
 * Statement macro: make the enclosing int-returning function return EINVAL
 * when `arg`, an argument the current command does not use, is non-zero.
 */
#define ENSURE_UNUSED(arg) \
	do { if ((arg) != 0) { return EINVAL; } } while (0)
2684
/*
 * bsdthread_ctl syscall: dispatch `uap->cmd` to the matching handler.
 *
 * Arguments a command does not use must be zero (ENSURE_UNUSED makes this
 * function return EINVAL otherwise). Retired commands return ENOTSUP and
 * unknown commands return EINVAL. Only BSDTHREAD_CTL_QOS_MAX_PARALLELISM
 * writes *retval.
 */
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	/* arg1: thread port, arg2: pthread priority, arg3: resource */
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	/* arg1: thread port, arg2: resource */
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	/* arg1: thread port, arg2: pthread priority, arg3: ulock address */
	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	/* Applies to the calling thread. */
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		return workq_thread_reset_dispatch_override(p, current_thread());

	/* arg1: pthread priority, arg2: voucher name, arg3: set-self flags */
	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	/* arg1: QoS class, arg2: QOS_PARALLELISM_* flags */
	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	/* arg1: enable flag, for the calling thread */
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
	/* arg1: attribute operation, arg2/arg3: operation values */
	case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
		return bsdthread_dispatch_apply_attr(p, current_thread(),
		           (unsigned long)uap->arg1, (uint64_t)uap->arg2,
		           (uint64_t)uap->arg3);
	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
2730
2731 #pragma mark workqueue thread manipulation
2732
/*
 * Forward declarations of workqueue thread manipulation routines. All three
 * are marked __dead2, i.e. they never return to their caller.
 */
static void __dead2
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void __dead2
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags);

static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2742
2743 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2744 static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)2745 workq_trace_req_id(workq_threadreq_t req)
2746 {
2747 struct kqworkloop *kqwl;
2748 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2749 kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2750 return kqwl->kqwl_dynamicid;
2751 }
2752
2753 return VM_KERNEL_ADDRHIDE(req);
2754 }
2755 #endif
2756
2757 /**
2758 * Entry point for libdispatch to ask for threads
2759 */
2760 static int
workq_reqthreads(struct proc * p,uint32_t reqcount,pthread_priority_t pp,bool cooperative)2761 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
2762 {
2763 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2764 struct workqueue *wq = proc_get_wqptr(p);
2765 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2766 int ret = 0;
2767
2768 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2769 qos == THREAD_QOS_UNSPECIFIED) {
2770 ret = EINVAL;
2771 goto exit;
2772 }
2773
2774 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2775 wq, reqcount, pp, cooperative);
2776
2777 workq_threadreq_t req = zalloc(workq_zone_threadreq);
2778 priority_queue_entry_init(&req->tr_entry);
2779 req->tr_state = WORKQ_TR_STATE_NEW;
2780 req->tr_qos = qos;
2781 workq_tr_flags_t tr_flags = 0;
2782
2783 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2784 tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
2785 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2786 }
2787
2788 if (cooperative) {
2789 tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
2790 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
2791
2792 if (reqcount > 1) {
2793 ret = ENOTSUP;
2794 goto free_and_exit;
2795 }
2796 }
2797
2798 /* A thread request cannot be both overcommit and cooperative */
2799 if (workq_tr_is_cooperative(tr_flags) &&
2800 workq_tr_is_overcommit(tr_flags)) {
2801 ret = EINVAL;
2802 goto free_and_exit;
2803 }
2804 req->tr_flags = tr_flags;
2805
2806 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2807 wq, workq_trace_req_id(req), req->tr_qos, reqcount);
2808
2809 workq_lock_spin(wq);
2810 do {
2811 if (_wq_exiting(wq)) {
2812 goto unlock_and_exit;
2813 }
2814
2815 /*
2816 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
2817 * threads without pacing, to inform the scheduler of that workload.
2818 *
2819 * The last requests, or the ones that failed the admission checks are
2820 * enqueued and go through the regular creator codepath.
2821 *
2822 * If there aren't enough threads, add one, but re-evaluate everything
2823 * as conditions may now have changed.
2824 */
2825 unpaced = reqcount - 1;
2826
2827 if (reqcount > 1) {
2828 /* We don't handle asking for parallelism on the cooperative
2829 * workqueue just yet */
2830 assert(!workq_threadreq_is_cooperative(req));
2831
2832 if (workq_threadreq_is_nonovercommit(req)) {
2833 unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2834 if (unpaced >= reqcount - 1) {
2835 unpaced = reqcount - 1;
2836 }
2837 }
2838 }
2839
2840 /*
2841 * This path does not currently handle custom workloop parameters
2842 * when creating threads for parallelism.
2843 */
2844 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));
2845
2846 /*
2847 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2848 */
2849 while (unpaced > 0 && wq->wq_thidlecount) {
2850 struct uthread *uth;
2851 bool needs_wakeup;
2852 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;
2853
2854 if (workq_tr_is_overcommit(req->tr_flags)) {
2855 uu_flags |= UT_WORKQ_OVERCOMMIT;
2856 }
2857
2858 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);
2859
2860 _wq_thactive_inc(wq, qos);
2861 wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2862 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
2863 wq->wq_fulfilled++;
2864
2865 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2866 uth->uu_save.uus_workq_park_data.thread_request = req;
2867 if (needs_wakeup) {
2868 workq_thread_wakeup(uth);
2869 }
2870 unpaced--;
2871 reqcount--;
2872 }
2873 } while (unpaced && wq->wq_nthreads < wq_max_threads &&
2874 workq_add_new_idle_thread(p, wq));
2875
2876 if (_wq_exiting(wq)) {
2877 goto unlock_and_exit;
2878 }
2879
2880 req->tr_count = (uint16_t)reqcount;
2881 if (workq_threadreq_enqueue(wq, req)) {
2882 /* This can drop the workqueue lock, and take it again */
2883 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2884 }
2885 workq_unlock(wq);
2886 return 0;
2887
2888 unlock_and_exit:
2889 workq_unlock(wq);
2890 free_and_exit:
2891 zfree(workq_zone_threadreq, req);
2892 exit:
2893 return ret;
2894 }
2895
/*
 * Starts servicing a kevent or workloop thread request.
 *
 * The request is reset to WORKQ_TR_STATE_NEW with a count of 1. It is then
 * handled in one of two ways:
 * - bound directly to the current thread, when WORKQ_THREADREQ_ATTEMPT_REBIND
 *   is passed and the request is admissible;
 * - otherwise enqueued on the workqueue. The creator is then scheduled if the
 *   request became the highest priority item, or if a preadoption thread
 *   group must be reevaluated.
 *
 * @param p           process owning the workqueue
 * @param req         the request; must be WORKQ_TR_STATE_IDLE on entry
 * @param workloop_ts when non-NULL and the request gets enqueued, this
 *                    turnstile's inheritor is set to the workqueue turnstile
 * @param qos         requested QoS. Ignored for WORKQ_TR_FLAG_WL_OUTSIDE_QOS
 *                    requests: their QoS is derived from the workloop
 *                    scheduling priority, or is WORKQ_THREAD_QOS_ABOVEUI
 *                    when that priority maps to no QoS.
 * @param flags       WORKQ_THREADREQ_* flags, passed on to
 *                    workq_schedule_creator()
 *
 * @return false when the workqueue is exiting (req is put back to
 *         WORKQ_TR_STATE_IDLE), true otherwise.
 */
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		/* This is the case of the rebind - we were about to park and unbind
		 * when more events came so keep the binding.
		 */
		assert(uth != wq->wq_creator);

		/* Move this thread's active accounting to the request's QoS bucket */
		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We got here through WORKQ_THREADREQ_ATTEMPT_REBIND, i.e.
		 * synchronously from an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), req->tr_flags, 0);
		wq->wq_fulfilled++;

		kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
	} else {
		/* Make the workloop turnstile push on the workqueue turnstile */
		if (workloop_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}

		bool reevaluate_creator_thread_group = false;
#if CONFIG_PREADOPT_TG
		reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
#endif
		/* We enqueued the highest priority item or we may need to reevaluate if
		 * the creator needs a thread group pre-adoption */
		if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);

	return true;
}
2984
2985 void
workq_kern_threadreq_modify(struct proc * p,workq_threadreq_t req,thread_qos_t qos,workq_kern_threadreq_flags_t flags)2986 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
2987 thread_qos_t qos, workq_kern_threadreq_flags_t flags)
2988 {
2989 struct workqueue *wq = proc_get_wqptr_fast(p);
2990 bool make_overcommit = false;
2991
2992 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
2993 /* Requests outside-of-QoS shouldn't accept modify operations */
2994 return;
2995 }
2996
2997 workq_lock_spin(wq);
2998
2999 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3000 assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));
3001
3002 if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3003 kqueue_threadreq_bind(p, req, req->tr_thread, 0);
3004 workq_unlock(wq);
3005 return;
3006 }
3007
3008 if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
3009 /* TODO (rokhinip): We come into this code path for kqwl thread
3010 * requests. kqwl requests cannot be cooperative.
3011 */
3012 assert(!workq_threadreq_is_cooperative(req));
3013
3014 make_overcommit = workq_threadreq_is_nonovercommit(req);
3015 }
3016
3017 if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
3018 workq_unlock(wq);
3019 return;
3020 }
3021
3022 assert(req->tr_count == 1);
3023 if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3024 panic("Invalid thread request (%p) state %d", req, req->tr_state);
3025 }
3026
3027 WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
3028 workq_trace_req_id(req), qos, 0);
3029
3030 struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
3031 workq_threadreq_t req_max;
3032
3033 /*
3034 * Stage 1: Dequeue the request from its priority queue.
3035 *
3036 * If we dequeue the root item of the constrained priority queue,
3037 * maintain the best constrained request qos invariant.
3038 */
3039 if (priority_queue_remove(pq, &req->tr_entry)) {
3040 if (workq_threadreq_is_nonovercommit(req)) {
3041 _wq_thactive_refresh_best_constrained_req_qos(wq);
3042 }
3043 }
3044
3045 /*
3046 * Stage 2: Apply changes to the thread request
3047 *
3048 * If the item will not become the root of the priority queue it belongs to,
3049 * then we need to wait in line, just enqueue and return quickly.
3050 */
3051 if (__improbable(make_overcommit)) {
3052 req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
3053 pq = workq_priority_queue_for_req(wq, req);
3054 }
3055 req->tr_qos = qos;
3056
3057 req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
3058 if (req_max && req_max->tr_qos >= qos) {
3059 priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
3060 workq_priority_for_req(req), false);
3061 priority_queue_insert(pq, &req->tr_entry);
3062 workq_unlock(wq);
3063 return;
3064 }
3065
3066 /*
3067 * Stage 3: Reevaluate whether we should run the thread request.
3068 *
3069 * Pretend the thread request is new again:
3070 * - adjust wq_reqcount to not count it anymore.
3071 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
3072 * properly attempts a synchronous bind)
3073 */
3074 wq->wq_reqcount--;
3075 req->tr_state = WORKQ_TR_STATE_NEW;
3076
3077 /* We enqueued the highest priority item or we may need to reevaluate if
3078 * the creator needs a thread group pre-adoption if the request got a new TG */
3079 bool reevaluate_creator_tg = false;
3080
3081 #if CONFIG_PREADOPT_TG
3082 reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3083 #endif
3084
3085 if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
3086 workq_schedule_creator(p, wq, flags);
3087 }
3088 workq_unlock(wq);
3089 }
3090
/*
 * Acquires the workqueue lock of process `p` (spin variant).
 */
void
workq_kern_threadreq_lock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
}
3096
/*
 * Releases the workqueue lock of process `p`.
 */
void
workq_kern_threadreq_unlock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_unlock(wq);
}
3102
3103 void
workq_kern_threadreq_update_inheritor(struct proc * p,workq_threadreq_t req,thread_t owner,struct turnstile * wl_ts,turnstile_update_flags_t flags)3104 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
3105 thread_t owner, struct turnstile *wl_ts,
3106 turnstile_update_flags_t flags)
3107 {
3108 struct workqueue *wq = proc_get_wqptr_fast(p);
3109 turnstile_inheritor_t inheritor;
3110
3111 assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3112 assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
3113 workq_lock_held(wq);
3114
3115 if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3116 kqueue_threadreq_bind(p, req, req->tr_thread,
3117 KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
3118 return;
3119 }
3120
3121 if (_wq_exiting(wq)) {
3122 inheritor = TURNSTILE_INHERITOR_NULL;
3123 } else {
3124 if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3125 panic("Invalid thread request (%p) state %d", req, req->tr_state);
3126 }
3127
3128 if (owner) {
3129 inheritor = owner;
3130 flags |= TURNSTILE_INHERITOR_THREAD;
3131 } else {
3132 inheritor = wq->wq_turnstile;
3133 flags |= TURNSTILE_INHERITOR_TURNSTILE;
3134 }
3135 }
3136
3137 workq_perform_turnstile_operation_locked(wq, ^{
3138 turnstile_update_inheritor(wl_ts, inheritor, flags);
3139 });
3140 }
3141
3142 void
workq_kern_threadreq_redrive(struct proc * p,workq_kern_threadreq_flags_t flags)3143 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3144 {
3145 struct workqueue *wq = proc_get_wqptr_fast(p);
3146
3147 workq_lock_spin(wq);
3148 workq_schedule_creator(p, wq, flags);
3149 workq_unlock(wq);
3150 }
3151
3152 /*
3153 * Always called at AST by the thread on itself
3154 *
3155 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
3156 * on what the thread should do next. The TSD value is always set by the thread
3157 * on itself in the kernel and cleared either by userspace when it acks the TSD
3158 * value and takes action, or by the thread in the kernel when the quantum
3159 * expires again.
3160 */
3161 void
workq_kern_quantum_expiry_reevaluate(proc_t proc,thread_t thread)3162 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
3163 {
3164 struct uthread *uth = get_bsdthread_info(thread);
3165
3166 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3167 return;
3168 }
3169
3170 if (!thread_supports_cooperative_workqueue(thread)) {
3171 panic("Quantum expired for thread that doesn't support cooperative workqueue");
3172 }
3173
3174 thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
3175 if (qos == THREAD_QOS_UNSPECIFIED) {
3176 panic("Thread should not have workq bucket of QoS UN");
3177 }
3178
3179 assert(thread_has_expired_workqueue_quantum(thread, false));
3180
3181 struct workqueue *wq = proc_get_wqptr(proc);
3182 assert(wq != NULL);
3183
3184 /*
3185 * For starters, we're just going to evaluate and see if we need to narrow
3186 * the pool and tell this thread to park if needed. In the future, we'll
3187 * evaluate and convey other workqueue state information like needing to
3188 * pump kevents, etc.
3189 */
3190 uint64_t flags = 0;
3191
3192 workq_lock_spin(wq);
3193
3194 if (workq_thread_is_cooperative(uth)) {
3195 if (!workq_cooperative_allowance(wq, qos, uth, false)) {
3196 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3197 } else {
3198 /* In the future, when we have kevent hookups for the cooperative
3199 * pool, we need fancier logic for what userspace should do. But
3200 * right now, only userspace thread requests exist - so we'll just
3201 * tell userspace to shuffle work items */
3202 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
3203 }
3204 } else if (workq_thread_is_nonovercommit(uth)) {
3205 if (!workq_constrained_allowance(wq, qos, uth, false)) {
3206 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3207 }
3208 }
3209 workq_unlock(wq);
3210
3211 WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);
3212
3213 kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);
3214
3215 /* We have conveyed to userspace about what it needs to do upon quantum
3216 * expiry, now rearm the workqueue quantum again */
3217 thread_arm_workqueue_quantum(get_machthread(uth));
3218 }
3219
3220 void
workq_schedule_creator_turnstile_redrive(struct workqueue * wq,bool locked)3221 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3222 {
3223 if (locked) {
3224 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3225 } else {
3226 workq_schedule_immediate_thread_creation(wq);
3227 }
3228 }
3229
/*
 * WQOPS_THREAD_RETURN / WQOPS_THREAD_KEVENT_RETURN / WQOPS_THREAD_WORKLOOP_RETURN:
 * a workqueue thread coming back to the kernel from userspace.
 *
 * If the thread is bound to a kevent/workloop request (uu_kqr_bound), the
 * event list (uap->item, with uap->affinity events) is handed to pthread's
 * workq_handle_stack_events() first. An error from that call is returned to
 * userspace with the binding left intact. Otherwise the thread goes on to
 * select a new thread request or park, and this function does not return.
 *
 * @param p   the calling process
 * @param uap syscall arguments (options, affinity = nevents, item = eventlist)
 * @param wq  the process workqueue
 *
 * @return EINVAL when the caller is not a live workqueue thread, or when it
 *         passes an event list while unbound; otherwise the error from
 *         workq_handle_stack_events(). Does not return on success.
 */
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	/* Events can only be delivered by a thread bound to a kqueue request */
	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

	if (kqr) {
		/*
		 * Describe this reused thread to pthread: kevent vs workloop,
		 * event manager, overcommit, and outside-QoS or its requested QoS.
		 */
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (workq_thread_is_overcommit(uth)) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}
		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(p->task), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			/* On failure the thread is still bound to kqr */
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
	/* Stash the workloop params saved above in the park data (0 if none) */
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
3325
3326 /**
3327 * Multiplexed call to interact with the workqueue mechanism
3328 */
3329 int
workq_kernreturn(struct proc * p,struct workq_kernreturn_args * uap,int32_t * retval)3330 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
3331 {
3332 int options = uap->options;
3333 int arg2 = uap->affinity;
3334 int arg3 = uap->prio;
3335 struct workqueue *wq = proc_get_wqptr(p);
3336 int error = 0;
3337
3338 if ((p->p_lflag & P_LREGISTER) == 0) {
3339 return EINVAL;
3340 }
3341
3342 switch (options) {
3343 case WQOPS_QUEUE_NEWSPISUPP: {
3344 /*
3345 * arg2 = offset of serialno into dispatch queue
3346 * arg3 = kevent support
3347 */
3348 int offset = arg2;
3349 if (arg3 & 0x01) {
3350 // If we get here, then userspace has indicated support for kevent delivery.
3351 }
3352
3353 p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
3354 break;
3355 }
3356 case WQOPS_QUEUE_REQTHREADS: {
3357 /*
3358 * arg2 = number of threads to start
3359 * arg3 = priority
3360 */
3361 error = workq_reqthreads(p, arg2, arg3, false);
3362 break;
3363 }
3364 /* For requesting threads for the cooperative pool */
3365 case WQOPS_QUEUE_REQTHREADS2: {
3366 /*
3367 * arg2 = number of threads to start
3368 * arg3 = priority
3369 */
3370 error = workq_reqthreads(p, arg2, arg3, true);
3371 break;
3372 }
3373 case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
3374 /*
3375 * arg2 = priority for the manager thread
3376 *
3377 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
3378 * the low bits of the value contains a scheduling priority
3379 * instead of a QOS value
3380 */
3381 pthread_priority_t pri = arg2;
3382
3383 if (wq == NULL) {
3384 error = EINVAL;
3385 break;
3386 }
3387
3388 /*
3389 * Normalize the incoming priority so that it is ordered numerically.
3390 */
3391 if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
3392 pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
3393 _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
3394 } else {
3395 thread_qos_t qos = _pthread_priority_thread_qos(pri);
3396 int relpri = _pthread_priority_relpri(pri);
3397 if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
3398 qos == THREAD_QOS_UNSPECIFIED) {
3399 error = EINVAL;
3400 break;
3401 }
3402 pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3403 }
3404
3405 /*
3406 * If userspace passes a scheduling priority, that wins over any QoS.
3407 * Userspace should takes care not to lower the priority this way.
3408 */
3409 workq_lock_spin(wq);
3410 if (wq->wq_event_manager_priority < (uint32_t)pri) {
3411 wq->wq_event_manager_priority = (uint32_t)pri;
3412 }
3413 workq_unlock(wq);
3414 break;
3415 }
3416 case WQOPS_THREAD_KEVENT_RETURN:
3417 case WQOPS_THREAD_WORKLOOP_RETURN:
3418 case WQOPS_THREAD_RETURN: {
3419 error = workq_thread_return(p, uap, wq);
3420 break;
3421 }
3422
3423 case WQOPS_SHOULD_NARROW: {
3424 /*
3425 * arg2 = priority to test
3426 * arg3 = unused
3427 */
3428 thread_t th = current_thread();
3429 struct uthread *uth = get_bsdthread_info(th);
3430 if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3431 (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
3432 error = EINVAL;
3433 break;
3434 }
3435
3436 thread_qos_t qos = _pthread_priority_thread_qos(arg2);
3437 if (qos == THREAD_QOS_UNSPECIFIED) {
3438 error = EINVAL;
3439 break;
3440 }
3441 workq_lock_spin(wq);
3442 bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
3443 workq_unlock(wq);
3444
3445 *retval = should_narrow;
3446 break;
3447 }
3448 case WQOPS_SETUP_DISPATCH: {
3449 /*
3450 * item = pointer to workq_dispatch_config structure
3451 * arg2 = sizeof(item)
3452 */
3453 struct workq_dispatch_config cfg;
3454 bzero(&cfg, sizeof(cfg));
3455
3456 error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
3457 if (error) {
3458 break;
3459 }
3460
3461 if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
3462 cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
3463 error = ENOTSUP;
3464 break;
3465 }
3466
3467 /* Load fields from version 1 */
3468 p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;
3469
3470 /* Load fields from version 2 */
3471 if (cfg.wdc_version >= 2) {
3472 p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
3473 }
3474
3475 break;
3476 }
3477 default:
3478 error = EINVAL;
3479 break;
3480 }
3481
3482 return error;
3483 }
3484
3485 /*
3486 * We have no work to do, park ourselves on the idle list.
3487 *
3488 * Consumes the workqueue lock and does not return.
3489 */
3490 __attribute__((noreturn, noinline))
3491 static void
workq_park_and_unlock(proc_t p,struct workqueue * wq,struct uthread * uth,uint32_t setup_flags)3492 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
3493 uint32_t setup_flags)
3494 {
3495 assert(uth == current_uthread());
3496 assert(uth->uu_kqr_bound == NULL);
3497 workq_push_idle_thread(p, wq, uth, setup_flags); // may not return
3498
3499 workq_thread_reset_cpupercent(NULL, uth);
3500
3501 #if CONFIG_PREADOPT_TG
3502 /* Clear the preadoption thread group on the thread.
3503 *
3504 * Case 1:
3505 * Creator thread which never picked up a thread request. We set a
3506 * preadoption thread group on creator threads but if it never picked
3507 * up a thread request and didn't go to userspace, then the thread will
3508 * park with a preadoption thread group but no explicitly adopted
3509 * voucher or work interval.
3510 *
3511 * We drop the preadoption thread group here before proceeding to park.
3512 * Note - we may get preempted when we drop the workq lock below.
3513 *
3514 * Case 2:
3515 * Thread picked up a thread request and bound to it and returned back
3516 * from userspace and is parking. At this point, preadoption thread
3517 * group should be NULL since the thread has unbound from the thread
3518 * request. So this operation should be a no-op.
3519 */
3520 thread_set_preadopt_thread_group(get_machthread(uth), NULL);
3521 #endif
3522
3523 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
3524 !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
3525 workq_unlock(wq);
3526
3527 /*
3528 * workq_push_idle_thread() will unset `has_stack`
3529 * if it wants us to free the stack before parking.
3530 */
3531 if (!uth->uu_save.uus_workq_park_data.has_stack) {
3532 pthread_functions->workq_markfree_threadstack(p,
3533 get_machthread(uth), get_task_map(p->task),
3534 uth->uu_workq_stackaddr);
3535 }
3536
3537 /*
3538 * When we remove the voucher from the thread, we may lose our importance
3539 * causing us to get preempted, so we do this after putting the thread on
3540 * the idle list. Then, when we get our importance back we'll be able to
3541 * use this thread from e.g. the kevent call out to deliver a boosting
3542 * message.
3543 *
3544 * Note that setting the voucher to NULL will not clear the preadoption
3545 * thread since this thread could have become the creator again and
3546 * perhaps acquired a preadoption thread group.
3547 */
3548 __assert_only kern_return_t kr;
3549 kr = thread_set_voucher_name(MACH_PORT_NULL);
3550 assert(kr == KERN_SUCCESS);
3551
3552 workq_lock_spin(wq);
3553 uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
3554 setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
3555 }
3556
3557 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3558
3559 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
3560 /*
3561 * While we'd dropped the lock to unset our voucher, someone came
3562 * around and made us runnable. But because we weren't waiting on the
3563 * event their thread_wakeup() was ineffectual. To correct for that,
3564 * we just run the continuation ourselves.
3565 */
3566 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
3567 __builtin_unreachable();
3568 }
3569
3570 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3571 workq_unpark_for_death_and_unlock(p, wq, uth,
3572 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
3573 __builtin_unreachable();
3574 }
3575
3576 /* Disarm the workqueue quantum since the thread is now idle */
3577 thread_disarm_workqueue_quantum(get_machthread(uth));
3578
3579 thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
3580 assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
3581 workq_unlock(wq);
3582 thread_block(workq_unpark_continue);
3583 __builtin_unreachable();
3584 }
3585
3586 static inline bool
workq_may_start_event_mgr_thread(struct workqueue * wq,struct uthread * uth)3587 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3588 {
3589 /*
3590 * There's an event manager request and either:
3591 * - no event manager currently running
3592 * - we are re-using the event manager
3593 */
3594 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3595 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3596 }
3597
3598 static uint32_t
workq_constrained_allowance(struct workqueue * wq,thread_qos_t at_qos,struct uthread * uth,bool may_start_timer)3599 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3600 struct uthread *uth, bool may_start_timer)
3601 {
3602 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3603 uint32_t count = 0;
3604
3605 uint32_t max_count = wq->wq_constrained_threads_scheduled;
3606 if (uth && workq_thread_is_nonovercommit(uth)) {
3607 /*
3608 * don't count the current thread as scheduled
3609 */
3610 assert(max_count > 0);
3611 max_count--;
3612 }
3613 if (max_count >= wq_max_constrained_threads) {
3614 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3615 wq->wq_constrained_threads_scheduled,
3616 wq_max_constrained_threads);
3617 /*
3618 * we need 1 or more constrained threads to return to the kernel before
3619 * we can dispatch additional work
3620 */
3621 return 0;
3622 }
3623 max_count -= wq_max_constrained_threads;
3624
3625 /*
3626 * Compute a metric for many how many threads are active. We find the
3627 * highest priority request outstanding and then add up the number of active
3628 * threads in that and all higher-priority buckets. We'll also add any
3629 * "busy" threads which are not currently active but blocked recently enough
3630 * that we can't be sure that they won't be unblocked soon and start
3631 * being active again.
3632 *
3633 * We'll then compare this metric to our max concurrency to decide whether
3634 * to add a new thread.
3635 */
3636
3637 uint32_t busycount, thactive_count;
3638
3639 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3640 at_qos, &busycount, NULL);
3641
3642 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3643 at_qos <= uth->uu_workq_pri.qos_bucket) {
3644 /*
3645 * Don't count this thread as currently active, but only if it's not
3646 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3647 * managers.
3648 */
3649 assert(thactive_count > 0);
3650 thactive_count--;
3651 }
3652
3653 count = wq_max_parallelism[_wq_bucket(at_qos)];
3654 if (count > thactive_count + busycount) {
3655 count -= thactive_count + busycount;
3656 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
3657 thactive_count, busycount);
3658 return MIN(count, max_count);
3659 } else {
3660 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
3661 thactive_count, busycount);
3662 }
3663
3664 if (may_start_timer) {
3665 /*
3666 * If this is called from the add timer, we won't have another timer
3667 * fire when the thread exits the "busy" state, so rearm the timer.
3668 */
3669 workq_schedule_delayed_thread_creation(wq, 0);
3670 }
3671
3672 return 0;
3673 }
3674
3675 static bool
workq_threadreq_admissible(struct workqueue * wq,struct uthread * uth,workq_threadreq_t req)3676 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
3677 workq_threadreq_t req)
3678 {
3679 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
3680 return workq_may_start_event_mgr_thread(wq, uth);
3681 }
3682 if (workq_threadreq_is_cooperative(req)) {
3683 return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
3684 }
3685 if (workq_threadreq_is_nonovercommit(req)) {
3686 return workq_constrained_allowance(wq, req->tr_qos, uth, true);
3687 }
3688
3689 return true;
3690 }
3691
3692 /*
3693 * Called from the context of selecting thread requests for threads returning
3694 * from userspace or creator thread
3695 */
3696 static workq_threadreq_t
workq_cooperative_queue_best_req(struct workqueue * wq,struct uthread * uth)3697 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
3698 {
3699 workq_lock_held(wq);
3700
3701 /*
3702 * If the current thread is cooperative, we need to exclude it as part of
3703 * cooperative schedule count since this thread is looking for a new
3704 * request. Change in the schedule count for cooperative pool therefore
3705 * requires us to reeevaluate the next best request for it.
3706 */
3707 if (uth && workq_thread_is_cooperative(uth)) {
3708 _wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_bucket);
3709
3710 (void) _wq_cooperative_queue_refresh_best_req_qos(wq);
3711
3712 _wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_bucket);
3713 } else {
3714 /*
3715 * The old value that was already precomputed should be safe to use -
3716 * add an assert that asserts that the best req QoS doesn't change in
3717 * this case
3718 */
3719 assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
3720 }
3721
3722 thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;
3723
3724 /* There are no eligible requests in the cooperative pool */
3725 if (qos == THREAD_QOS_UNSPECIFIED) {
3726 return NULL;
3727 }
3728 assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
3729 assert(qos != WORKQ_THREAD_QOS_MANAGER);
3730
3731 int bucket = _wq_bucket(qos);
3732 assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));
3733
3734 return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
3735 }
3736
/*
 * Selects the thread request that the creator thread should be primed for.
 *
 * Called with the workqueue lock held (workq_schedule_creator() asserts it
 * before calling here). Returns NULL when no request is eligible and the
 * workqueue turnstile has no push.
 *
 * Unlike workq_threadreq_select(), the workqueue turnstile push is not used
 * to compute the best priority request: only the special queue is consulted.
 * The turnstile is checked only at the very end, to decide whether a static
 * fake BACKGROUND request should be returned.
 *
 * Selection order:
 *  - the event manager request (when allowed to start) beats every QoS
 *    request, and beats the special-queue request on a priority tie;
 *  - otherwise the best of the overcommit pool, the cooperative pool (if it
 *    passes its admission check; wins QoS ties against overcommit) and the
 *    constrained pool (if it passes its admission check; must be strictly
 *    better), weighed against the special-queue priority.
 */
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_pri->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 * On an exact priority tie, the manager request wins.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		/* The manager priority is either a raw sched pri or a QoS to map */
		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 *
	 * Start by comparing the overcommit and the cooperative pool
	 */
	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, NULL);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative. Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fares against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
		/*
		 * Otherwise the constrained request failed admission: fall through
		 * and use the overcommit/cooperative winner instead.
		 */
	}

	/*
	 * Compare the best of the QoS world with the priority
	 */
	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request, so that if the push stops, the creator
	 * priority simply drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
3854
3855 /*
3856 * Returns true if this caused a change in the schedule counts of the
3857 * cooperative pool
3858 */
3859 static bool
workq_adjust_cooperative_constrained_schedule_counts(struct workqueue * wq,struct uthread * uth,thread_qos_t old_thread_qos,workq_tr_flags_t tr_flags)3860 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
3861 struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
3862 {
3863 workq_lock_held(wq);
3864
3865 /*
3866 * Row: thread type
3867 * Column: Request type
3868 *
3869 * overcommit non-overcommit cooperative
3870 * overcommit X case 1 case 2
3871 * cooperative case 3 case 4 case 5
3872 * non-overcommit case 6 X case 7
3873 *
3874 * Move the thread to the right bucket depending on what state it currently
3875 * has and what state the thread req it picks, is going to have.
3876 *
3877 * Note that the creator thread is an overcommit thread.
3878 */
3879 thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_bucket;
3880
3881 /*
3882 * Anytime a cooperative bucket's schedule count changes, we need to
3883 * potentially refresh the next best QoS for that pool when we determine
3884 * the next request for the creator
3885 */
3886 bool cooperative_pool_sched_count_changed = false;
3887
3888 if (workq_thread_is_overcommit(uth)) {
3889 if (workq_tr_is_nonovercommit(tr_flags)) {
3890 // Case 1: thread is overcommit, req is non-overcommit
3891 wq->wq_constrained_threads_scheduled++;
3892 } else if (workq_tr_is_cooperative(tr_flags)) {
3893 // Case 2: thread is overcommit, req is cooperative
3894 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3895 cooperative_pool_sched_count_changed = true;
3896 }
3897 } else if (workq_thread_is_cooperative(uth)) {
3898 if (workq_tr_is_overcommit(tr_flags)) {
3899 // Case 3: thread is cooperative, req is overcommit
3900 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3901 } else if (workq_tr_is_nonovercommit(tr_flags)) {
3902 // Case 4: thread is cooperative, req is non-overcommit
3903 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3904 wq->wq_constrained_threads_scheduled++;
3905 } else {
3906 // Case 5: thread is cooperative, req is also cooperative
3907 assert(workq_tr_is_cooperative(tr_flags));
3908 _wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
3909 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3910 }
3911 cooperative_pool_sched_count_changed = true;
3912 } else {
3913 if (workq_tr_is_overcommit(tr_flags)) {
3914 // Case 6: Thread is non-overcommit, req is overcommit
3915 wq->wq_constrained_threads_scheduled--;
3916 } else if (workq_tr_is_cooperative(tr_flags)) {
3917 // Case 7: Thread is non-overcommit, req is cooperative
3918 wq->wq_constrained_threads_scheduled--;
3919 _wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
3920 cooperative_pool_sched_count_changed = true;
3921 }
3922 }
3923
3924 return cooperative_pool_sched_count_changed;
3925 }
3926
/*
 * Selects the best thread request for `uth` to service, or NULL if none is
 * eligible.
 *
 * When `uth` is the creator thread, it is replaced by NULL so that the
 * manager and pool admission helpers below are evaluated without it.
 *
 * The best priority request is the higher of:
 *  - the workqueue turnstile's maximum push, which belongs to a kqworkloop's
 *    embedded request (it must be in WORKQ_TR_STATE_QUEUED, else panic);
 *  - the head of the special queue, which only replaces the turnstile
 *    request when its priority is strictly higher.
 * That priority request then competes with the event manager request (which
 * wins ties) and with the best admissible QoS request from the overcommit,
 * cooperative and constrained pools.
 */
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/* Evaluate on behalf of "no thread" when called for the creator */
	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		/* The proprietor of the max turnstile push is a kqworkloop */
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	/* The special queue head wins only if its priority is strictly higher */
	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_tmp->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 * On an exact priority tie, the manager request wins.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		/* The manager priority is either a raw sched pri or a QoS to map */
		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, uth);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative. Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fares against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
		/*
		 * Otherwise the constrained request failed admission: fall through
		 * and use the overcommit/cooperative winner instead.
		 */
	}

	/* The priority request wins ties against the best QoS request */
	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
4040
/*
 * The creator is an anonymous thread used to make other threads. It is
 * counted as scheduled, but it has no scheduler callback set and is not
 * tracked as active.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and set up, or the existing one is overridden
 * accordingly.
 *
 * While this creator is in flight, no request has been dequeued. Because of
 * that, already running threads have a chance to steal thread requests,
 * which avoids useless context switches. The creator, once scheduled, may
 * then find no work to do and will just park again.
 *
 * The creator serves a dual purpose: it informs the scheduler of work that
 * hasn't been materialized as threads yet, and it acts as a natural pacing
 * mechanism for thread creation.
 *
 * Because it is anonymous (not bound to anything), thread requests can be
 * stolen from this creator by threads already on core, which yields more
 * efficient scheduling and fewer context switches.
 *
 * Must be called with the workqueue lock held. `p` may only be NULL when
 * `flags` does not include WORKQ_THREADREQ_CAN_CREATE_THREADS. Some paths
 * below can drop and retake the workqueue lock.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}


	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, uthread_tid(uth), req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == get_machthread(uth));
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		/* Always reset the priorities on the newly chosen creator */
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		workq_turnstile_update_inheritor(wq, get_machthread(uth),
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, uthread_tid(uth), req->tr_qos);
		/* Baselines later read by workq_creator_should_yield() */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			/* A new idle thread exists now: retry, it can become the creator */
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
4170
/**
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the workqueue lock held and the base pri frozen; the base pri
 * is unfrozen on every path. Never returns: either the thread is bound to the
 * selected request and sent to userspace through workq_setup_and_run(), or
 * it parks through workq_park_and_unlock(). The thread parks when the
 * workqueue is exiting, when there are no requests, when nothing is
 * selectable, or when unfreezing the base pri reports that a non-creator
 * thread should yield.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	workq_threadreq_t req = NULL;
	bool is_creator = (wq->wq_creator == uth);
	bool schedule_creator = false;

	if (__improbable(_wq_exiting(wq))) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
		goto park;
	}

	if (wq->wq_reqcount == 0) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
		goto park;
	}

	req = workq_threadreq_select(wq, uth);
	if (__improbable(req == NULL)) {
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
		goto park;
	}

	/* Snapshot state needed after req may be dequeued/freed or uth re-prioritized */
	thread_qos_t old_thread_bucket = uth->uu_workq_pri.qos_bucket;
	uint8_t tr_flags = req->tr_flags;
	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

	/*
	 * Attempt to setup ourselves as the new thing to run, moving all priority
	 * pushes to ourselves.
	 *
	 * If the current thread is the creator, then the fact that we are presently
	 * running is proof that we'll do something useful, so keep going.
	 *
	 * For other cases, peek at the AST to know whether the scheduler wants
	 * to preempt us, if yes, park instead, and move the thread request
	 * turnstile back to the workqueue.
	 */
	if (req_ts) {
		workq_perform_turnstile_operation_locked(wq, ^{
			turnstile_update_inheritor(req_ts, get_machthread(uth),
			TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
			turnstile_update_inheritor_complete(req_ts,
			TURNSTILE_INTERLOCK_HELD);
		});
	}

	/* accounting changes of aggregate thscheduled_count and thactive which has
	 * to be paired with the workq_thread_reset_pri below so that we have
	 * uth->uu_workq_pri match with thactive.
	 *
	 * This is undone when the thread parks */
	if (is_creator) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
		    uth->uu_save.uus_workq_park_data.yields);
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (old_thread_bucket != req->tr_qos) {
		_wq_thactive_move(wq, old_thread_bucket, req->tr_qos);
	}
	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

	/*
	 * Make relevant accounting changes for pool specific counts.
	 *
	 * The schedule counts changing can affect what the next best request
	 * for cooperative thread pool is if this request is dequeued.
	 */
	bool cooperative_sched_count_changed =
	    workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
	    old_thread_bucket, tr_flags);

	/* The thread now belongs to the pool of the request it picked */
	if (workq_tr_is_overcommit(tr_flags)) {
		workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
	} else if (workq_tr_is_cooperative(tr_flags)) {
		workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
	} else {
		workq_thread_set_type(uth, 0);
	}

	/*
	 * A non-creator thread parks if unfreezing the base pri returns true
	 * (the preemption check described above); the request stays queued and
	 * its turnstile push is handed back to the workqueue turnstile.
	 */
	if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
		if (req_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(req_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
		goto park_thawed;
	}

	/*
	 * We passed all checks, dequeue the request, bind to it, and set it up
	 * to return to user.
	 */
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
	    workq_trace_req_id(req), tr_flags, 0);
	wq->wq_fulfilled++;
	schedule_creator = workq_threadreq_dequeue(wq, req,
	    cooperative_sched_count_changed);

	workq_thread_reset_cpupercent(req, uth);

	/*
	 * After this, `req` is non-NULL only when this thread must free it below:
	 * kevent/workloop requests are handed to the kqueue bind, and requests
	 * with a remaining tr_count are kept.
	 */
	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_prepost(p, req, uth);
		req = NULL;
	} else if (req->tr_count > 0) {
		req = NULL;
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		uth->uu_workq_flags ^= UT_WORKQ_NEW;
		setup_flags |= WQ_SETUP_FIRST_USE;
	}

	/* If one of the following is true, call workq_schedule_creator (which also
	 * adjusts priority of existing creator):
	 *
	 * - We are the creator currently so the wq may need a new creator
	 * - The request we're binding to is the highest priority one, existing
	 *   creator's priority might need to be adjusted to reflect the next
	 *   highest TR
	 */
	if (is_creator || schedule_creator) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}

	workq_unlock(wq);

	/* Free outside the workqueue lock */
	if (req) {
		zfree(workq_zone_threadreq, req);
	}

	/*
	 * Run Thread, Run!
	 */
	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (workq_tr_is_overcommit(tr_flags)) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	} else if (workq_tr_is_cooperative(tr_flags)) {
		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
	}
	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
	}

	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_commit(p, get_machthread(uth));
	} else {
#if CONFIG_PREADOPT_TG
		/*
		 * The thread may have a preadopt thread group on it already because it
		 * got tagged with it as a creator thread. So we need to make sure to
		 * clear that since we don't have preadoption for anonymous thread
		 * requests
		 */
		thread_set_preadopt_thread_group(get_machthread(uth), NULL);
#endif
	}

	workq_setup_and_run(p, uth, setup_flags);
	__builtin_unreachable();

park:
	thread_unfreeze_base_pri(get_machthread(uth));
park_thawed:
	workq_park_and_unlock(p, wq, uth, setup_flags);
}
4359
4360 /**
4361 * Runs a thread request on a thread
4362 *
4363 * - if thread is THREAD_NULL, will find a thread and run the request there.
4364 * Otherwise, the thread must be the current thread.
4365 *
4366 * - if req is NULL, will find the highest priority request and run that. If
4367 * it is not NULL, it must be a threadreq object in state NEW. If it can not
4368 * be run immediately, it will be enqueued and moved to state QUEUED.
4369 *
4370 * Either way, the thread request object serviced will be moved to state
4371 * BINDING and attached to the uthread.
4372 *
4373 * Should be called with the workqueue lock held. Will drop it.
4374 * Should be called with the base pri not frozen.
4375 */
4376 __attribute__((noreturn, noinline))
4377 static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p,struct workqueue * wq,struct uthread * uth,uint32_t setup_flags)4378 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4379 struct uthread *uth, uint32_t setup_flags)
4380 {
4381 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
4382 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4383 setup_flags |= WQ_SETUP_FIRST_USE;
4384 }
4385 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
4386 /*
4387 * This pointer is possibly freed and only used for tracing purposes.
4388 */
4389 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
4390 workq_unlock(wq);
4391 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4392 VM_KERNEL_ADDRHIDE(req), 0, 0);
4393 (void)req;
4394
4395 workq_setup_and_run(p, uth, setup_flags);
4396 __builtin_unreachable();
4397 }
4398
4399 thread_freeze_base_pri(get_machthread(uth));
4400 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
4401 }
4402
4403 static bool
workq_creator_should_yield(struct workqueue * wq,struct uthread * uth)4404 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4405 {
4406 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4407
4408 if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4409 return false;
4410 }
4411
4412 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4413 if (wq->wq_fulfilled == snapshot) {
4414 return false;
4415 }
4416
4417 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4418 if (wq->wq_fulfilled - snapshot > conc) {
4419 /* we fulfilled more than NCPU requests since being dispatched */
4420 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4421 wq->wq_fulfilled, snapshot);
4422 return true;
4423 }
4424
4425 for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4426 cnt += wq->wq_thscheduled_count[i];
4427 }
4428 if (conc <= cnt) {
4429 /* We fulfilled requests and have more than NCPU scheduled threads */
4430 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4431 wq->wq_fulfilled, snapshot);
4432 return true;
4433 }
4434
4435 return false;
4436 }
4437
4438 /**
4439 * parked thread wakes up
4440 */
4441 __attribute__((noreturn, noinline))
4442 static void
workq_unpark_continue(void * parameter __unused,wait_result_t wr __unused)4443 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
4444 {
4445 thread_t th = current_thread();
4446 struct uthread *uth = get_bsdthread_info(th);
4447 proc_t p = current_proc();
4448 struct workqueue *wq = proc_get_wqptr_fast(p);
4449
4450 workq_lock_spin(wq);
4451
4452 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
4453 /*
4454 * If the number of threads we have out are able to keep up with the
4455 * demand, then we should avoid sending this creator thread to
4456 * userspace.
4457 */
4458 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4459 uth->uu_save.uus_workq_park_data.yields++;
4460 workq_unlock(wq);
4461 thread_yield_with_continuation(workq_unpark_continue, NULL);
4462 __builtin_unreachable();
4463 }
4464
4465 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
4466 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
4467 __builtin_unreachable();
4468 }
4469
4470 if (__probable(wr == THREAD_AWAKENED)) {
4471 /*
4472 * We were set running, but for the purposes of dying.
4473 */
4474 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
4475 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
4476 } else {
4477 /*
4478 * workaround for <rdar://problem/38647347>,
4479 * in case we do hit userspace, make sure calling
4480 * workq_thread_terminate() does the right thing here,
4481 * and if we never call it, that workq_exit() will too because it sees
4482 * this thread on the runlist.
4483 */
4484 assert(wr == THREAD_INTERRUPTED);
4485 wq->wq_thdying_count++;
4486 uth->uu_workq_flags |= UT_WORKQ_DYING;
4487 }
4488
4489 workq_unpark_for_death_and_unlock(p, wq, uth,
4490 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
4491 __builtin_unreachable();
4492 }
4493
4494 __attribute__((noreturn, noinline))
4495 static void
workq_setup_and_run(proc_t p,struct uthread * uth,int setup_flags)4496 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
4497 {
4498 thread_t th = get_machthread(uth);
4499 vm_map_t vmap = get_task_map(p->task);
4500
4501 if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
4502 /*
4503 * For preemption reasons, we want to reset the voucher as late as
4504 * possible, so we do it in two places:
4505 * - Just before parking (i.e. in workq_park_and_unlock())
4506 * - Prior to doing the setup for the next workitem (i.e. here)
4507 *
4508 * Those two places are sufficient to ensure we always reset it before
4509 * it goes back out to user space, but be careful to not break that
4510 * guarantee.
4511 *
4512 * Note that setting the voucher to NULL will not clear the preadoption
4513 * thread group on this thread
4514 */
4515 __assert_only kern_return_t kr;
4516 kr = thread_set_voucher_name(MACH_PORT_NULL);
4517 assert(kr == KERN_SUCCESS);
4518 }
4519
4520 uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
4521 if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
4522 upcall_flags |= WQ_FLAG_THREAD_REUSE;
4523 }
4524
4525 if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
4526 /*
4527 * For threads that have an outside-of-QoS thread priority, indicate
4528 * to userspace that setting QoS should only affect the TSD and not
4529 * change QOS in the kernel.
4530 */
4531 upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
4532 } else {
4533 /*
4534 * Put the QoS class value into the lower bits of the reuse_thread
4535 * register, this is where the thread priority used to be stored
4536 * anyway.
4537 */
4538 upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
4539 WQ_FLAG_THREAD_PRIO_QOS;
4540 }
4541
4542 if (uth->uu_workq_thport == MACH_PORT_NULL) {
4543 /* convert_thread_to_port_pinned() consumes a reference */
4544 thread_reference(th);
4545 /* Convert to immovable/pinned thread port, but port is not pinned yet */
4546 ipc_port_t port = convert_thread_to_port_pinned(th);
4547 /* Atomically, pin and copy out the port */
4548 uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(p->task));
4549 }
4550
4551 /* Thread has been set up to run, arm its next workqueue quantum or disarm
4552 * if it is no longer supporting that */
4553 if (thread_supports_cooperative_workqueue(th)) {
4554 thread_arm_workqueue_quantum(th);
4555 } else {
4556 thread_disarm_workqueue_quantum(th);
4557 }
4558
4559 /*
4560 * Call out to pthread, this sets up the thread, pulls in kevent structs
4561 * onto the stack, sets up the thread state and then returns to userspace.
4562 */
4563 WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
4564 proc_get_wqptr_fast(p), 0, 0, 0);
4565
4566 if (workq_thread_is_cooperative(uth)) {
4567 thread_sched_call(th, NULL);
4568 } else {
4569 thread_sched_call(th, workq_sched_callback);
4570 }
4571
4572 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
4573 uth->uu_workq_thport, 0, setup_flags, upcall_flags);
4574
4575 __builtin_unreachable();
4576 }
4577
4578 #pragma mark misc
4579
4580 int
fill_procworkqueue(proc_t p,struct proc_workqueueinfo * pwqinfo)4581 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
4582 {
4583 struct workqueue *wq = proc_get_wqptr(p);
4584 int error = 0;
4585 int activecount;
4586
4587 if (wq == NULL) {
4588 return EINVAL;
4589 }
4590
4591 /*
4592 * This is sometimes called from interrupt context by the kperf sampler.
4593 * In that case, it's not safe to spin trying to take the lock since we
4594 * might already hold it. So, we just try-lock it and error out if it's
4595 * already held. Since this is just a debugging aid, and all our callers
4596 * are able to handle an error, that's fine.
4597 */
4598 bool locked = workq_lock_try(wq);
4599 if (!locked) {
4600 return EBUSY;
4601 }
4602
4603 wq_thactive_t act = _wq_thactive(wq);
4604 activecount = _wq_thactive_aggregate_downto_qos(wq, act,
4605 WORKQ_THREAD_QOS_MIN, NULL, NULL);
4606 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
4607 activecount++;
4608 }
4609 pwqinfo->pwq_nthreads = wq->wq_nthreads;
4610 pwqinfo->pwq_runthreads = activecount;
4611 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
4612 pwqinfo->pwq_state = 0;
4613
4614 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4615 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4616 }
4617
4618 if (wq->wq_nthreads >= wq_max_threads) {
4619 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4620 }
4621
4622 workq_unlock(wq);
4623 return error;
4624 }
4625
4626 boolean_t
workqueue_get_pwq_exceeded(void * v,boolean_t * exceeded_total,boolean_t * exceeded_constrained)4627 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
4628 boolean_t *exceeded_constrained)
4629 {
4630 proc_t p = v;
4631 struct proc_workqueueinfo pwqinfo;
4632 int err;
4633
4634 assert(p != NULL);
4635 assert(exceeded_total != NULL);
4636 assert(exceeded_constrained != NULL);
4637
4638 err = fill_procworkqueue(p, &pwqinfo);
4639 if (err) {
4640 return FALSE;
4641 }
4642 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
4643 return FALSE;
4644 }
4645
4646 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
4647 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
4648
4649 return TRUE;
4650 }
4651
4652 uint32_t
workqueue_get_pwq_state_kdp(void * v)4653 workqueue_get_pwq_state_kdp(void * v)
4654 {
4655 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
4656 kTaskWqExceededConstrainedThreadLimit);
4657 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
4658 kTaskWqExceededTotalThreadLimit);
4659 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
4660 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
4661 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
4662
4663 if (v == NULL) {
4664 return 0;
4665 }
4666
4667 proc_t p = v;
4668 struct workqueue *wq = proc_get_wqptr(p);
4669
4670 if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
4671 return 0;
4672 }
4673
4674 uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
4675
4676 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4677 pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4678 }
4679
4680 if (wq->wq_nthreads >= wq_max_threads) {
4681 pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4682 }
4683
4684 return pwq_state;
4685 }
4686
4687 void
workq_init(void)4688 workq_init(void)
4689 {
4690 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
4691 NSEC_PER_USEC, &wq_stalled_window.abstime);
4692 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
4693 NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
4694 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
4695 NSEC_PER_USEC, &wq_max_timer_interval.abstime);
4696
4697 thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
4698 workq_deallocate_queue_invoke);
4699 }
4700