1 /*
2 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29
30 #include <sys/cdefs.h>
31
32 #include <kern/assert.h>
33 #include <kern/ast.h>
34 #include <kern/clock.h>
35 #include <kern/cpu_data.h>
36 #include <kern/kern_types.h>
37 #include <kern/policy_internal.h>
38 #include <kern/processor.h>
39 #include <kern/sched_prim.h> /* for thread_exception_return */
40 #include <kern/task.h>
41 #include <kern/thread.h>
42 #include <kern/thread_group.h>
43 #include <kern/zalloc.h>
44 #include <kern/work_interval.h>
45 #include <mach/kern_return.h>
46 #include <mach/mach_param.h>
47 #include <mach/mach_port.h>
48 #include <mach/mach_types.h>
49 #include <mach/mach_vm.h>
50 #include <mach/sync_policy.h>
51 #include <mach/task.h>
52 #include <mach/thread_act.h> /* for thread_resume */
53 #include <mach/thread_policy.h>
54 #include <mach/thread_status.h>
55 #include <mach/vm_prot.h>
56 #include <mach/vm_statistics.h>
57 #include <machine/atomic.h>
58 #include <machine/machine_routines.h>
59 #include <machine/smp.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_fault_xnu.h>
62 #include <vm/vm_protos.h>
63
64 #include <sys/eventvar.h>
65 #include <sys/kdebug.h>
66 #include <sys/kernel.h>
67 #include <sys/lock.h>
68 #include <sys/param.h>
69 #include <sys/proc_info.h> /* for fill_procworkqueue */
70 #include <sys/proc_internal.h>
71 #include <sys/pthread_shims.h>
72 #include <sys/resourcevar.h>
73 #include <sys/signalvar.h>
74 #include <sys/sysctl.h>
75 #include <sys/sysproto.h>
76 #include <sys/systm.h>
77 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
78
79 #include <pthread/bsdthread_private.h>
80 #include <pthread/workqueue_syscalls.h>
81 #include <pthread/workqueue_internal.h>
82 #include <pthread/workqueue_trace.h>
83
84 #include <os/log.h>
85
/*
 * Forward declarations for helpers defined later in this file.
 * The *_continue functions are thread continuations entered from the
 * scheduler and never return (__dead2).
 */
static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;

static void workq_bound_thread_unpark_continue(void *uth, wait_result_t wr) __dead2;

static void workq_bound_thread_initialize_and_unpark_continue(void *uth, wait_result_t wr) __dead2;

static void workq_bound_thread_setup_and_run(struct uthread *uth, int setup_flags) __dead2;

static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth,
    bool may_start_timer, bool record_failed_allowance);

static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);

/* Lock wrappers are declared early so sysctl handlers above their
 * definitions can use them. */
static inline void
workq_lock_spin(struct workqueue *wq);

static inline void
workq_unlock(struct workqueue *wq);
119
120 #pragma mark globals
121
/*
 * A tunable expressed in microseconds by userspace (via sysctl), with a
 * cached conversion to mach absolute time for use by kernel code.
 */
struct workq_usec_var {
	uint32_t usecs;    /* value as set through the sysctl, in microseconds */
	uint64_t abstime;  /* same value converted to mach absolute time units */
};

/*
 * Declares one workq_usec_var plus the read-write sysctl that updates it;
 * workq_sysctl_handle_usecs keeps the abstime field in sync on writes.
 */
#define WORKQ_SYSCTL_USECS(var, init) \
	static struct workq_usec_var var = { .usecs = init }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
    workq_sysctl_handle_usecs, "I", "")
132
/* Lock group and refcount group for all workqueue locks/objects. */
static LCK_GRP_DECLARE(workq_lck_grp, "workq");
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

/* Allocation zones for per-process workqueues and thread requests. */
static ZONE_DEFINE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

/* Deferred-deallocation queue drained by a daemon thread. */
static struct mpsc_daemon_queue workq_deallocate_queue;

/* Tunables (microseconds, with cached abstime — see WORKQ_SYSCTL_USECS). */
WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit = 1;
/* Idle-thread count above which the idle-death delay starts decaying. */
static uint16_t wq_death_max_load;
/* Per-QoS-bucket parallelism limits, derived from CPU topology. */
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

/*
 * This is not a hard limit but the max size we want to aim to hit across the
 * entire cooperative pool. We can oversubscribe the pool due to non-cooperative
 * workers and the max we will oversubscribe the pool by, is a total of
 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
 */
static uint32_t wq_max_cooperative_threads;
159
160 static inline uint32_t
wq_cooperative_queue_max_size(struct workqueue * wq)161 wq_cooperative_queue_max_size(struct workqueue *wq)
162 {
163 return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
164 }
165
166 #pragma mark sysctls
167
/*
 * sysctl handler for WORKQ_SYSCTL_USECS tunables: reads/writes the usecs
 * field and, on a successful write, refreshes the cached abstime conversion.
 */
static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		/* read-only access, or the write failed: nothing to refresh */
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");
187
188 static int
189 wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
190 {
191 #pragma unused(arg1, arg2, oidp)
192 int input_pool_size = 0;
193 int changed;
194 int error = 0;
195
196 error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
197 if (error || !changed) {
198 return error;
199 }
200
201 #define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
202 #define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
203 /* Not available currently, but sysctl interface is designed to allow these
204 * extra parameters:
205 * WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all bucket)
206 * WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
207 */
208
209 if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
210 && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
211 error = EINVAL;
212 goto out;
213 }
214
215 proc_t p = req->p;
216 struct workqueue *wq = proc_get_wqptr(p);
217
218 if (wq != NULL) {
219 workq_lock_spin(wq);
220 if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
221 // Hackily enforce that the workqueue is still new (no requests or
222 // threads)
223 error = ENOTSUP;
224 } else {
225 wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
226 }
227 workq_unlock(wq);
228 } else {
229 /* This process has no workqueue, calling this syctl makes no sense */
230 return ENOTSUP;
231 }
232
233 out:
234 return error;
235 }
236
237 SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
238 CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
239 wq_limit_cooperative_threads_for_proc,
240 "I", "Modify the max pool size of the cooperative pool");
241
242 #pragma mark p_wqptr
243
/* Sentinel stored in p_wqptr while a thread is initializing the workqueue. */
#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

/*
 * Raw relaxed load of p_wqptr.  May return the initializing sentinel;
 * callers that can observe a mid-init process must use proc_get_wqptr().
 */
static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}
251
252 struct workqueue *
proc_get_wqptr(struct proc * p)253 proc_get_wqptr(struct proc *p)
254 {
255 struct workqueue *wq = proc_get_wqptr_fast(p);
256 return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
257 }
258
/*
 * Publish the fully-initialized workqueue pointer on the process.
 * The release xchg orders all workqueue initialization before publication;
 * if another thread was waiting on the init sentinel (see
 * proc_init_wqptr_or_wait), wake it under the proc lock.
 */
static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		/* proc lock pairs with the assert_wait done by waiters */
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}
269
/*
 * Claim the right to initialize the process's workqueue, or wait for the
 * thread that is already doing so.
 *
 * Returns true when the caller won the race (p_wqptr now holds the init
 * sentinel and the caller must complete init and call proc_set_wqptr).
 * Returns false when another thread initialized it (or is about to finish);
 * the caller should re-read p_wqptr.
 */
static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		/* We're first: install the sentinel so others wait on us. */
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		/* Someone else is initializing: block until proc_set_wqptr wakes us. */
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
293
/*
 * Wait event a parked workqueue thread sleeps on: the address of its
 * uu_workq_stackaddr field, which is unique per uthread.
 */
static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

/* Wake the given parked workqueue thread (directed wakeup on its event). */
static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
}
305
306 #pragma mark wq_thactive
307
/*
 * wq_thactive is a single atomic word packing one active-thread counter per
 * QoS bucket plus the best QoS among pending constrained requests, so that
 * the scheduler callback can update it without taking the workqueue lock.
 */
#if defined(__LP64__)
// Layout is:
// 127 - 115 : 13 bits of zeroes
// 114 - 112 : best QoS among all pending constrained requests
// 111 - 0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
// 63 - 61 : best QoS among all pending constrained requests
// 60 : Manager bucket (0 or 1)
// 59 - 0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");
328
/* Relaxed snapshot of the packed active-thread counters. */
static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}
334
335 static inline uint8_t
_wq_bucket(thread_qos_t qos)336 _wq_bucket(thread_qos_t qos)
337 {
338 // Map both BG and MT to the same bucket by over-shifting down and
339 // clamping MT and BG together.
340 switch (qos) {
341 case THREAD_QOS_MAINTENANCE:
342 return 0;
343 default:
344 return qos - 2;
345 }
346 }
347
/* Extract the best pending constrained-request QoS from a packed value. */
#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	    ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))

/*
 * Read the best constrained-request QoS without an atomic wide load.
 * Safe because callers hold the workqueue lock and these bits are only
 * written under that lock (the plain read is an intentional optimization).
 */
static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}
359
/*
 * Recompute the best constrained-request QoS bits in wq_thactive after the
 * constrained priority queue changed.  Called with the workqueue lock held,
 * which serializes all updates to these bits.
 */
static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		/* Encode the signed QoS delta into the high bits and apply it. */
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
#endif
	}
}
386
/* A "1" positioned at the given QoS's bucket within the packed counter word. */
static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	uint8_t bucket = _wq_bucket(qos);
	__builtin_assume(bucket < WORKQ_NUM_BUCKETS);
	return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH);
}
394
/*
 * Increment the active count for one QoS bucket; returns the packed value
 * from before the increment (for callers that inspect prior state).
 */
static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

/* Decrement the active count for one QoS bucket; returns the prior value. */
static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}
408
/*
 * Move one active thread from old_qos's bucket to new_qos's bucket in a
 * single atomic add (the combined delta), and update the per-bucket
 * scheduled counts.  Called with the workqueue lock held (the
 * wq_thscheduled_count updates are not atomic).
 */
static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}
419
/*
 * Sum the active-thread counts for all buckets at or above `qos` in the
 * packed snapshot `v`.
 *
 * If `busycount` is non-NULL, also count buckets that have scheduled-but-
 * blocked threads whose most recent block was recent enough to be considered
 * "busy" (may unblock soon).  If `max_busycount` is non-NULL, report the
 * number of buckets inspected (upper bound for *busycount).
 */
static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	/* Shift the requested bucket down to bit 0, then walk upward. */
	uint8_t i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		/* scheduled > active means some threads in this bucket are blocked */
		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
458
/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
{
	/* post-decrement: assert the count was non-zero before we dropped it */
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
	assert(old_scheduled_count > 0);
}

/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
{
	/* post-increment: assert we aren't about to wrap the uint8_t counter */
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
	assert(old_scheduled_count < UINT8_MAX);
}
476
477 #pragma mark wq_flags
478
/* Relaxed snapshot of the workqueue flag word (WQ_EXITING, etc.). */
static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

/* True once workqueue teardown has started (WQ_EXITING is sticky). */
static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}
490
491 bool
workq_is_exiting(struct proc * p)492 workq_is_exiting(struct proc *p)
493 {
494 struct workqueue *wq = proc_get_wqptr(p);
495 return !wq || _wq_exiting(wq);
496 }
497
498
#pragma mark workqueue lock

/* Debugger-context check of the workqueue ticket lock (kdp only). */
static bool
workq_lock_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_ticket_is_acquired(&wq->wq_lock);
}

/* Acquire the workqueue ticket lock (spins; no blocking). */
static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
}

/* Assert (debug builds) that the caller owns the workqueue lock. */
static inline void
workq_lock_held(struct workqueue *wq)
{
	LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
}

/* Try-acquire the workqueue lock; returns true on success. */
static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
}

/* Release the workqueue lock. */
static inline void
workq_unlock(struct workqueue *wq)
{
	lck_ticket_unlock(&wq->wq_lock);
}
530
#pragma mark idle thread lists

/* Fresh per-thread policy where requested QoS and bucket both start at `qos`. */
#define WORKQ_POLICY_INIT(qos) \
	        (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

/*
 * Effective QoS bucket for a thread: the strongest of its requested QoS,
 * kevent max QoS, and dispatch override.
 */
static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

/*
 * Effective scheduling QoS: the bucket computed above, never dropping below
 * the bucket the thread is currently accounted in.
 */
static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}
547
/*
 * Returns true when the workloop scheduling parameters requested by `req`
 * (custom priority/policy bits, ignoring CPU-percent limits) differ from
 * the ones the thread last applied (stashed in its park data).
 */
static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately to policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		/* Neither side carries custom params: nothing to change. */
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	/* Same flag sets: compare the values behind each relevant flag. */
	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}
583
/*
 * Returns true when servicing `req` requires re-applying scheduling state to
 * the thread: custom params changed, the effective QoS differs, or (with
 * CONFIG_PREADOPT_TG) the request carries a preadoption thread group.
 */
static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
		return true;
	}

#if CONFIG_PREADOPT_TG
	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
		/*
		 * Ideally, we'd add check here to see if thread's preadopt TG is same
		 * as the thread requests's thread group and short circuit if that is
		 * the case. But in the interest of keeping the code clean and not
		 * taking the thread lock here, we're going to skip this. We will
		 * eventually shortcircuit once we try to set the preadoption thread
		 * group on the thread.
		 */
		return true;
	}
#endif

	return false;
}
612
/* Input thread must be self. Called during self override, resetting overrides
 * or while processing kevents
 *
 * Called with workq lock held. Sometimes also the thread mutex
 *
 * Moves the thread between QoS accounting buckets when its effective QoS
 * changed, applies the new override to the mach thread, and redrives the
 * creator logic if pending requests may now be runnable.
 */
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	assert(uth == current_uthread());

	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	/* Bound threads are not accounted in the wq_thactive buckets. */
	if ((old_bucket != new_bucket) &&
	    !workq_thread_is_permanently_bound(uth)) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (old_pri.qos_override != new_pri.qos_override) {
		thread_set_workq_override(get_machthread(uth), new_pri.qos_override);
	}

	if (wq->wq_reqcount &&
	    !workq_thread_is_permanently_bound(uth) &&
	    (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}
658
659 /*
660 * Sets/resets the cpu percent limits on the current thread. We can't set
661 * these limits from outside of the current thread, so this function needs
662 * to be called when we're executing on the intended
663 */
664 static void
workq_thread_reset_cpupercent(workq_threadreq_t req,struct uthread * uth)665 workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
666 {
667 assert(uth == current_uthread());
668 workq_threadreq_param_t trp = { };
669
670 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
671 trp = kqueue_threadreq_workloop_param(req);
672 }
673
674 if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
675 /*
676 * Going through disable when we have an existing CPU percent limit
677 * set will force the ledger to refill the token bucket of the current
678 * thread. Removing any penalty applied by previous thread use.
679 */
680 thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
681 uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
682 }
683
684 if (trp.trp_flags & TRP_CPUPERCENT) {
685 thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
686 (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
687 uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
688 }
689 }
690
691 /*
692 * This function is always called with the workq lock, except for the
693 * permanently bound workqueue thread, which instead requires the kqlock.
694 * See locking model for bound thread's uu_workq_flags.
695 */
696 static void
workq_thread_reset_pri(struct workqueue * wq,struct uthread * uth,workq_threadreq_t req,bool unpark)697 workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
698 workq_threadreq_t req, bool unpark)
699 {
700 thread_t th = get_machthread(uth);
701 thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
702 workq_threadreq_param_t trp = { };
703 int priority = 31;
704 int policy = POLICY_TIMESHARE;
705
706 if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
707 trp = kqueue_threadreq_workloop_param(req);
708 }
709
710 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
711 uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;
712
713 if (unpark) {
714 uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
715 // qos sent out to userspace (may differ from uu_workq_pri on param threads)
716 uth->uu_save.uus_workq_park_data.qos = qos;
717 }
718
719 if (qos == WORKQ_THREAD_QOS_MANAGER) {
720 uint32_t mgr_pri = wq->wq_event_manager_priority;
721 assert(trp.trp_value == 0); // manager qos and thread policy don't mix
722
723 if (_pthread_priority_has_sched_pri(mgr_pri)) {
724 mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
725 thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
726 POLICY_TIMESHARE);
727 return;
728 }
729
730 qos = _pthread_priority_thread_qos(mgr_pri);
731 } else {
732 if (trp.trp_flags & TRP_PRIORITY) {
733 qos = THREAD_QOS_UNSPECIFIED;
734 priority = trp.trp_pri;
735 uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
736 }
737
738 if (trp.trp_flags & TRP_POLICY) {
739 policy = trp.trp_pol;
740 }
741 }
742
743 #if CONFIG_PREADOPT_TG
744 if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
745 /*
746 * For kqwl permanently configured with a thread group, we can safely borrow
747 * +1 ref from kqwl_preadopt_tg. A thread then takes additional +1 ref
748 * for itself via thread_set_preadopt_thread_group.
749 *
750 * In all other cases, we cannot safely read and borrow the reference from the kqwl
751 * since it can disappear from under us at any time due to the max-ing logic in
752 * kqueue_set_preadopted_thread_group.
753 *
754 * As such, we do the following dance:
755 *
756 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave
757 * behind with (NULL + QoS). At this point, we have the reference
758 * to the thread group from the kqwl.
759 * 2) Have the thread set the preadoption thread group on itself.
760 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to
761 * thread_group + QoS. ie we try to give the reference back to the kqwl.
762 * If we fail, that's because a higher QoS thread group was set on the
763 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to
764 * go back to (1).
765 */
766
767 _Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req);
768
769 thread_group_qos_t old_tg, new_tg;
770 int ret = 0;
771 again:
772 ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
773 if ((!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) ||
774 KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) {
775 os_atomic_rmw_loop_give_up(break);
776 }
777
778 /*
779 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
780 * only modify it if there is a higher QoS thread group to attach
781 */
782 new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK);
783 });
784
785 if (ret) {
786 /*
787 * We successfully took the ref from the kqwl so set it on the
788 * thread now
789 */
790 thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));
791
792 thread_group_qos_t thread_group_to_expect = new_tg;
793 thread_group_qos_t thread_group_to_set = old_tg;
794
795 os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
796 if (old_tg != thread_group_to_expect) {
797 /*
798 * There was an intervening write to the kqwl_preadopt_tg,
799 * and it has a higher QoS than what we are working with
800 * here. Abandon our current adopted thread group and redo
801 * the full dance
802 */
803 thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
804 os_atomic_rmw_loop_give_up(goto again);
805 }
806
807 new_tg = thread_group_to_set;
808 });
809 } else {
810 if (KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) {
811 thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));
812 } else {
813 /* Nothing valid on the kqwl, just clear what's on the thread */
814 thread_set_preadopt_thread_group(th, NULL);
815 }
816 }
817 } else {
818 /* Not even a kqwl, clear what's on the thread */
819 thread_set_preadopt_thread_group(th, NULL);
820 }
821 #endif
822 thread_set_workq_pri(th, qos, priority, policy);
823 }
824
825 /*
826 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
827 * every time a servicer is being told about a new max QoS.
828 */
829 void
workq_thread_set_max_qos(struct proc * p,workq_threadreq_t kqr)830 workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
831 {
832 struct uu_workq_policy old_pri, new_pri;
833 struct uthread *uth = current_uthread();
834 struct workqueue *wq = proc_get_wqptr_fast(p);
835 thread_qos_t qos = kqr->tr_kq_qos_index;
836
837 if (uth->uu_workq_pri.qos_max == qos) {
838 return;
839 }
840
841 workq_lock_spin(wq);
842 old_pri = new_pri = uth->uu_workq_pri;
843 new_pri.qos_max = qos;
844 workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
845 workq_unlock(wq);
846 }
847
#pragma mark idle threads accounting and handling

/*
 * Pick the idle thread that has been parked the longest and still owns a
 * stack (stackless parked threads at the tail are cheap and not worth
 * killing).  Returns NULL when no such thread exists.  Workq lock held.
 */
static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		/* tail thread gave up its stack; try the one parked just before it */
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}
863
/*
 * How long an idle thread must stay parked before it becomes eligible for
 * death.  Full window while the pool is small; decays linearly as the idle
 * count grows past wq_death_max_load so large pools shrink faster.
 */
static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have less than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * from 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}
885
886 static inline bool
workq_should_kill_idle_thread(struct workqueue * wq,struct uthread * uth,uint64_t now)887 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
888 uint64_t now)
889 {
890 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
891 return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
892 }
893
/*
 * Arm the delayed thread-death call for `deadline`, unless the workqueue is
 * exiting or a death call is already scheduled (WQ_DEATH_CALL_SCHEDULED acts
 * as a dedup flag).  Workq lock held by callers.
 */
static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
916
917 /*
918 * `decrement` is set to the number of threads that are no longer dying:
919 * - because they have been resuscitated just in time (workq_pop_idle_thread)
920 * - or have been killed (workq_thread_terminate).
921 */
922 static void
workq_death_policy_evaluate(struct workqueue * wq,uint16_t decrement)923 workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
924 {
925 struct uthread *uth;
926
927 assert(wq->wq_thdying_count >= decrement);
928 if ((wq->wq_thdying_count -= decrement) > 0) {
929 return;
930 }
931
932 if (wq->wq_thidlecount <= 1) {
933 return;
934 }
935
936 if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
937 return;
938 }
939
940 uint64_t now = mach_absolute_time();
941 uint64_t delay = workq_kill_delay_for_idle_thread(wq);
942
943 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
944 WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
945 wq, wq->wq_thidlecount, 0, 0);
946 wq->wq_thdying_count++;
947 uth->uu_workq_flags |= UT_WORKQ_DYING;
948 if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
949 workq_thread_wakeup(uth);
950 }
951 return;
952 }
953
954 workq_death_call_schedule(wq,
955 uth->uu_save.uus_workq_park_data.idle_stamp + delay);
956 }
957
/*
 * Teardown hook for a workqueue thread: unhooks the uthread from the run
 * list, updates death accounting if this thread was being reaped, and
 * redrives thread creation if its exit brought us back under the limit.
 */
void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	if (!workq_thread_is_permanently_bound(uth)) {
		TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
		if (uth->uu_workq_flags & UT_WORKQ_DYING) {
			/* this thread's death completes a pending reap */
			WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
			    wq, wq->wq_thidlecount, 0, 0);
			workq_death_policy_evaluate(wq, 1);
		}
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening, redrive if there are pending requests
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(get_machthread(uth));
}
985
/*
 * Thread-call handler for the "death call": clears the scheduled bit and
 * re-runs the death policy to reap (or re-arm for) the oldest idle thread.
 */
static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
	workq_unlock(wq);
}
998
/*
 * Take one thread off the idle list (falling back to the new-threads list)
 * and move it to the run list, marking it UT_WORKQ_RUNNING | `uu_flags`.
 *
 * On return, *needs_wakeup tells the caller whether it must wake the thread:
 * false when the thread was dying (resuscitated here) or has idle cleanup
 * pending, as those paths handle themselves.
 */
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;

	/* A thread is never woken up as part of the cooperative pool */
	assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);

	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* resuscitated just in time: undo the death accounting */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}
1036
/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 *
 * Initializes the workqueue-specific uthread state, tags the thread as a
 * pthread workqueue thread, takes the workq lock on the caller's behalf and
 * returns the event the new thread will park on.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}
1058
/**
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 *
 * Allocates a userspace stack, creates a parked kernel thread, and (unless
 * the thread is permanently bound) puts it on the new-threads list.  On
 * failure the wq_nthreads reservation is rolled back and the caller is
 * responsible for redriving thread creation.
 */
static kern_return_t
workq_add_new_idle_thread(
	proc_t p,
	struct workqueue *wq,
	thread_continue_t continuation,
	bool is_permanently_bound,
	thread_t *new_thread)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	/* reserve the slot before dropping the lock */
	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(proc_task(p));

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(proc_task(p),
	    continuation,
	    &th,
	    is_permanently_bound);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);
	uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;

	wq->wq_creations++;
	if (!is_permanently_bound) {
		wq->wq_thidlecount++;
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
	}

	if (new_thread) {
		*new_thread = th;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
	return kret;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return kret;
}
1131
1132 static inline bool
workq_thread_is_overcommit(struct uthread * uth)1133 workq_thread_is_overcommit(struct uthread *uth)
1134 {
1135 return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
1136 }
1137
1138 static inline bool
workq_thread_is_nonovercommit(struct uthread * uth)1139 workq_thread_is_nonovercommit(struct uthread *uth)
1140 {
1141 return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT |
1142 UT_WORKQ_COOPERATIVE)) == 0;
1143 }
1144
1145 static inline bool
workq_thread_is_cooperative(struct uthread * uth)1146 workq_thread_is_cooperative(struct uthread *uth)
1147 {
1148 return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
1149 }
1150
1151 bool
workq_thread_is_permanently_bound(struct uthread * uth)1152 workq_thread_is_permanently_bound(struct uthread *uth)
1153 {
1154 return (uth->uu_workq_flags & UT_WORKQ_PERMANENT_BIND) != 0;
1155 }
1156
1157 static inline void
workq_thread_set_type(struct uthread * uth,uint16_t flags)1158 workq_thread_set_type(struct uthread *uth, uint16_t flags)
1159 {
1160 uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
1161 uth->uu_workq_flags |= flags;
1162 }
1163
1164
1165 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
1166
/*
 * Final unpark path for a thread selected to die: moves it onto the run
 * list, drops the workq lock, and hands control to pthread with
 * WQ_SETUP_EXIT_THREAD so the thread runs one last time in userspace and
 * exits.  Never returns.  Called with the workq lock held.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	/* cap the exit path at the cleanup QoS */
	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		/* never-run threads live on the new list, others on the idle list */
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(proc_task(p));

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}
1212
1213 bool
workq_is_current_thread_updating_turnstile(struct workqueue * wq)1214 workq_is_current_thread_updating_turnstile(struct workqueue *wq)
1215 {
1216 return wq->wq_turnstile_updater == current_thread();
1217 }
1218
/*
 * Run `operation` (a turnstile manipulation) while advertising the current
 * thread in wq_turnstile_updater, so that
 * workq_is_current_thread_updating_turnstile() can detect it.
 * Called with the workq lock held.
 */
__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}
1229
/*
 * Point the workqueue turnstile's inheritor at `inheritor`, caching the
 * value in wq_inheritor so redundant turnstile updates are skipped.
 * Called with the workq lock held.
 */
static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		    flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		    TURNSTILE_INTERLOCK_HELD);
	});
}
1246
/*
 * Park `uth`: move it from the run list back onto an idle list, updating
 * the scheduled-thread accounting for whichever pool it ran in, and then
 * either kill it immediately (pool has too many idle threads), or keep it
 * idle and make sure the death call is armed.
 *
 * Does not return when the thread is selected to die right away (tail-calls
 * workq_unpark_for_death_and_unlock).  Called with the workq lock held.
 */
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_req;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	/* if this thread was the turnstile inheritor, pass it on or clear it */
	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	/* a never-run thread goes back on the new list and is not killable yet */
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	/*
	 * Once the pool is loaded, park at the tail without keeping a stack;
	 * otherwise park at the head with the stack kept, for fast reuse.
	 */
	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	/* first thread on the idle list: arm the death call for it */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}
1355
1356 #pragma mark thread requests
1357
1358 static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)1359 workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
1360 {
1361 return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
1362 }
1363
1364 static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)1365 workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
1366 {
1367 return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT |
1368 WORKQ_TR_FLAG_COOPERATIVE |
1369 WORKQ_TR_FLAG_PERMANENT_BIND)) == 0;
1370 }
1371
1372 static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)1373 workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
1374 {
1375 return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
1376 }
1377
1378 #define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
1379 #define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
1380 #define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)
1381
/*
 * Returns the scheduler priority used to order `req` in its priority queue:
 * the raw trp_pri for workloops configured outside the QoS world, otherwise
 * the workq priority corresponding to the request's QoS.
 */
static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}
1394
1395 static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue * wq,workq_threadreq_t req)1396 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1397 {
1398 assert(!workq_tr_is_cooperative(req->tr_flags));
1399
1400 if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1401 return &wq->wq_special_queue;
1402 } else if (workq_tr_is_overcommit(req->tr_flags)) {
1403 return &wq->wq_overcommit_queue;
1404 } else {
1405 return &wq->wq_constrained_queue;
1406 }
1407 }
1408
1409 /* Calculates the number of threads scheduled >= the input QoS */
1410 static uint64_t
workq_num_cooperative_threads_scheduled_to_qos_internal(struct workqueue * wq,thread_qos_t qos)1411 workq_num_cooperative_threads_scheduled_to_qos_internal(struct workqueue *wq, thread_qos_t qos)
1412 {
1413 uint64_t num_cooperative_threads = 0;
1414
1415 for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
1416 uint8_t bucket = _wq_bucket(cur_qos);
1417 num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
1418 }
1419
1420 return num_cooperative_threads;
1421 }
1422
/* Calculates the number of threads scheduled >= the input QoS.
 * Asserting variant: caller must hold the workq lock. */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos_locked(struct workqueue *wq, thread_qos_t qos)
{
	workq_lock_held(wq);
	return workq_num_cooperative_threads_scheduled_to_qos_internal(wq, qos);
}
1430
/* Total number of threads currently scheduled in the cooperative pool. */
static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
{
	return workq_num_cooperative_threads_scheduled_to_qos_locked(wq, WORKQ_THREAD_QOS_MIN);
}
1436
1437 static bool
workq_has_cooperative_thread_requests(struct workqueue * wq)1438 workq_has_cooperative_thread_requests(struct workqueue *wq)
1439 {
1440 for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
1441 uint8_t bucket = _wq_bucket(qos);
1442 if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
1443 return true;
1444 }
1445 }
1446
1447 return false;
1448 }
1449
1450 /*
1451 * Determines the next QoS bucket we should service next in the cooperative
1452 * pool. This function will always return a QoS for cooperative pool as long as
1453 * there are requests to be serviced.
1454 *
1455 * Unlike the other thread pools, for the cooperative thread pool the schedule
1456 * counts for the various buckets in the pool affect the next best request for
1457 * it.
1458 *
1459 * This function is called in the following contexts:
1460 *
1461 * a) When determining the best thread QoS for cooperative bucket for the
1462 * creator/thread reuse
1463 *
1464 * b) Once (a) has happened and thread has bound to a thread request, figuring
1465 * out whether the next best request for this pool has changed so that creator
1466 * can be scheduled.
1467 *
1468 * Returns true if the cooperative queue's best qos changed from previous
1469 * value.
1470 */
static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
{
	workq_lock_held(wq);

	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;

	/* We determine the next best cooperative thread request based on the
	 * following:
	 *
	 * 1. Take the MAX of the following:
	 *	a) Highest qos with pending TRs such that number of scheduled
	 *	threads so far with >= qos is < wq_max_cooperative_threads
	 *	b) Highest qos bucket with pending TRs but no scheduled threads for that bucket
	 *
	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
	 * pending thread requests in the pool.
	 *
	 */
	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;

	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;

	int scheduled_count_till_qos = 0;

	/* walk buckets from highest QoS down, accumulating scheduled counts */
	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
		scheduled_count_till_qos += scheduled_count_for_bucket;

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			/* candidate for case (2): any pending request at all */
			if (qos > highest_qos_req) {
				highest_qos_req = qos;
			}
			/*
			 * The pool isn't saturated for threads at and above this QoS, and
			 * this qos bucket has pending requests
			 */
			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
				if (qos > highest_qos_req_with_width) {
					highest_qos_req_with_width = qos;
				}
			}

			/*
			 * There are no threads scheduled for this bucket but there
			 * is work pending, give it at least 1 thread
			 */
			if (scheduled_count_for_bucket == 0) {
				if (qos > highest_qos_with_no_scheduled) {
					highest_qos_with_no_scheduled = qos;
				}
			}
		}
	}

	/* step (1): MAX of the two candidates; step (2): fallback when UN */
	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
	}

#if MACH_ASSERT
	/* Assert that if we are showing up the next best req as UN, then there
	 * actually is no thread request in the cooperative pool buckets */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		assert(!workq_has_cooperative_thread_requests(wq));
	}
#endif

	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
}
1543
1544 /*
1545 * Returns whether or not the input thread (or creator thread if uth is NULL)
1546 * should be allowed to work as part of the cooperative pool for the <input qos>
1547 * bucket.
1548 *
1549 * This function is called in a bunch of places:
1550 * a) Quantum expires for a thread and it is part of the cooperative pool
1551 * b) When trying to pick a thread request for the creator thread to
1552 * represent.
1553 * c) When a thread is trying to pick a thread request to actually bind to
1554 * and service.
1555 *
1556 * Called with workq lock held.
1557 */
1558
1559 #define WQ_COOPERATIVE_POOL_UNSATURATED 1
1560 #define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
1561 #define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3
1562
/*
 * Admission control for the cooperative pool: returns whether the thread
 * `uth` (or the creator when uth is NULL) may service a request at `qos`.
 *
 * When uth is already a cooperative thread, its own contribution to the
 * scheduled counts is excluded for the duration of the check and restored
 * before returning.  When admission fails and `may_start_timer` is set, the
 * delayed thread-creation call is scheduled instead.
 *
 * Called with workq lock held.
 */
static bool
workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
    bool may_start_timer)
{
	workq_lock_held(wq);

	bool exclude_thread_as_scheduled = false;
	bool passed_admissions = false;
	uint8_t bucket = _wq_bucket(qos);

	if (uth && workq_thread_is_cooperative(uth)) {
		/* don't count the asking thread against the pool while deciding */
		exclude_thread_as_scheduled = true;
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
	}

	/*
	 * We have not saturated the pool yet, let this thread continue
	 */
	uint64_t total_cooperative_threads;
	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_POOL_UNSATURATED);
		goto out;
	}

	/*
	 * Without this thread, nothing is servicing the bucket which has pending
	 * work
	 */
	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
	if (bucket_scheduled == 0 &&
	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
		goto out;
	}

	/*
	 * If number of threads at the QoS bucket >= input QoS exceeds the max we want
	 * for the pool, deny this thread
	 */
	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos_locked(wq, qos);
	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);

	if (!passed_admissions && may_start_timer) {
		workq_schedule_delayed_thread_creation(wq, 0);
	}

out:
	if (exclude_thread_as_scheduled) {
		/* restore the count we excluded above */
		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
	}
	return passed_admissions;
}
1624
/*
 * Enqueue a new thread request on the appropriate structure (the manager
 * slot, a cooperative bucket queue, or one of the priority queues).
 *
 * Returns true if the best request for the pool changed as a result of
 * enqueuing this thread request.
 */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	/* the manager request has a dedicated slot, there is at most one */
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}

	if (workq_threadreq_is_cooperative(req)) {
		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);

		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
		STAILQ_INSERT_TAIL(bucket, req, tr_link);

		/* cooperative best-request depends on scheduled counts too */
		return _wq_cooperative_queue_refresh_best_req_qos(wq);
	}

	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);

	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
	    workq_priority_for_req(req), false);

	/* priority_queue_insert returns true when the head (max) changed */
	if (priority_queue_insert(q, &req->tr_entry)) {
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}
1668
/*
 * returns true if one of the following is true (so as to update creator if
 * needed):
 *
 * (a) the next highest request of the pool we dequeued the request from changed
 * (b) the next highest requests of the pool the current thread used to be a
 * part of, changed
 *
 * For overcommit, special and constrained pools, the next highest QoS for each
 * pool just a MAX of pending requests so tracking (a) is sufficient.
 *
 * But for cooperative thread pool, the next highest QoS for the pool depends on
 * schedule counts in the pool as well. So if the current thread used to be
 * cooperative in it's previous logical run ie (b), then that can also affect
 * cooperative pool's next best QoS requests.
 */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
    bool cooperative_sched_count_changed)
{
	wq->wq_reqcount--;

	bool next_highest_request_changed = false;

	/* only fully unlink the request once its last count is consumed */
	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;

			/* If a cooperative thread was the one which picked up the manager
			 * thread request, we need to reevaluate the cooperative pool
			 * anyways.
			 */
			if (cooperative_sched_count_changed) {
				_wq_cooperative_queue_refresh_best_req_qos(wq);
			}
			return true;
		}

		if (workq_threadreq_is_cooperative(req)) {
			assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
			assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
			/* Account for the fact that BG and MT are coalesced when
			 * calculating best request for cooperative pool
			 */
			assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos));

			struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
			__assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);

			assert(head == req);
			STAILQ_REMOVE_HEAD(bucket, tr_link);

			/*
			 * If the request we're dequeueing is cooperative, then the sched
			 * counts definitely changed.
			 */
			assert(cooperative_sched_count_changed);
		}

		/*
		 * We want to do the cooperative pool refresh after dequeueing a
		 * cooperative thread request if any (to combine both effects into 1
		 * refresh operation)
		 */
		if (cooperative_sched_count_changed) {
			next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
		}

		if (!workq_threadreq_is_cooperative(req)) {
			/*
			 * All other types of requests are enqueued in priority queues
			 */

			if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
			    &req->tr_entry)) {
				next_highest_request_changed |= true;
				if (workq_threadreq_is_nonovercommit(req)) {
					_wq_thactive_refresh_best_constrained_req_qos(wq);
				}
			}
		}
	}

	return next_highest_request_changed;
}
1756
1757 static void
workq_threadreq_destroy(proc_t p,workq_threadreq_t req)1758 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1759 {
1760 req->tr_state = WORKQ_TR_STATE_CANCELED;
1761 if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
1762 kqueue_threadreq_cancel(p, req);
1763 } else {
1764 zfree(workq_zone_threadreq, req);
1765 }
1766 }
1767
1768 #pragma mark workqueue thread creation thread calls
1769
/*
 * Atomically transition wq_flags towards scheduling a thread-creation call:
 * sets `sched` (or `pend` instead while the process is suspended); gives up
 * when the workqueue is exiting or when any of `sched`, `pend` or
 * `fail_mask` is already set.
 *
 * Returns true when the caller should actually enter the thread call,
 * false when it was pended or must not be scheduled.
 */
static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
	uint32_t old_flags, new_flags;

	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
			os_atomic_rmw_loop_give_up(return false);
		}
		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
			new_flags = old_flags | pend;
		} else {
			new_flags = old_flags | sched;
		}
	});

	return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
1789
1790 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1791
/*
 * Schedule the delayed thread-creation thread call, adapting the timer
 * window first: double it when the previous call ran recently (back off
 * under churn), halve it when the pool has been quiet, clamped to
 * [wq_stalled_window, wq_max_timer_interval].
 *
 * Returns true when the call was armed, false when it was pended (process
 * suspended) or must not be scheduled.  Called with preemption disabled.
 */
static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
	assert(!preemption_enabled());

	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
		return false;
	}

	uint64_t now = mach_absolute_time();

	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
		/* do not change the window */
	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
		/* ran again within the window: back off */
		wq->wq_timer_interval *= 2;
		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
			wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
		}
	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
		/* been quiet for a while: react faster again */
		wq->wq_timer_interval /= 2;
		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
			wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
		}
	}

	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
	    _wq_flags(wq), wq->wq_timer_interval);

	thread_call_t call = wq->wq_delayed_call;
	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
	uint64_t deadline = now + wq->wq_timer_interval;
	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
		panic("delayed_call was already enqueued");
	}
	return true;
}
1830
/*
 * Arm the immediate thread-creation thread call (fires as soon as
 * possible, no delay window).  No-op when the call is already pended or
 * scheduled or the workqueue is exiting; when the process is suspended,
 * the request is recorded as WQ_IMMEDIATE_CALL_PENDED so that
 * workq_proc_resumed() can re-issue it.  Caller must have preemption
 * disabled.
 */
static void
workq_schedule_immediate_thread_creation(struct workqueue *wq)
{
	assert(!preemption_enabled());

	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
		    _wq_flags(wq), 0);

		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
		/* prepost guarantees we are the only ones arming this call */
		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
			panic("immediate_call was already enqueued");
		}
	}
}
1847
1848 void
workq_proc_suspended(struct proc * p)1849 workq_proc_suspended(struct proc *p)
1850 {
1851 struct workqueue *wq = proc_get_wqptr(p);
1852
1853 if (wq) {
1854 os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1855 }
1856 }
1857
/*
 * Called when the process is resumed: clears WQ_PROC_SUSPENDED and both
 * *_CALL_PENDED bits in one atomic step, then re-issues whichever
 * thread-creation call was pended while suspended (immediate takes
 * precedence over delayed).
 */
void
workq_proc_resumed(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;

	if (!wq) {
		return;
	}

	/* fetch-and-clear: wq_flags holds the pre-clear value */
	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
	if ((wq_flags & WQ_EXITING) == 0) {
		/*
		 * The workq_schedule_*_thread_creation() functions assert
		 * preemption is disabled.
		 */
		disable_preemption();
		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
			workq_schedule_immediate_thread_creation(wq);
		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
			workq_schedule_delayed_thread_creation(wq,
			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
		}
		enable_preemption();
	}
}
1881
1882 /**
1883 * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1884 */
1885 static bool
workq_thread_is_busy(uint64_t now,_Atomic uint64_t * lastblocked_tsp)1886 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1887 {
1888 uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1889 if (now <= lastblocked_ts) {
1890 /*
1891 * Because the update of the timestamp when a thread blocks
1892 * isn't serialized against us looking at it (i.e. we don't hold
1893 * the workq lock), it's possible to have a timestamp that matches
1894 * the current time or that even looks to be in the future relative
1895 * to when we grabbed the current time...
1896 *
1897 * Just treat this as a busy thread since it must have just blocked.
1898 */
1899 return true;
1900 }
1901 return (now - lastblocked_ts) < wq_stalled_window.abstime;
1902 }
1903
/*
 * Thread-call handler shared by the delayed and immediate thread-creation
 * calls; `flags` carries which one fired (WQ_DELAYED_CALL_SCHEDULED or
 * WQ_IMMEDIATE_CALL_SCHEDULED).  Records the run timestamp, clears the
 * corresponding SCHEDULED bit and redrives the creator thread.
 */
static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount);

	workq_lock_spin(wq);

	/* release ensures the last_run update is visible before the bit clears */
	wq->wq_thread_call_last_run = mach_absolute_time();
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount);
}
1938
1939 #pragma mark thread state tracking
1940
/*
 * Scheduler callback invoked when a workqueue thread blocks or unblocks.
 *
 * Maintains the per-QoS active-thread counts (wq_thactive) and, on block,
 * decides whether a delayed thread-creation call should be scheduled to
 * redrive a constrained request that may previously have been refused
 * admission.  Manager-bucket threads are not tracked.
 */
static void
workq_sched_callback(int type, thread_t thread)
{
	thread_ro_t tro = get_thread_ro(thread);
	struct uthread *uth = get_bsdthread_info(thread);
	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket; it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem.  Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled);
		}
		break;
	}
}
2033
2034 #pragma mark workq lifecycle
2035
/*
 * Take an additional reference on the workqueue; paired with
 * workq_deallocate() or workq_deallocate_safe().
 */
void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}
2041
/*
 * Final teardown of a workqueue after its last reference is dropped: runs
 * either inline from workq_deallocate() or from the workq_deallocate_queue
 * MPSC daemon queue (via workq_deallocate_safe()).  Completes and frees
 * the turnstile set up in workq_open(), destroys the lock, and frees the
 * structure.
 */
static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
	struct workqueue *wq;
	struct turnstile *ts;

	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
	assert(dq == &workq_deallocate_queue);

	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}
2060
/*
 * Drop a reference on the workqueue; on the last release, run the full
 * teardown inline.  Use workq_deallocate_safe() from contexts where
 * running the teardown here is not safe.
 */
static void
workq_deallocate(struct workqueue *wq)
{
	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
		    &workq_deallocate_queue);
	}
}
2069
/*
 * Drop a reference on the workqueue; on the last release, defer the
 * teardown to the workq_deallocate_queue MPSC daemon queue rather than
 * running it in the current context.
 */
void
workq_deallocate_safe(struct workqueue *wq)
{
	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
		    MPSC_QUEUE_DISABLE_PREEMPTION);
	}
}
2078
/**
 * Setup per-process state for the workqueue.
 *
 * The first caller also performs one-time global tuning (constrained pool
 * limit, thread caps, per-QoS parallelism table).  Allocates and
 * initializes the per-process struct workqueue if the process doesn't
 * have one yet; racing callers wait in proc_init_wqptr_or_wait() and
 * return once the winner has published the pointer.
 *
 * Returns EINVAL if the process never called bsdthread_register
 * (P_LREGISTER unset), 0 otherwise.
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_wait_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		/* cap the global thread limit; see WQ_THACTIVE_BUCKET_HALF */
		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_max_cooperative_threads = num_cpus;

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		/* FALSE means another thread initialized the wqptr while we waited */
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
		wq->wq_proc = p;
		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
		    TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue);
		priority_queue_init(&wq->wq_constrained_queue);
		priority_queue_init(&wq->wq_special_queue);
		for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
			STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
		}

		/* We are only using the delayed thread call for the constrained pool
		 * which can't have work at >= UI QoS and so we can be fine with a
		 * UI QoS thread call.
		 */
		wq->wq_delayed_call = thread_call_allocate_with_qos(
			workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
			workq_kill_old_threads_call, wq,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_ticket_init(&wq->wq_lock, &workq_lck_grp);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);
		/* publish: wakes any waiters in proc_init_wqptr_or_wait() */
		proc_set_wqptr(p, wq);
	}
out:

	return error;
}
2179
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);

	workq_lock_spin(wq);

	/* WQ_EXITING is sticky and must be set at most once */
	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	/* detach the manager request so it can be cancelled outside the lock */
	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
}
2252
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	/* clear the wqptr first so in-flight thread calls bail out early */
	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_t mth = get_machthread(uth);
			thread_sched_call(mth, NULL);
			thread_deallocate(mth);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);

		/* drop the initial reference taken in workq_open() */
		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
	}
}
2320
2321
2322 #pragma mark bsd thread control
2323
2324 bool
bsdthread_part_of_cooperative_workqueue(struct uthread * uth)2325 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2326 {
2327 return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2328 (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER) &&
2329 (!workq_thread_is_permanently_bound(uth));
2330 }
2331
2332 static bool
_pthread_priority_to_policy(pthread_priority_t priority,thread_qos_policy_data_t * data)2333 _pthread_priority_to_policy(pthread_priority_t priority,
2334 thread_qos_policy_data_t *data)
2335 {
2336 data->qos_tier = _pthread_priority_thread_qos(priority);
2337 data->tier_importance = _pthread_priority_relpri(priority);
2338 if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2339 data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2340 return false;
2341 }
2342 return true;
2343 }
2344
/*
 * Implements the BSDTHREAD_CTL "set self" operations on the calling
 * thread.  Four independent stages run in order, each gated by `flags`:
 *   1. kevent unbind (WORKQ_SET_SELF_WQ_KEVENT_UNBIND)
 *   2. QoS change (WORKQ_SET_SELF_QOS_FLAG / _QOS_OVERRIDE_FLAG)
 *   3. voucher adoption (WORKQ_SET_SELF_VOUCHER_FLAG)
 *   4. fixed-priority / timeshare policy (WORKQ_SET_SELF_FIXEDPRIORITY_FLAG
 *      / WORKQ_SET_SELF_TIMESHARE_FLAG)
 * Each stage records its own error; they are combined at `done` with
 * precedence unbind > qos > voucher > fixedpri, and EBADMSG when both the
 * qos and voucher stages failed.
 *
 * `th` must be the current thread (asserted below).
 */
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	assert(th == current_thread());
	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		/* only plain-kqueue workqueue threads may unbind here */
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) {
		assert(flags & WORKQ_SET_SELF_QOS_FLAG);

		thread_qos_policy_data_t new_policy;
		thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) {
			/*
			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely
			 * should have an override QoS in the pthread_priority_t and we should
			 * only come into this path for cooperative thread requests
			 */
			if (!_pthread_priority_has_override_qos(priority) ||
			    !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}
			qos_override = _pthread_priority_thread_override_qos(priority);
		} else {
			/*
			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely
			 * should not have an override QoS in the pthread_priority_t
			 */
			if (_pthread_priority_has_override_qos(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 *
			 * Transitions allowed:
			 *
			 * overcommit -> non-overcommit
			 * overcommit -> overcommit
			 * non-overcommit -> non-overcommit
			 * non-overcommit -> overcommit (to be deprecated later)
			 * cooperative -> cooperative
			 *
			 * All other transitions aren't allowed so reject them.
			 */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}

			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			if (qos_override) {
				/*
				 * We're in the case of a thread clarifying that it is for eg. not IN
				 * req QoS but rather, UT req QoS with IN override. However, this can
				 * race with a concurrent override happening to the thread via
				 * workq_thread_add_dispatch_override so this needs to be
				 * synchronized with the thread mutex.
				 */
				thread_mtx_lock(th);
			}

			workq_lock_spin(wq);

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;

			if (old_pri.qos_override < qos_override) {
				/*
				 * Since this can race with a concurrent override via
				 * workq_thread_add_dispatch_override, only adjust override value if we
				 * are higher - this is a saturating function.
				 *
				 * We should not be changing the final override values, we should simply
				 * be redistributing the current value with a different breakdown of req
				 * vs override QoS - assert to that effect. Therefore, buckets should
				 * not change.
				 */
				new_pri.qos_override = qos_override;
				assert(workq_pri_override(new_pri) == workq_pri_override(old_pri));
				assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri));
			}

			/* Adjust schedule counts for various types of transitions */

			/* overcommit -> non-overcommit */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
				workq_thread_set_type(uth, 0);
				wq->wq_constrained_threads_scheduled++;

				/* non-overcommit -> overcommit */
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
				workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
				/* leaving the constrained pool may unblock a constrained request */
				force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);

				/* cooperative -> cooperative */
			} else if (workq_thread_is_cooperative(uth)) {
				_wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req);
				_wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req);

				/* We're changing schedule counts within cooperative pool, we
				 * need to refresh best cooperative QoS logic again */
				force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
			}

			/*
			 * This will set up an override on the thread if any and will also call
			 * schedule_creator if needed
			 */
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);

			if (qos_override) {
				thread_mtx_unlock(th);
			}

			if (workq_thread_is_overcommit(uth)) {
				thread_disarm_workqueue_quantum(th);
			} else {
				/* If the thread changed QoS buckets, the quantum duration
				 * may have changed too */
				thread_arm_workqueue_quantum(th);
			}
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	/* a QoS failure suppresses the fixed-priority/timeshare stage */
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	if (unbind_rv) {
		return unbind_rv;
	}

	if (qos_rv) {
		return qos_rv;
	}

	if (voucher_rv) {
		return voucher_rv;
	}

	if (fixedpri_rv) {
		return fixedpri_rv;
	}


	return 0;
}
2612
2613 static int
bsdthread_add_explicit_override(proc_t p,mach_port_name_t kport,pthread_priority_t pp,user_addr_t resource)2614 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2615 pthread_priority_t pp, user_addr_t resource)
2616 {
2617 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2618 if (qos == THREAD_QOS_UNSPECIFIED) {
2619 return EINVAL;
2620 }
2621
2622 thread_t th = port_name_to_thread(kport,
2623 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2624 if (th == THREAD_NULL) {
2625 return ESRCH;
2626 }
2627
2628 int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE,
2629 resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2630
2631 thread_deallocate(th);
2632 return rv;
2633 }
2634
2635 static int
bsdthread_remove_explicit_override(proc_t p,mach_port_name_t kport,user_addr_t resource)2636 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2637 user_addr_t resource)
2638 {
2639 thread_t th = port_name_to_thread(kport,
2640 PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2641 if (th == THREAD_NULL) {
2642 return ESRCH;
2643 }
2644
2645 int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource,
2646 THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2647
2648 thread_deallocate(th);
2649 return rv;
2650 }
2651
/*
 * Apply a dispatch QoS override (decoded from `pp`) to the workqueue
 * thread named by `kport`.  When `ulock_addr` is provided, the override is
 * skipped if the ulock is observed to no longer be owned by `kport`; the
 * ownership check is best effort (a faulting copyin does not prevent the
 * override).
 *
 * Returns EINVAL for an unspecified QoS, ESRCH if the port does not name a
 * thread in the current task, EPERM for non-workqueue threads, else 0.
 */
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport,
	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp);

	/* serializes with bsdthread_set_self() reshuffling req vs override QoS */
	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Fault-disabled copyin: if it fails (rc != 0), fall through and
		 * apply the override anyway — the ownership check is best effort.
		 */
		vm_fault_disable();
		rc = copyin_atomic32(ulock_addr, &val);
		vm_fault_enable();
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			/* the lock already changed hands: nothing to override */
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		/* remote thread: record the override, push it only if it raises */
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}
2715
/*
 * Clear any dispatch QoS override on `thread` (which must be a workqueue
 * thread) and recompute its bucket.  Counterpart of
 * workq_thread_add_dispatch_override().
 */
static int
workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);
	struct uthread *uth = get_bsdthread_info(thread);

	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);

	/*
	 * workq_thread_add_dispatch_override takes the thread mutex before doing the
	 * copyin to validate the drainer and apply the override. We need to do the
	 * same here. See rdar://84472518
	 */
	thread_mtx_lock(thread);

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);

	thread_mtx_unlock(thread);
	return 0;
}
2745
2746 static int
workq_thread_allow_kill(__unused proc_t p,thread_t thread,bool enable)2747 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2748 {
2749 if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2750 // If the thread isn't a workqueue thread, don't set the
2751 // kill_allowed bit; however, we still need to return 0
2752 // instead of an error code since this code is executed
2753 // on the abort path which needs to not depend on the
2754 // pthread_t (returning an error depends on pthread_t via
2755 // cerror_nocancel)
2756 return 0;
2757 }
2758 struct uthread *uth = get_bsdthread_info(thread);
2759 uth->uu_workq_pthread_kill_allowed = enable;
2760 return 0;
2761 }
2762
2763 static int
workq_allow_sigmask(proc_t p,sigset_t mask)2764 workq_allow_sigmask(proc_t p, sigset_t mask)
2765 {
2766 if (mask & workq_threadmask) {
2767 return EINVAL;
2768 }
2769
2770 proc_lock(p);
2771 p->p_workq_allow_sigmask |= mask;
2772 proc_unlock(p);
2773
2774 return 0;
2775 }
2776
2777 static int
bsdthread_get_max_parallelism(thread_qos_t qos,unsigned long flags,int * retval)2778 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2779 int *retval)
2780 {
2781 static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2782 _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2783 static_assert(QOS_PARALLELISM_REALTIME ==
2784 _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2785 static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2786 _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2787
2788 if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2789 return EINVAL;
2790 }
2791
2792 /* No units are present */
2793 if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2794 return ENOTSUP;
2795 }
2796
2797 if (flags & QOS_PARALLELISM_REALTIME) {
2798 if (qos) {
2799 return EINVAL;
2800 }
2801 } else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2802 return EINVAL;
2803 }
2804
2805 *retval = qos_max_parallelism(qos, flags);
2806 return 0;
2807 }
2808
2809 static int
bsdthread_dispatch_apply_attr(__unused struct proc * p,thread_t thread,unsigned long flags,uint64_t value1,__unused uint64_t value2)2810 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2811 unsigned long flags, uint64_t value1, __unused uint64_t value2)
2812 {
2813 uint32_t apply_worker_index;
2814 kern_return_t kr;
2815
2816 switch (flags) {
2817 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2818 apply_worker_index = (uint32_t)value1;
2819 kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2820 /*
2821 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2822 * cluster which it was not eligible to execute on.
2823 */
2824 return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2825 case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2826 kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2827 return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2828 default:
2829 return EINVAL;
2830 }
2831 }
2832
/*
 * Reject a syscall argument that must be zero.
 * NOTE: expands to a statement expression containing `return`, so it may
 * only be used directly inside bsdthread_ctl() below.
 */
#define ENSURE_UNUSED(arg) \
	({ if ((arg) != 0) { return EINVAL; } })

/*
 * bsdthread_ctl system call: multiplexed control operations on pthreads
 * and workqueue threads, dispatched on uap->cmd.  Returns 0 or an errno.
 */
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	/* Explicit QoS overrides on a target thread identified by port name. */
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	/* Dispatch-driven QoS overrides; RESET always acts on the caller. */
	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		return workq_thread_reset_dispatch_override(p, current_thread());

	/* Set QoS/voucher/fixed-pri properties on the calling thread. */
	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
	case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
		return bsdthread_dispatch_apply_attr(p, current_thread(),
		           (unsigned long)uap->arg1, (uint64_t)uap->arg2,
		           (uint64_t)uap->arg3);
	case BSDTHREAD_CTL_WORKQ_ALLOW_SIGMASK:
		return workq_allow_sigmask(p, (int)uap->arg1);
	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
2883
2884 #pragma mark workqueue thread manipulation
2885
2886 static void __dead2
2887 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2888 struct uthread *uth, uint32_t setup_flags);
2889
2890 static void __dead2
2891 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2892 struct uthread *uth, uint32_t setup_flags);
2893
2894 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2895
2896 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2897 static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)2898 workq_trace_req_id(workq_threadreq_t req)
2899 {
2900 struct kqworkloop *kqwl;
2901 if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2902 kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2903 return kqwl->kqwl_dynamicid;
2904 }
2905
2906 return VM_KERNEL_ADDRHIDE(req);
2907 }
2908 #endif
2909
2910 /**
2911 * Entry point for libdispatch to ask for threads
2912 */
2913 static int
workq_reqthreads(struct proc * p,uint32_t reqcount,pthread_priority_t pp,bool cooperative)2914 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
2915 {
2916 thread_qos_t qos = _pthread_priority_thread_qos(pp);
2917 struct workqueue *wq = proc_get_wqptr(p);
2918 uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2919 int ret = 0;
2920
2921 if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2922 qos == THREAD_QOS_UNSPECIFIED) {
2923 ret = EINVAL;
2924 goto exit;
2925 }
2926
2927 WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2928 wq, reqcount, pp, cooperative);
2929
2930 workq_threadreq_t req = zalloc(workq_zone_threadreq);
2931 priority_queue_entry_init(&req->tr_entry);
2932 req->tr_state = WORKQ_TR_STATE_NEW;
2933 req->tr_qos = qos;
2934 workq_tr_flags_t tr_flags = 0;
2935
2936 if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2937 tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
2938 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2939 }
2940
2941 if (cooperative) {
2942 tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
2943 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
2944
2945 if (reqcount > 1) {
2946 ret = ENOTSUP;
2947 goto free_and_exit;
2948 }
2949 }
2950
2951 /* A thread request cannot be both overcommit and cooperative */
2952 if (workq_tr_is_cooperative(tr_flags) &&
2953 workq_tr_is_overcommit(tr_flags)) {
2954 ret = EINVAL;
2955 goto free_and_exit;
2956 }
2957 req->tr_flags = tr_flags;
2958
2959 WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2960 wq, workq_trace_req_id(req), req->tr_qos, reqcount);
2961
2962 workq_lock_spin(wq);
2963 do {
2964 if (_wq_exiting(wq)) {
2965 goto unlock_and_exit;
2966 }
2967
2968 /*
2969 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
2970 * threads without pacing, to inform the scheduler of that workload.
2971 *
2972 * The last requests, or the ones that failed the admission checks are
2973 * enqueued and go through the regular creator codepath.
2974 *
2975 * If there aren't enough threads, add one, but re-evaluate everything
2976 * as conditions may now have changed.
2977 */
2978 unpaced = reqcount - 1;
2979
2980 if (reqcount > 1) {
2981 /* We don't handle asking for parallelism on the cooperative
2982 * workqueue just yet */
2983 assert(!workq_threadreq_is_cooperative(req));
2984
2985 if (workq_threadreq_is_nonovercommit(req)) {
2986 unpaced = workq_constrained_allowance(wq, qos, NULL, false, true);
2987 if (unpaced >= reqcount - 1) {
2988 unpaced = reqcount - 1;
2989 }
2990 }
2991 }
2992
2993 /*
2994 * This path does not currently handle custom workloop parameters
2995 * when creating threads for parallelism.
2996 */
2997 assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));
2998
2999 /*
3000 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
3001 */
3002 while (unpaced > 0 && wq->wq_thidlecount) {
3003 struct uthread *uth;
3004 bool needs_wakeup;
3005 uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;
3006
3007 if (workq_tr_is_overcommit(req->tr_flags)) {
3008 uu_flags |= UT_WORKQ_OVERCOMMIT;
3009 }
3010
3011 uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);
3012
3013 _wq_thactive_inc(wq, qos);
3014 wq->wq_thscheduled_count[_wq_bucket(qos)]++;
3015 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
3016 wq->wq_fulfilled++;
3017
3018 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
3019 uth->uu_save.uus_workq_park_data.thread_request = req;
3020 if (needs_wakeup) {
3021 workq_thread_wakeup(uth);
3022 }
3023 unpaced--;
3024 reqcount--;
3025 }
3026 } while (unpaced && wq->wq_nthreads < wq_max_threads &&
3027 (workq_add_new_idle_thread(p, wq, workq_unpark_continue,
3028 false, NULL) == KERN_SUCCESS));
3029
3030 if (_wq_exiting(wq)) {
3031 goto unlock_and_exit;
3032 }
3033
3034 req->tr_count = (uint16_t)reqcount;
3035 if (workq_threadreq_enqueue(wq, req)) {
3036 /* This can drop the workqueue lock, and take it again */
3037 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
3038 }
3039 workq_unlock(wq);
3040 return 0;
3041
3042 unlock_and_exit:
3043 workq_unlock(wq);
3044 free_and_exit:
3045 zfree(workq_zone_threadreq, req);
3046 exit:
3047 return ret;
3048 }
3049
/*
 * Initiate a kevent/workloop thread request on behalf of the kernel.
 *
 * Returns true if the request was accepted (either rebound to the calling
 * thread or enqueued for the creator), false if the workqueue is exiting.
 */
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	/*
	 * For any new initialization changes done to workqueue thread request below,
	 * please also consider if they are relevant to permanently bound thread
	 * request. See workq_kern_threadreq_permanent_bind.
	 */
	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Outside-of-QoS requests derive a workq bucket from their sched pri. */
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		/* Roll the request state back so the caller can reuse/free it. */
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		/* This is the case of the rebind - we were about to park and unbind
		 * when more events came so keep the binding.
		 */
		assert(uth != wq->wq_creator);

		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			/* Move the active accounting to the request's bucket. */
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We're called from workq_kern_threadreq_initiate()
		 * due to an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), req->tr_flags, 0);
		wq->wq_fulfilled++;

		kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
	} else {
		if (workloop_ts) {
			/* Point the workloop's turnstile at the workqueue while queued. */
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				    TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				    TURNSTILE_INTERLOCK_HELD);
			});
		}

		bool reevaluate_creator_thread_group = false;
#if CONFIG_PREADOPT_TG
		reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
#endif
		/* We enqueued the highest priority item or we may need to reevaluate if
		 * the creator needs a thread group pre-adoption */
		if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);

	return true;
}
3143
/*
 * Modify an already-initiated kevent/workloop thread request: change its
 * QoS and/or promote it to overcommit (WORKQ_THREADREQ_MAKE_OVERCOMMIT).
 * No-op for outside-of-QoS requests and for exiting workqueues.
 */
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	bool make_overcommit = false;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		/* A thread is already being bound: complete the bind instead. */
		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
		workq_unlock(wq);
		return;
	}

	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
		/* TODO (rokhinip): We come into this code path for kqwl thread
		 * requests. kqwl requests cannot be cooperative.
		 */
		assert(!workq_threadreq_is_cooperative(req));

		make_overcommit = workq_threadreq_is_nonovercommit(req);
	}

	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
		/* Nothing to change (or too late to matter). */
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 0);

	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry)) {
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(make_overcommit)) {
		/* Toggling the flag also switches which priority queue it lives in. */
		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		/* Not the new head of its queue: re-enqueue and we're done. */
		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
		    workq_priority_for_req(req), false);
		priority_queue_insert(pq, &req->tr_entry);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = WORKQ_TR_STATE_NEW;

	/* We enqueued the highest priority item or we may need to reevaluate if
	 * the creator needs a thread group pre-adoption if the request got a new TG */
	bool reevaluate_creator_tg = false;

#if CONFIG_PREADOPT_TG
	reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
#endif

	if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
3249
/*
 * Reset the scheduling priority of a permanently-bound workqueue thread.
 *
 * @param req  the thread request driving the wakeup, or NULL when parking;
 *             outside-of-QoS requests are ignored because their scheduling
 *             policy was applied once at bind time
 *             (see workq_kern_threadreq_permanent_bind).
 * @param uth  the bound thread (must be permanently bound).
 */
void
workq_kern_bound_thread_reset_pri(workq_threadreq_t req, struct uthread *uth)
{
	assert(workq_thread_is_permanently_bound(uth));

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS)) {
		/*
		 * For requests outside-of-QoS, we set the scheduling policy and
		 * absolute priority for the bound thread right at the initialization
		 * time. See workq_kern_threadreq_permanent_bind.
		 */
		return;
	}

	struct workqueue *wq = proc_get_wqptr_fast(current_proc());
	if (req) {
		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
	} else {
		/* No request: this is the park path. */
		thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
		if (qos > WORKQ_THREAD_QOS_CLEANUP) {
			/* Above cleanup QoS: drop back down immediately. */
			workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		} else {
			/* Remember the QoS for when the thread actually parks. */
			uth->uu_save.uus_workq_park_data.qos = qos;
		}
	}
}
3277
/* Take the process-wide workqueue lock on behalf of kevent code. */
void
workq_kern_threadreq_lock(struct proc *p)
{
	struct workqueue *workq = proc_get_wqptr_fast(p);
	workq_lock_spin(workq);
}
3283
/* Release the process-wide workqueue lock taken by workq_kern_threadreq_lock(). */
void
workq_kern_threadreq_unlock(struct proc *p)
{
	struct workqueue *workq = proc_get_wqptr_fast(p);
	workq_unlock(workq);
}
3289
/*
 * Update the turnstile inheritor for a queued workloop thread request.
 * Called with the workqueue lock held.
 *
 * The inheritor becomes: the workloop owner thread if there is one, the
 * workqueue's turnstile otherwise, or NULL when the workqueue is exiting.
 * If the request is already mid-bind, the bind is completed instead.
 */
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		/* Already bound to a thread; finish the bind, skip the update. */
		kqueue_threadreq_bind(p, req, req->tr_thread,
		    KQUEUE_THREADREQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
3328
3329 /*
3330 * An entry point for kevent to request a newly created workqueue thread
3331 * and bind it permanently to the given workqueue thread request.
3332 *
3333 * It currently only supports fixed scheduler priority thread requests.
3334 *
3335 * The newly created thread counts towards wq_nthreads. This function returns
3336 * an error if we are above that limit. There is no concept of delayed thread
3337 * creation for such specially configured kqworkloops.
3338 *
3339 * If successful, the newly created thread will be parked in
3340 * workq_bound_thread_initialize_and_unpark_continue waiting for
3341 * new incoming events.
3342 */
3343 kern_return_t
workq_kern_threadreq_permanent_bind(struct proc * p,struct workq_threadreq_s * kqr)3344 workq_kern_threadreq_permanent_bind(struct proc *p, struct workq_threadreq_s *kqr)
3345 {
3346 kern_return_t ret = 0;
3347 thread_t new_thread = NULL;
3348 struct workqueue *wq = proc_get_wqptr_fast(p);
3349
3350 workq_lock_spin(wq);
3351
3352 if (wq->wq_nthreads >= wq_max_threads) {
3353 ret = EDOM;
3354 } else {
3355 if (kqr->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3356 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(kqr);
3357 /*
3358 * For requests outside-of-QoS, we fully initialize the thread
3359 * request here followed by preadopting the scheduling properties
3360 * on the newly created bound thread.
3361 */
3362 thread_qos_t qos = thread_workq_qos_for_pri(trp.trp_pri);
3363 if (qos == THREAD_QOS_UNSPECIFIED) {
3364 qos = WORKQ_THREAD_QOS_ABOVEUI;
3365 }
3366 kqr->tr_qos = qos;
3367 }
3368 kqr->tr_count = 1;
3369
3370 /* workq_lock dropped and retaken around thread creation below. */
3371 ret = workq_add_new_idle_thread(p, wq,
3372 workq_bound_thread_initialize_and_unpark_continue,
3373 true, &new_thread);
3374 if (ret == KERN_SUCCESS) {
3375 struct uthread *uth = get_bsdthread_info(new_thread);
3376 if (kqr->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3377 workq_thread_reset_pri(wq, uth, kqr, /*unpark*/ true);
3378 }
3379 /*
3380 * The newly created thread goes through a full bind to the kqwl
3381 * right upon creation.
3382 * It then falls back to soft bind/unbind upon wakeup/park.
3383 */
3384 kqueue_threadreq_bind_prepost(p, kqr, uth);
3385 uth->uu_workq_flags |= UT_WORKQ_PERMANENT_BIND;
3386 }
3387 }
3388
3389 workq_unlock(wq);
3390
3391 if (ret == KERN_SUCCESS) {
3392 kqueue_threadreq_bind_commit(p, new_thread);
3393 }
3394 return ret;
3395 }
3396
3397 /*
3398 * Called with kqlock held. It does not need to take the process wide
3399 * global workq lock -> making it faster.
3400 */
3401 void
workq_kern_bound_thread_wakeup(struct workq_threadreq_s * kqr)3402 workq_kern_bound_thread_wakeup(struct workq_threadreq_s *kqr)
3403 {
3404 struct uthread *uth = get_bsdthread_info(kqr->tr_thread);
3405 workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(kqr);
3406
3407 /*
3408 * See "Locking model for accessing uu_workq_flags" for more information
3409 * on how access to uu_workq_flags for the bound thread is synchronized.
3410 */
3411 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING)) == 0);
3412
3413 if (trp.trp_flags & TRP_RELEASED) {
3414 uth->uu_workq_flags |= UT_WORKQ_DYING;
3415 } else {
3416 uth->uu_workq_flags |= UT_WORKQ_RUNNING;
3417 }
3418
3419 workq_thread_wakeup(uth);
3420 }
3421
3422 /*
3423 * Called with kqlock held. Dropped before parking.
3424 * It does not need to take process wide global workqueue
3425 * lock -> making it faster.
3426 */
3427 __attribute__((noreturn, noinline))
3428 void
workq_kern_bound_thread_park(struct workq_threadreq_s * kqr)3429 workq_kern_bound_thread_park(struct workq_threadreq_s *kqr)
3430 {
3431 struct uthread *uth = get_bsdthread_info(kqr->tr_thread);
3432 assert(uth == current_uthread());
3433
3434 /*
3435 * See "Locking model for accessing uu_workq_flags" for more information
3436 * on how access to uu_workq_flags for the bound thread is synchronized.
3437 */
3438 uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING);
3439
3440 thread_disarm_workqueue_quantum(get_machthread(uth));
3441
3442 /*
3443 * TODO (pavhad) We could do the reusable userspace stack performance
3444 * optimization here.
3445 */
3446
3447 kqworkloop_bound_thread_park_prepost(kqr);
3448 /* KQ_SLEEP bit is set and kqlock is dropped. */
3449
3450 __assert_only kern_return_t kr;
3451 kr = thread_set_voucher_name(MACH_PORT_NULL);
3452 assert(kr == KERN_SUCCESS);
3453
3454 kqworkloop_bound_thread_park_commit(kqr,
3455 workq_parked_wait_event(uth), workq_bound_thread_unpark_continue);
3456
3457 __builtin_unreachable();
3458 }
3459
3460 /*
3461 * To terminate the permenantly bound workqueue thread. It unbinds itself
3462 * with the kqwl during uthread_cleanup -> kqueue_threadreq_unbind.
3463 * It is also when it will release its reference on the kqwl.
3464 */
3465 __attribute__((noreturn, noinline))
3466 void
workq_kern_bound_thread_terminate(struct workq_threadreq_s * kqr)3467 workq_kern_bound_thread_terminate(struct workq_threadreq_s *kqr)
3468 {
3469 proc_t p = current_proc();
3470 struct uthread *uth = get_bsdthread_info(kqr->tr_thread);
3471 uint16_t uu_workq_flags_orig;
3472
3473 assert(uth == current_uthread());
3474
3475 /*
3476 * See "Locking model for accessing uu_workq_flags" for more information
3477 * on how access to uu_workq_flags for the bound thread is synchronized.
3478 */
3479 kqworkloop_bound_thread_terminate(kqr, &uu_workq_flags_orig);
3480
3481 if (uu_workq_flags_orig & UT_WORKQ_WORK_INTERVAL_JOINED) {
3482 __assert_only kern_return_t kr;
3483 kr = kern_work_interval_join(get_machthread(uth), MACH_PORT_NULL);
3484 /* The bound thread un-joins the work interval and drops its +1 ref. */
3485 assert(kr == KERN_SUCCESS);
3486 }
3487
3488 /*
3489 * Drop the voucher now that we are on our way to termination.
3490 */
3491 __assert_only kern_return_t kr;
3492 kr = thread_set_voucher_name(MACH_PORT_NULL);
3493 assert(kr == KERN_SUCCESS);
3494
3495 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
3496 upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
3497 WQ_FLAG_THREAD_PRIO_QOS;
3498
3499 thread_t th = get_machthread(uth);
3500 vm_map_t vmap = get_task_map(proc_task(p));
3501
3502 if ((uu_workq_flags_orig & UT_WORKQ_NEW) == 0) {
3503 upcall_flags |= WQ_FLAG_THREAD_REUSE;
3504 }
3505
3506 pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
3507 uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, upcall_flags);
3508 __builtin_unreachable();
3509 }
3510
3511 void
workq_kern_threadreq_redrive(struct proc * p,workq_kern_threadreq_flags_t flags)3512 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3513 {
3514 struct workqueue *wq = proc_get_wqptr_fast(p);
3515
3516 workq_lock_spin(wq);
3517 workq_schedule_creator(p, wq, flags);
3518 workq_unlock(wq);
3519 }
3520
3521 /*
3522 * Always called at AST by the thread on itself
3523 *
3524 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
3525 * on what the thread should do next. The TSD value is always set by the thread
3526 * on itself in the kernel and cleared either by userspace when it acks the TSD
3527 * value and takes action, or by the thread in the kernel when the quantum
3528 * expires again.
3529 */
3530 void
workq_kern_quantum_expiry_reevaluate(proc_t proc,thread_t thread)3531 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
3532 {
3533 struct uthread *uth = get_bsdthread_info(thread);
3534
3535 if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3536 return;
3537 }
3538
3539 if (!thread_supports_cooperative_workqueue(thread)) {
3540 panic("Quantum expired for thread that doesn't support cooperative workqueue");
3541 }
3542
3543 thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
3544 if (qos == THREAD_QOS_UNSPECIFIED) {
3545 panic("Thread should not have workq bucket of QoS UN");
3546 }
3547
3548 assert(thread_has_expired_workqueue_quantum(thread, false));
3549
3550 struct workqueue *wq = proc_get_wqptr(proc);
3551 assert(wq != NULL);
3552
3553 /*
3554 * For starters, we're just going to evaluate and see if we need to narrow
3555 * the pool and tell this thread to park if needed. In the future, we'll
3556 * evaluate and convey other workqueue state information like needing to
3557 * pump kevents, etc.
3558 */
3559 uint64_t flags = 0;
3560
3561 workq_lock_spin(wq);
3562
3563 if (workq_thread_is_cooperative(uth)) {
3564 if (!workq_cooperative_allowance(wq, qos, uth, false)) {
3565 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3566 } else {
3567 /* In the future, when we have kevent hookups for the cooperative
3568 * pool, we need fancier logic for what userspace should do. But
3569 * right now, only userspace thread requests exist - so we'll just
3570 * tell userspace to shuffle work items */
3571 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
3572 }
3573 } else if (workq_thread_is_nonovercommit(uth)) {
3574 if (!workq_constrained_allowance(wq, qos, uth, false, false)) {
3575 flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3576 }
3577 }
3578 workq_unlock(wq);
3579
3580 WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);
3581
3582 kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);
3583
3584 /* We have conveyed to userspace about what it needs to do upon quantum
3585 * expiry, now rearm the workqueue quantum again */
3586 thread_arm_workqueue_quantum(get_machthread(uth));
3587 }
3588
3589 void
workq_schedule_creator_turnstile_redrive(struct workqueue * wq,bool locked)3590 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3591 {
3592 if (locked) {
3593 workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3594 } else {
3595 workq_schedule_immediate_thread_creation(wq);
3596 }
3597 }
3598
/*
 * Handle WQOPS_THREAD_RETURN / WQOPS_THREAD_WORKLOOP_RETURN: a workqueue
 * thread is done with its current work item and comes back to the kernel,
 * optionally flushing pending kevents (uap->item/uap->affinity), before
 * being matched with a new request or parked.
 *
 * On the park path this does not return (ends in
 * workq_select_threadreq_or_park_and_unlock).  Returns an errno only when
 * the thread must go back to userspace.
 */
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	/* Pending events can only be delivered through a bound kqrequest. */
	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/*
	 * Reset signal mask on the workqueue thread to default state,
	 * but do not touch any signals that are marked for preservation.
	 */
	sigset_t resettable = uth->uu_sigmask & ~p->p_workq_allow_sigmask;
	if (resettable != (sigset_t)~workq_threadmask) {
		proc_lock(p);
		uth->uu_sigmask |= ~workq_threadmask & ~p->p_workq_allow_sigmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_PERMANENT_BIND) {
		/* Permanently-bound threads never park here: skip the pri freeze. */
		goto handle_stack_events;
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

handle_stack_events:

	if (kqr) {
		/* Build the upcall flags that describe this thread to pthread. */
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (workq_thread_is_overcommit(uth)) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}
		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(proc_task(p)), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
	/* Preserve the workloop params across the park/re-bind cycle. */
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
3704
/**
 * Multiplexed call to interact with the workqueue mechanism
 *
 * Dispatches on uap->options to one of the WQOPS_* operations below.
 * The calling process must have registered with the workqueue subsystem
 * (P_LREGISTER set); otherwise, and for unknown options, returns EINVAL.
 *
 * @param p      calling process
 * @param uap    syscall arguments: options selects the operation,
 *               affinity/prio are operation-specific (see per-case comments)
 * @param retval out-value for operations that produce one (WQOPS_SHOULD_NARROW)
 * @return 0 on success, BSD errno on failure
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}

		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3, false);
		break;
	}
	/* For requesting threads for the cooperative pool */
	case WQOPS_QUEUE_REQTHREADS2: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3, true);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (_pthread_priority_has_sched_pri(pri)) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			/* relative priority must be in (THREAD_QOS_MIN_TIER_IMPORTANCE, 0] */
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
			    qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should takes care not to lower the priority this way.
		 * (The manager priority is monotonically non-decreasing.)
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 *
		 * Tells a constrained thread whether it should voluntarily stop
		 * running (narrow the pool) because the constrained allowance at
		 * the given QoS is exhausted.
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		/*
		 * NOTE(review): wq is dereferenced without a NULL check here —
		 * presumably THREAD_TAG_WORKQUEUE implies the process has a
		 * workqueue; confirm against workq thread creation paths.
		 */
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	case WQOPS_SETUP_DISPATCH: {
		/*
		 * item = pointer to workq_dispatch_config structure
		 * arg2 = sizeof(item)
		 */
		struct workq_dispatch_config cfg;
		bzero(&cfg, sizeof(cfg));

		/* Copy at most sizeof(cfg); shorter userspace structs leave the rest zeroed. */
		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
		if (error) {
			break;
		}

		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
			error = ENOTSUP;
			break;
		}

		/* Load fields from version 1 */
		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

		/* Load fields from version 2 */
		if (cfg.wdc_version >= 2) {
			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
		}

		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
3863
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 *
 * @param p           owning process
 * @param wq          the process workqueue (locked on entry)
 * @param uth         current uthread (must be the calling thread)
 * @param setup_flags WQ_SETUP_* flags carried through to the unpark path
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

#if CONFIG_PREADOPT_TG
	/* Clear the preadoption thread group on the thread.
	 *
	 * Case 1:
	 * Creator thread which never picked up a thread request. We set a
	 * preadoption thread group on creator threads but if it never picked
	 * up a thread request and didn't go to userspace, then the thread will
	 * park with a preadoption thread group but no explicitly adopted
	 * voucher or work interval.
	 *
	 * We drop the preadoption thread group here before proceeding to park.
	 * Note - we may get preempted when we drop the workq lock below.
	 *
	 * Case 2:
	 * Thread picked up a thread request and bound to it and returned back
	 * from userspace and is parking. At this point, preadoption thread
	 * group should be NULL since the thread has unbound from the thread
	 * request. So this operation should be a no-op.
	 */
	thread_set_preadopt_thread_group(get_machthread(uth), NULL);
#endif

	/* Idle cleanup (stack free + voucher drop) requires dropping the lock. */
	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p,
			    get_machthread(uth), get_task_map(proc_task(p)),
			    uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list. Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 *
		 * Note that setting the voucher to NULL will not clear the preadoption
		 * thread since this thread could have become the creator again and
		 * perhaps acquired a preadoption thread group.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		/* voucher already cleared above; don't clear it again on unpark */
		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable. But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual. To correct for that,
		 * we just run the continuation ourselves.
		 */
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	/* Disarm the workqueue quantum since the thread is now idle */
	thread_disarm_workqueue_quantum(get_machthread(uth));

	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
3964
3965 static inline bool
workq_may_start_event_mgr_thread(struct workqueue * wq,struct uthread * uth)3966 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3967 {
3968 /*
3969 * There's an event manager request and either:
3970 * - no event manager currently running
3971 * - we are re-using the event manager
3972 */
3973 return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3974 (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3975 }
3976
3977 /* Called with workq lock held. */
3978 static uint32_t
workq_constrained_allowance(struct workqueue * wq,thread_qos_t at_qos,struct uthread * uth,bool may_start_timer,bool record_failed_allowance)3979 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3980 struct uthread *uth, bool may_start_timer, bool record_failed_allowance)
3981 {
3982 assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3983 uint32_t allowance_passed = 0;
3984 uint32_t count = 0;
3985
3986 uint32_t max_count = wq->wq_constrained_threads_scheduled;
3987 if (uth && workq_thread_is_nonovercommit(uth)) {
3988 /*
3989 * don't count the current thread as scheduled
3990 */
3991 assert(max_count > 0);
3992 max_count--;
3993 }
3994 if (max_count >= wq_max_constrained_threads) {
3995 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3996 wq->wq_constrained_threads_scheduled,
3997 wq_max_constrained_threads);
3998 /*
3999 * we need 1 or more constrained threads to return to the kernel before
4000 * we can dispatch additional work
4001 */
4002 allowance_passed = 0;
4003 goto out;
4004 }
4005 max_count -= wq_max_constrained_threads;
4006
4007 /*
4008 * Compute a metric for many how many threads are active. We find the
4009 * highest priority request outstanding and then add up the number of active
4010 * threads in that and all higher-priority buckets. We'll also add any
4011 * "busy" threads which are not currently active but blocked recently enough
4012 * that we can't be sure that they won't be unblocked soon and start
4013 * being active again.
4014 *
4015 * We'll then compare this metric to our max concurrency to decide whether
4016 * to add a new thread.
4017 */
4018
4019 uint32_t busycount, thactive_count;
4020
4021 thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
4022 at_qos, &busycount, NULL);
4023
4024 if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
4025 at_qos <= uth->uu_workq_pri.qos_bucket) {
4026 /*
4027 * Don't count this thread as currently active, but only if it's not
4028 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
4029 * managers.
4030 */
4031 assert(thactive_count > 0);
4032 thactive_count--;
4033 }
4034
4035 count = wq_max_parallelism[_wq_bucket(at_qos)];
4036 if (count > thactive_count + busycount) {
4037 count -= thactive_count + busycount;
4038 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
4039 thactive_count, busycount);
4040 allowance_passed = MIN(count, max_count);
4041 goto out;
4042 } else {
4043 WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
4044 thactive_count, busycount);
4045 allowance_passed = 0;
4046 }
4047
4048 if (may_start_timer) {
4049 /*
4050 * If this is called from the add timer, we won't have another timer
4051 * fire when the thread exits the "busy" state, so rearm the timer.
4052 */
4053 workq_schedule_delayed_thread_creation(wq, 0);
4054 }
4055
4056 out:
4057 if (record_failed_allowance) {
4058 wq->wq_exceeded_active_constrained_thread_limit = !allowance_passed;
4059 }
4060 return allowance_passed;
4061 }
4062
4063 static bool
workq_threadreq_admissible(struct workqueue * wq,struct uthread * uth,workq_threadreq_t req)4064 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
4065 workq_threadreq_t req)
4066 {
4067 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
4068 return workq_may_start_event_mgr_thread(wq, uth);
4069 }
4070 if (workq_threadreq_is_cooperative(req)) {
4071 return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
4072 }
4073 if (workq_threadreq_is_nonovercommit(req)) {
4074 return workq_constrained_allowance(wq, req->tr_qos, uth, true, true);
4075 }
4076
4077 return true;
4078 }
4079
/*
 * Called from the context of selecting thread requests for threads returning
 * from userspace or creator thread
 *
 * Returns the best (highest QoS) pending request in the cooperative pool, or
 * NULL when none is eligible. Requires the workq lock held.
 *
 * @param uth the thread asking for a request, or NULL for the creator path
 */
static workq_threadreq_t
workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
{
	workq_lock_held(wq);

	/*
	 * If the current thread is cooperative, we need to exclude it as part of
	 * cooperative schedule count since this thread is looking for a new
	 * request. Change in the schedule count for cooperative pool therefore
	 * requires us to reeevaluate the next best request for it.
	 */
	if (uth && workq_thread_is_cooperative(uth)) {
		/* temporarily drop this thread from the count, refresh, then restore */
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);

		(void) _wq_cooperative_queue_refresh_best_req_qos(wq);

		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
	} else {
		/*
		 * The old value that was already precomputed should be safe to use -
		 * add an assert that asserts that the best req QoS doesn't change in
		 * this case
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	}

	thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;

	/* There are no eligible requests in the cooperative pool */
	if (qos == THREAD_QOS_UNSPECIFIED) {
		return NULL;
	}
	assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
	assert(qos != WORKQ_THREAD_QOS_MANAGER);

	uint8_t bucket = _wq_bucket(qos);
	assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));

	return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
}
4124
/*
 * Select the best pending thread request on behalf of the creator thread.
 *
 * Arbitration order (with workq lock held):
 *   1. the manager request always beats the QoS world (but compares against
 *      the special-queue priority),
 *   2. among the QoS pools: cooperative wins ties over overcommit, and
 *      constrained wins only when strictly higher QoS and admissible,
 *   3. raw-priority (special queue) requests beat QoS requests of lower
 *      equivalent scheduler priority.
 *
 * Returns NULL when nothing is admissible; may return a static fake BG
 * request when only a turnstile push exists (see comment at the bottom).
 */
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_pri->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		/* normalize the manager priority to a scheduler priority for comparison */
		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 *
	 * Start by comparing the overcommit and the cooperative pool
	 */
	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, NULL);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative. Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fairs against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	/*
	 * Compare the best of the QoS world with the priority
	 */
	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
4242
/*
 * Returns true if this caused a change in the schedule counts of the
 * cooperative pool
 *
 * Moves the calling thread's contribution between the constrained and
 * cooperative scheduled counts when the kind of request it is about to
 * service differs from the kind of thread it currently is.
 *
 * Must be called with the workq lock held, after the thread's new QoS has
 * been set (uu_workq_pri.qos_req is the NEW QoS) but using old_thread_qos
 * for decrements of its previous cooperative bucket.
 */
static bool
workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
    struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
{
	workq_lock_held(wq);

	/*
	 * Row: thread type
	 * Column: Request type
	 *
	 *                overcommit       non-overcommit  cooperative
	 * overcommit     X                case 1          case 2
	 * cooperative    case 3           case 4          case 5
	 * non-overcommit case 6           X               case 7
	 *
	 * Move the thread to the right bucket depending on what state it currently
	 * has and what state the thread req it picks, is going to have.
	 *
	 * Note that the creator thread is an overcommit thread.
	 */
	thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req;

	/*
	 * Anytime a cooperative bucket's schedule count changes, we need to
	 * potentially refresh the next best QoS for that pool when we determine
	 * the next request for the creator
	 */
	bool cooperative_pool_sched_count_changed = false;

	if (workq_thread_is_overcommit(uth)) {
		if (workq_tr_is_nonovercommit(tr_flags)) {
			// Case 1: thread is overcommit, req is non-overcommit
			wq->wq_constrained_threads_scheduled++;
		} else if (workq_tr_is_cooperative(tr_flags)) {
			// Case 2: thread is overcommit, req is cooperative
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
			cooperative_pool_sched_count_changed = true;
		}
	} else if (workq_thread_is_cooperative(uth)) {
		if (workq_tr_is_overcommit(tr_flags)) {
			// Case 3: thread is cooperative, req is overcommit
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
		} else if (workq_tr_is_nonovercommit(tr_flags)) {
			// Case 4: thread is cooperative, req is non-overcommit
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
			wq->wq_constrained_threads_scheduled++;
		} else {
			// Case 5: thread is cooperative, req is also cooperative
			assert(workq_tr_is_cooperative(tr_flags));
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
		}
		/* leaving OR staying in the cooperative pool always changes its counts */
		cooperative_pool_sched_count_changed = true;
	} else {
		if (workq_tr_is_overcommit(tr_flags)) {
			// Case 6: Thread is non-overcommit, req is overcommit
			wq->wq_constrained_threads_scheduled--;
		} else if (workq_tr_is_cooperative(tr_flags)) {
			// Case 7: Thread is non-overcommit, req is cooperative
			wq->wq_constrained_threads_scheduled--;
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
			cooperative_pool_sched_count_changed = true;
		}
	}

	return cooperative_pool_sched_count_changed;
}
4314
/*
 * Select the best pending thread request for a workqueue thread returning
 * from userspace (the non-creator counterpart of
 * workq_threadreq_select_for_creator).
 *
 * Unlike the creator path, the "priority" candidate also considers the
 * kqworkloop owning the max turnstile push. Called with the workq lock held.
 * Returns NULL when nothing is admissible.
 */
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/* the creator thread must not exclude itself from the counts below */
	if (uth == wq->wq_creator) {
		uth = NULL;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		req_pri = req_tmp;
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_tmp->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		/* normalize the manager priority to a scheduler priority for comparison */
		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, uth);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative. Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fairs against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	/* priority request wins when the QoS world has nothing better */
	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
4428
/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't be materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 *
 * Called with the workq lock held. `p` may be NULL only when thread creation
 * is not requested (see the assert below).
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}


	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, uthread_tid(uth), req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == get_machthread(uth));
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		/* Always reset the priorities on the newly chosen creator */
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		workq_turnstile_update_inheritor(wq, get_machthread(uth),
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, uthread_tid(uth), req->tr_qos);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if ((workq_add_new_idle_thread(p, wq,
		    workq_unpark_continue, false, NULL) == KERN_SUCCESS)) {
			/* a new idle thread exists now; retry the election */
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
4559
4560 /**
4561 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
4562 * but do not allow early binds.
4563 *
4564 * Called with the base pri frozen, will unfreeze it.
4565 */
4566 __attribute__((noreturn, noinline))
4567 static void
workq_select_threadreq_or_park_and_unlock(proc_t p,struct workqueue * wq,struct uthread * uth,uint32_t setup_flags)4568 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4569 struct uthread *uth, uint32_t setup_flags)
4570 {
4571 workq_threadreq_t req = NULL;
4572 bool is_creator = (wq->wq_creator == uth);
4573 bool schedule_creator = false;
4574
4575 if (__improbable(_wq_exiting(wq))) {
4576 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
4577 goto park;
4578 }
4579
4580 if (wq->wq_reqcount == 0) {
4581 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
4582 goto park;
4583 }
4584
4585 req = workq_threadreq_select(wq, uth);
4586 if (__improbable(req == NULL)) {
4587 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
4588 goto park;
4589 }
4590
4591 struct uu_workq_policy old_pri = uth->uu_workq_pri;
4592 uint8_t tr_flags = req->tr_flags;
4593 struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);
4594
4595 /*
4596 * Attempt to setup ourselves as the new thing to run, moving all priority
4597 * pushes to ourselves.
4598 *
4599 * If the current thread is the creator, then the fact that we are presently
4600 * running is proof that we'll do something useful, so keep going.
4601 *
4602 * For other cases, peek at the AST to know whether the scheduler wants
4603 * to preempt us, if yes, park instead, and move the thread request
4604 * turnstile back to the workqueue.
4605 */
4606 if (req_ts) {
4607 workq_perform_turnstile_operation_locked(wq, ^{
4608 turnstile_update_inheritor(req_ts, get_machthread(uth),
4609 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
4610 turnstile_update_inheritor_complete(req_ts,
4611 TURNSTILE_INTERLOCK_HELD);
4612 });
4613 }
4614
4615 /* accounting changes of aggregate thscheduled_count and thactive which has
4616 * to be paired with the workq_thread_reset_pri below so that we have
4617 * uth->uu_workq_pri match with thactive.
4618 *
4619 * This is undone when the thread parks */
4620 if (is_creator) {
4621 WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
4622 uth->uu_save.uus_workq_park_data.yields);
4623 wq->wq_creator = NULL;
4624 _wq_thactive_inc(wq, req->tr_qos);
4625 wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
4626 } else if (old_pri.qos_bucket != req->tr_qos) {
4627 _wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos);
4628 }
4629 workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4630
4631 /*
4632 * Make relevant accounting changes for pool specific counts.
4633 *
4634 * The schedule counts changing can affect what the next best request
4635 * for cooperative thread pool is if this request is dequeued.
4636 */
4637 bool cooperative_sched_count_changed =
4638 workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
4639 old_pri.qos_req, tr_flags);
4640
4641 if (workq_tr_is_overcommit(tr_flags)) {
4642 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
4643 } else if (workq_tr_is_cooperative(tr_flags)) {
4644 workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
4645 } else {
4646 workq_thread_set_type(uth, 0);
4647 }
4648
4649 if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
4650 if (req_ts) {
4651 workq_perform_turnstile_operation_locked(wq, ^{
4652 turnstile_update_inheritor(req_ts, wq->wq_turnstile,
4653 TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
4654 turnstile_update_inheritor_complete(req_ts,
4655 TURNSTILE_INTERLOCK_HELD);
4656 });
4657 }
4658 WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
4659
4660 /*
4661 * If a cooperative thread was the one which picked up the manager
4662 * thread request, we need to reevaluate the cooperative pool before
4663 * it goes and parks.
4664 *
4665 * For every other of thread request that it picks up, the logic in
4666 * workq_threadreq_select should have done this refresh.
4667 * See workq_push_idle_thread.
4668 */
4669 if (cooperative_sched_count_changed) {
4670 if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
4671 _wq_cooperative_queue_refresh_best_req_qos(wq);
4672 }
4673 }
4674 goto park_thawed;
4675 }
4676
4677 /*
4678 * We passed all checks, dequeue the request, bind to it, and set it up
4679 * to return to user.
4680 */
4681 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4682 workq_trace_req_id(req), tr_flags, 0);
4683 wq->wq_fulfilled++;
4684 schedule_creator = workq_threadreq_dequeue(wq, req,
4685 cooperative_sched_count_changed);
4686
4687 workq_thread_reset_cpupercent(req, uth);
4688
4689 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4690 kqueue_threadreq_bind_prepost(p, req, uth);
4691 req = NULL;
4692 } else if (req->tr_count > 0) {
4693 req = NULL;
4694 }
4695
4696 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4697 uth->uu_workq_flags ^= UT_WORKQ_NEW;
4698 setup_flags |= WQ_SETUP_FIRST_USE;
4699 }
4700
4701 /* If one of the following is true, call workq_schedule_creator (which also
4702 * adjusts priority of existing creator):
4703 *
4704 * - We are the creator currently so the wq may need a new creator
4705 * - The request we're binding to is the highest priority one, existing
4706 * creator's priority might need to be adjusted to reflect the next
4707 * highest TR
4708 */
4709 if (is_creator || schedule_creator) {
4710 /* This can drop the workqueue lock, and take it again */
4711 workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
4712 }
4713
4714 workq_unlock(wq);
4715
4716 if (req) {
4717 zfree(workq_zone_threadreq, req);
4718 }
4719
4720 /*
4721 * Run Thread, Run!
4722 */
4723 uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
4724 if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
4725 upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
4726 } else if (workq_tr_is_overcommit(tr_flags)) {
4727 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
4728 } else if (workq_tr_is_cooperative(tr_flags)) {
4729 upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
4730 }
4731 if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
4732 upcall_flags |= WQ_FLAG_THREAD_KEVENT;
4733 assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
4734 }
4735
4736 if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
4737 upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
4738 }
4739 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
4740
4741 if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4742 kqueue_threadreq_bind_commit(p, get_machthread(uth));
4743 } else {
4744 #if CONFIG_PREADOPT_TG
4745 /*
4746 * The thread may have a preadopt thread group on it already because it
4747 * got tagged with it as a creator thread. So we need to make sure to
4748 * clear that since we don't have preadoption for anonymous thread
4749 * requests
4750 */
4751 thread_set_preadopt_thread_group(get_machthread(uth), NULL);
4752 #endif
4753 }
4754
4755 workq_setup_and_run(p, uth, setup_flags);
4756 __builtin_unreachable();
4757
4758 park:
4759 thread_unfreeze_base_pri(get_machthread(uth));
4760 park_thawed:
4761 workq_park_and_unlock(p, wq, uth, setup_flags);
4762 }
4763
4764 /**
4765 * Runs a thread request on a thread
4766 *
4767 * - if thread is THREAD_NULL, will find a thread and run the request there.
4768 * Otherwise, the thread must be the current thread.
4769 *
4770 * - if req is NULL, will find the highest priority request and run that. If
4771 * it is not NULL, it must be a threadreq object in state NEW. If it can not
4772 * be run immediately, it will be enqueued and moved to state QUEUED.
4773 *
4774 * Either way, the thread request object serviced will be moved to state
4775 * BINDING and attached to the uthread.
4776 *
4777 * Should be called with the workqueue lock held. Will drop it.
4778 * Should be called with the base pri not frozen.
4779 */
4780 __attribute__((noreturn, noinline))
4781 static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p,struct workqueue * wq,struct uthread * uth,uint32_t setup_flags)4782 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4783 struct uthread *uth, uint32_t setup_flags)
4784 {
4785 if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
4786 if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4787 setup_flags |= WQ_SETUP_FIRST_USE;
4788 }
4789 uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
4790 /*
4791 * This pointer is possibly freed and only used for tracing purposes.
4792 */
4793 workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
4794 workq_unlock(wq);
4795 WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4796 VM_KERNEL_ADDRHIDE(req), 0, 0);
4797 (void)req;
4798
4799 workq_setup_and_run(p, uth, setup_flags);
4800 __builtin_unreachable();
4801 }
4802
4803 thread_freeze_base_pri(get_machthread(uth));
4804 workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
4805 }
4806
4807 static bool
workq_creator_should_yield(struct workqueue * wq,struct uthread * uth)4808 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4809 {
4810 thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4811
4812 if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4813 return false;
4814 }
4815
4816 uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4817 if (wq->wq_fulfilled == snapshot) {
4818 return false;
4819 }
4820
4821 uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4822 if (wq->wq_fulfilled - snapshot > conc) {
4823 /* we fulfilled more than NCPU requests since being dispatched */
4824 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4825 wq->wq_fulfilled, snapshot);
4826 return true;
4827 }
4828
4829 for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4830 cnt += wq->wq_thscheduled_count[i];
4831 }
4832 if (conc <= cnt) {
4833 /* We fulfilled requests and have more than NCPU scheduled threads */
4834 WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4835 wq->wq_fulfilled, snapshot);
4836 return true;
4837 }
4838
4839 return false;
4840 }
4841
4842 /**
4843 * parked idle thread wakes up
4844 */
4845 __attribute__((noreturn, noinline))
4846 static void
workq_unpark_continue(void * parameter __unused,wait_result_t wr __unused)4847 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
4848 {
4849 thread_t th = current_thread();
4850 struct uthread *uth = get_bsdthread_info(th);
4851 proc_t p = current_proc();
4852 struct workqueue *wq = proc_get_wqptr_fast(p);
4853
4854 workq_lock_spin(wq);
4855
4856 if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
4857 /*
4858 * If the number of threads we have out are able to keep up with the
4859 * demand, then we should avoid sending this creator thread to
4860 * userspace.
4861 */
4862 uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4863 uth->uu_save.uus_workq_park_data.yields++;
4864 workq_unlock(wq);
4865 thread_yield_with_continuation(workq_unpark_continue, NULL);
4866 __builtin_unreachable();
4867 }
4868
4869 if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
4870 workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
4871 __builtin_unreachable();
4872 }
4873
4874 if (__probable(wr == THREAD_AWAKENED)) {
4875 /*
4876 * We were set running, but for the purposes of dying.
4877 */
4878 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
4879 assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
4880 } else {
4881 /*
4882 * workaround for <rdar://problem/38647347>,
4883 * in case we do hit userspace, make sure calling
4884 * workq_thread_terminate() does the right thing here,
4885 * and if we never call it, that workq_exit() will too because it sees
4886 * this thread on the runlist.
4887 */
4888 assert(wr == THREAD_INTERRUPTED);
4889 wq->wq_thdying_count++;
4890 uth->uu_workq_flags |= UT_WORKQ_DYING;
4891 }
4892
4893 workq_unpark_for_death_and_unlock(p, wq, uth,
4894 WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
4895 __builtin_unreachable();
4896 }
4897
/**
 * Final setup before a workqueue thread goes (back) out to userspace.
 *
 * Optionally resets the voucher, computes the upcall flags userspace reads,
 * lazily creates the thread's pinned self port, arms/disarms the workqueue
 * quantum, installs the appropriate scheduler callback, and finally hands off
 * to pthread's workq_setup_thread, which does not return to this frame.
 */
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(proc_task(p));

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 *
		 * Note that setting the voucher to NULL will not clear the preadoption
		 * thread group on this thread
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		/* Not the thread's first trip to userspace: mark it as reused. */
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
		    WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port_pinned() consumes a reference */
		thread_reference(th);
		/* Convert to immovable/pinned thread port, but port is not pinned yet */
		ipc_port_t port = convert_thread_to_port_pinned(th);
		/* Atomically, pin and copy out the port */
		uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p)));
	}

	/* Thread has been set up to run, arm its next workqueue quantum or disarm
	 * if it is no longer supporting that */
	if (thread_supports_cooperative_workqueue(th)) {
		thread_arm_workqueue_quantum(th);
	} else {
		thread_disarm_workqueue_quantum(th);
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
	    proc_get_wqptr_fast(p), 0, 0, 0);

	/*
	 * Cooperative and permanently-bound threads do not use the workq
	 * scheduler callback; everyone else does.
	 */
	if (workq_thread_is_cooperative(uth) || workq_thread_is_permanently_bound(uth)) {
		thread_sched_call(th, NULL);
	} else {
		thread_sched_call(th, workq_sched_callback);
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
4981
4982 /**
4983 * A wrapper around workq_setup_and_run for permanently bound thread.
4984 */
4985 __attribute__((noreturn, noinline))
4986 static void
workq_bound_thread_setup_and_run(struct uthread * uth,int setup_flags)4987 workq_bound_thread_setup_and_run(struct uthread *uth, int setup_flags)
4988 {
4989 struct workq_threadreq_s * kqr = uth->uu_kqr_bound;
4990
4991 uint32_t upcall_flags = (WQ_FLAG_THREAD_NEWSPI |
4992 WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT);
4993 if (workq_tr_is_overcommit(kqr->tr_flags)) {
4994 workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
4995 upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
4996 }
4997 uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
4998 workq_setup_and_run(current_proc(), uth, setup_flags);
4999 __builtin_unreachable();
5000 }
5001
5002 /**
5003 * A parked bound thread wakes up for the first time.
5004 */
5005 __attribute__((noreturn, noinline))
5006 static void
workq_bound_thread_initialize_and_unpark_continue(void * parameter __unused,wait_result_t wr)5007 workq_bound_thread_initialize_and_unpark_continue(void *parameter __unused,
5008 wait_result_t wr)
5009 {
5010 /*
5011 * Locking model for accessing uu_workq_flags :
5012 *
5013 * The concurrent access to uu_workq_flags is synchronized with workq lock
5014 * until a thread gets permanently bound to a kqwl. Post that, kqlock
5015 * is used for subsequent synchronizations. This gives us a significant
5016 * benefit by avoiding having to take a process wide workq lock on every
5017 * wakeup of the bound thread.
5018 * This flip in locking model is tracked with UT_WORKQ_PERMANENT_BIND flag.
5019 *
5020 * There is one more optimization we can perform for when the thread is
5021 * awakened for running (i.e THREAD_AWAKENED) until it parks.
5022 * During this window, we know KQ_SLEEP bit is reset so there should not
5023 * be any concurrent attempts to modify uu_workq_flags by
5024 * kqworkloop_bound_thread_wakeup because the thread is already "awake".
5025 * So we can safely access uu_workq_flags within this window without having
5026 * to take kqlock. This KQ_SLEEP is later set by the bound thread under
5027 * kqlock on its way to parking.
5028 */
5029 struct uthread *uth = get_bsdthread_info(current_thread());
5030
5031 if (__probable(wr == THREAD_AWAKENED)) {
5032 /* At most one flag. */
5033 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING))
5034 != (UT_WORKQ_RUNNING | UT_WORKQ_DYING));
5035
5036 assert(workq_thread_is_permanently_bound(uth));
5037
5038 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
5039 assert(uth->uu_workq_flags & UT_WORKQ_NEW);
5040 uth->uu_workq_flags &= ~UT_WORKQ_NEW;
5041
5042 struct workq_threadreq_s * kqr = uth->uu_kqr_bound;
5043 if (kqr->tr_work_interval) {
5044 kern_return_t kr;
5045 kr = kern_work_interval_explicit_join(get_machthread(uth),
5046 kqr->tr_work_interval);
5047 /*
5048 * The work interval functions requires to be called on the
5049 * current thread. If we fail here, we record the fact and
5050 * continue.
5051 * In the future, we can preflight checking that this join will
5052 * always be successful when the paird kqwl is configured; but,
5053 * for now, this should be a rare case (e.g. if you have passed
5054 * invalid arguments to the join).
5055 */
5056 if (kr == KERN_SUCCESS) {
5057 uth->uu_workq_flags |= UT_WORKQ_WORK_INTERVAL_JOINED;
5058 /* Thread and kqwl both have +1 ref on the work interval. */
5059 } else {
5060 uth->uu_workq_flags |= UT_WORKQ_WORK_INTERVAL_FAILED;
5061 }
5062 }
5063 workq_thread_reset_cpupercent(kqr, uth);
5064 workq_bound_thread_setup_and_run(uth, WQ_SETUP_FIRST_USE);
5065 __builtin_unreachable();
5066 } else {
5067 /*
5068 * The permanently bound kqworkloop is getting destroyed so we
5069 * are woken up to cleanly unbind ourselves from it and terminate.
5070 * See KQ_WORKLOOP_DESTROY -> workq_kern_bound_thread_wakeup.
5071 *
5072 * The actual full unbind happens from
5073 * uthread_cleanup -> kqueue_threadreq_unbind.
5074 */
5075 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
5076 }
5077 } else {
5078 /*
5079 * The process is getting terminated so we are woken up to die.
5080 * E.g. SIGKILL'd.
5081 */
5082 assert(wr == THREAD_INTERRUPTED);
5083 /*
5084 * It is possible we started running as the process is aborted
5085 * due to termination; but, workq_kern_threadreq_permanent_bind
5086 * has not had a chance to bind us to the kqwl yet.
5087 *
5088 * We synchronize with it using workq lock.
5089 */
5090 proc_t p = current_proc();
5091 struct workqueue *wq = proc_get_wqptr_fast(p);
5092 workq_lock_spin(wq);
5093 assert(workq_thread_is_permanently_bound(uth));
5094 workq_unlock(wq);
5095
5096 /*
5097 * We do the bind commit ourselves if workq_kern_threadreq_permanent_bind
5098 * has not done it for us yet so our state is aligned with what the
5099 * termination path below expects.
5100 */
5101 kqueue_threadreq_bind_commit(p, get_machthread(uth));
5102 }
5103 workq_kern_bound_thread_terminate(uth->uu_kqr_bound);
5104 __builtin_unreachable();
5105 }
5106
5107 /**
5108 * A parked bound thread wakes up. Not the first time.
5109 */
5110 __attribute__((noreturn, noinline))
5111 static void
workq_bound_thread_unpark_continue(void * parameter __unused,wait_result_t wr)5112 workq_bound_thread_unpark_continue(void *parameter __unused, wait_result_t wr)
5113 {
5114 struct uthread *uth = get_bsdthread_info(current_thread());
5115 assert(workq_thread_is_permanently_bound(uth));
5116
5117 if (__probable(wr == THREAD_AWAKENED)) {
5118 /* At most one flag. */
5119 assert((uth->uu_workq_flags & (UT_WORKQ_RUNNING | UT_WORKQ_DYING))
5120 != (UT_WORKQ_RUNNING | UT_WORKQ_DYING));
5121 if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
5122 workq_bound_thread_setup_and_run(uth, WQ_SETUP_NONE);
5123 } else {
5124 assert(uth->uu_workq_flags & UT_WORKQ_DYING);
5125 }
5126 } else {
5127 assert(wr == THREAD_INTERRUPTED);
5128 }
5129 workq_kern_bound_thread_terminate(uth->uu_kqr_bound);
5130 __builtin_unreachable();
5131 }
5132
5133 #pragma mark misc
5134
5135 int
fill_procworkqueue(proc_t p,struct proc_workqueueinfo * pwqinfo)5136 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
5137 {
5138 struct workqueue *wq = proc_get_wqptr(p);
5139 int error = 0;
5140 int activecount;
5141
5142 if (wq == NULL) {
5143 return EINVAL;
5144 }
5145
5146 /*
5147 * This is sometimes called from interrupt context by the kperf sampler.
5148 * In that case, it's not safe to spin trying to take the lock since we
5149 * might already hold it. So, we just try-lock it and error out if it's
5150 * already held. Since this is just a debugging aid, and all our callers
5151 * are able to handle an error, that's fine.
5152 */
5153 bool locked = workq_lock_try(wq);
5154 if (!locked) {
5155 return EBUSY;
5156 }
5157
5158 wq_thactive_t act = _wq_thactive(wq);
5159 activecount = _wq_thactive_aggregate_downto_qos(wq, act,
5160 WORKQ_THREAD_QOS_MIN, NULL, NULL);
5161 if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
5162 activecount++;
5163 }
5164 pwqinfo->pwq_nthreads = wq->wq_nthreads;
5165 pwqinfo->pwq_runthreads = activecount;
5166 pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
5167 pwqinfo->pwq_state = 0;
5168
5169 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
5170 pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
5171 }
5172
5173 if (wq->wq_nthreads >= wq_max_threads) {
5174 pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
5175 }
5176
5177 uint64_t total_cooperative_threads;
5178 total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
5179 if ((total_cooperative_threads == wq_cooperative_queue_max_size(wq)) &&
5180 workq_has_cooperative_thread_requests(wq)) {
5181 pwqinfo->pwq_state |= WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT;
5182 }
5183
5184 if (wq->wq_exceeded_active_constrained_thread_limit) {
5185 pwqinfo->pwq_state |= WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT;
5186 }
5187
5188 workq_unlock(wq);
5189 return error;
5190 }
5191
5192 boolean_t
workqueue_get_pwq_exceeded(void * v,boolean_t * exceeded_total,boolean_t * exceeded_constrained)5193 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
5194 boolean_t *exceeded_constrained)
5195 {
5196 proc_t p = v;
5197 struct proc_workqueueinfo pwqinfo;
5198 int err;
5199
5200 assert(p != NULL);
5201 assert(exceeded_total != NULL);
5202 assert(exceeded_constrained != NULL);
5203
5204 err = fill_procworkqueue(p, &pwqinfo);
5205 if (err) {
5206 return FALSE;
5207 }
5208 if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
5209 return FALSE;
5210 }
5211
5212 *exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
5213 *exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
5214
5215 return TRUE;
5216 }
5217
5218 uint64_t
workqueue_get_task_ss_flags_from_pwq_state_kdp(void * v)5219 workqueue_get_task_ss_flags_from_pwq_state_kdp(void * v)
5220 {
5221 static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
5222 kTaskWqExceededConstrainedThreadLimit);
5223 static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
5224 kTaskWqExceededTotalThreadLimit);
5225 static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
5226 static_assert(((uint64_t)WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT << 34) ==
5227 (uint64_t)kTaskWqExceededCooperativeThreadLimit);
5228 static_assert(((uint64_t)WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT << 34) ==
5229 (uint64_t)kTaskWqExceededActiveConstrainedThreadLimit);
5230 static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
5231 WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT |
5232 WQ_EXCEEDED_COOPERATIVE_THREAD_LIMIT |
5233 WQ_EXCEEDED_ACTIVE_CONSTRAINED_THREAD_LIMIT) == 0x1F);
5234
5235 if (v == NULL) {
5236 return 0;
5237 }
5238
5239 proc_t p = v;
5240 struct workqueue *wq = proc_get_wqptr(p);
5241
5242 if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
5243 return 0;
5244 }
5245
5246 uint64_t ss_flags = kTaskWqFlagsAvailable;
5247
5248 if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
5249 ss_flags |= kTaskWqExceededConstrainedThreadLimit;
5250 }
5251
5252 if (wq->wq_nthreads >= wq_max_threads) {
5253 ss_flags |= kTaskWqExceededTotalThreadLimit;
5254 }
5255
5256 uint64_t total_cooperative_threads;
5257 total_cooperative_threads = workq_num_cooperative_threads_scheduled_to_qos_internal(wq,
5258 WORKQ_THREAD_QOS_MIN);
5259 if ((total_cooperative_threads == wq_cooperative_queue_max_size(wq)) &&
5260 workq_has_cooperative_thread_requests(wq)) {
5261 ss_flags |= kTaskWqExceededCooperativeThreadLimit;
5262 }
5263
5264 if (wq->wq_exceeded_active_constrained_thread_limit) {
5265 ss_flags |= kTaskWqExceededActiveConstrainedThreadLimit;
5266 }
5267
5268 return ss_flags;
5269 }
5270
5271 void
workq_init(void)5272 workq_init(void)
5273 {
5274 clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
5275 NSEC_PER_USEC, &wq_stalled_window.abstime);
5276 clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
5277 NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
5278 clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
5279 NSEC_PER_USEC, &wq_max_timer_interval.abstime);
5280
5281 thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
5282 workq_deallocate_queue_invoke);
5283 }
5284