/* xref: xnu-10063.121.3/bsd/pthread/pthread_workqueue.c (revision 2c2f96dc2b9a4408a43d3150ae9c105355ca3daa) */
/*
 * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */
/* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */

#include <sys/cdefs.h>

#include <kern/assert.h>
#include <kern/ast.h>
#include <kern/clock.h>
#include <kern/cpu_data.h>
#include <kern/kern_types.h>
#include <kern/policy_internal.h>
#include <kern/processor.h>
#include <kern/sched_prim.h>    /* for thread_exception_return */
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/thread_group.h>
#include <kern/zalloc.h>
#include <mach/kern_return.h>
#include <mach/mach_param.h>
#include <mach/mach_port.h>
#include <mach/mach_types.h>
#include <mach/mach_vm.h>
#include <mach/sync_policy.h>
#include <mach/task.h>
#include <mach/thread_act.h> /* for thread_resume */
#include <mach/thread_policy.h>
#include <mach/thread_status.h>
#include <mach/vm_prot.h>
#include <mach/vm_statistics.h>
#include <machine/atomic.h>
#include <machine/machine_routines.h>
#include <machine/smp.h>
#include <vm/vm_map.h>
#include <vm/vm_protos.h>

#include <sys/eventvar.h>
#include <sys/kdebug.h>
#include <sys/kernel.h>
#include <sys/lock.h>
#include <sys/param.h>
#include <sys/proc_info.h>      /* for fill_procworkqueue */
#include <sys/proc_internal.h>
#include <sys/pthread_shims.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/systm.h>
#include <sys/ulock.h> /* for ulock_owner_value_to_port_name */

#include <pthread/bsdthread_private.h>
#include <pthread/workqueue_syscalls.h>
#include <pthread/workqueue_internal.h>
#include <pthread/workqueue_trace.h>

#include <os/log.h>

static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
static void workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags);

static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req);

static uint32_t workq_constrained_allowance(struct workqueue *wq,
    thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);

static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);

static bool workq_thread_is_busy(uint64_t cur_ts,
    _Atomic uint64_t *lastblocked_tsp);

static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);

static inline void
workq_lock_spin(struct workqueue *wq);

static inline void
workq_unlock(struct workqueue *wq);

#pragma mark globals

struct workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};

#define WORKQ_SYSCTL_USECS(var, init) \
	        static struct workq_usec_var var = { .usecs = init }; \
	        SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	                        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
	                        workq_sysctl_handle_usecs, "I", "")
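
/*
 * For illustration (derived from the macro above): WORKQ_SYSCTL_USECS(
 * wq_stalled_window, WQ_STALLED_WINDOW_USECS) defines a static
 * `struct workq_usec_var wq_stalled_window` and a read-write
 * `kern.wq_stalled_window_usecs` sysctl. Writes go through
 * workq_sysctl_handle_usecs below, which re-derives the cached
 * mach-absolute-time interval (.abstime) from the new microsecond value.
 */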

static LCK_GRP_DECLARE(workq_lck_grp, "workq");
os_refgrp_decl(static, workq_refgrp, "workq", NULL);

static ZONE_DEFINE(workq_zone_workqueue, "workq.wq",
    sizeof(struct workqueue), ZC_NONE);
static ZONE_DEFINE(workq_zone_threadreq, "workq.threadreq",
    sizeof(struct workq_threadreq_s), ZC_CACHING);

static struct mpsc_daemon_queue workq_deallocate_queue;

WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
static uint32_t wq_max_threads              = WORKQUEUE_MAXTHREADS;
static uint32_t wq_max_constrained_threads  = WORKQUEUE_MAXTHREADS / 8;
static uint32_t wq_init_constrained_limit   = 1;
static uint16_t wq_death_max_load;
static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];

/*
 * This is not a hard limit but the max size we want to aim to hit across the
 * entire cooperative pool. We can oversubscribe the pool due to non-cooperative
 * workers, and the most we will oversubscribe the pool by is a total of
 * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
 */
static uint32_t wq_max_cooperative_threads;

static inline uint32_t
wq_cooperative_queue_max_size(struct workqueue *wq)
{
	return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
}
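
/*
 * When wq_cooperative_queue_has_limited_max_size is set (via the
 * kern.wq_limit_cooperative_threads sysctl below, presumably for testing),
 * the pool target shrinks to a single thread per QoS bucket, i.e. the
 * "strict per-QoS" mode, instead of wq_max_cooperative_threads.
 */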

#pragma mark sysctls

static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}

SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_threads, 0, "");

SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
    &wq_max_constrained_threads, 0, "");

static int
wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
{
#pragma unused(arg1, arg2, oidp)
	int input_pool_size = 0;
	int changed;
	int error = 0;

	error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
	if (error || !changed) {
		return error;
	}

#define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
#define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
/* Not available currently, but the sysctl interface is designed to allow these
 * extra parameters:
 *		WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all buckets)
 *		WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
 */

	if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
	    && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
		error = EINVAL;
		goto out;
	}

	proc_t p = req->p;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq != NULL) {
		workq_lock_spin(wq);
		if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
			// Hackily enforce that the workqueue is still new (no requests or
			// threads)
			error = ENOTSUP;
		} else {
			wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
		}
		workq_unlock(wq);
	} else {
		/* This process has no workqueue; calling this sysctl makes no sense */
		return ENOTSUP;
	}

out:
	return error;
}

SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
    CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
    wq_limit_cooperative_threads_for_proc,
    "I", "Modify the max pool size of the cooperative pool");

#pragma mark p_wqptr

#define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)

static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}

struct workqueue *
proc_get_wqptr(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
}

static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}

static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
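
/*
 * Illustrative sketch of the init-or-wait protocol above (not the actual
 * caller, which lives in the workqueue setup path):
 *
 *	while (proc_get_wqptr(p) == NULL) {
 *		if (proc_init_wqptr_or_wait(p)) {
 *			// we won the race: allocate and initialize the
 *			// workqueue, then publish it with proc_set_wqptr(),
 *			// which wakes anyone parked on &p->p_wqptr
 *			break;
 *		}
 *		// somebody else was initializing; we blocked until they
 *		// published, so loop and re-check
 *	}
 */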

static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}

static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
}

#pragma mark wq_thactive

#if defined(__LP64__)
// Layout is:
//   127 - 115 : 13 bits of zeroes
//   114 - 112 : best QoS among all pending constrained requests
//   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
#define WQ_THACTIVE_BUCKET_WIDTH 16
#define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
#else
// Layout is:
//   63 - 61 : best QoS among all pending constrained requests
//   60      : Manager bucket (0 or 1)
//   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
#define WQ_THACTIVE_BUCKET_WIDTH 10
#define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
#endif
#define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
#define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))

static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
    "Make sure we have space to encode a QoS");
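
/*
 * Worked example for the LP64 layout above: each QoS bucket owns a 16-bit
 * slot, so accounting one more active thread in bucket 1 (the UT bucket,
 * per _wq_bucket below) is an atomic add of
 * (wq_thactive_t)1 << (1 * WQ_THACTIVE_BUCKET_WIDTH), i.e. 1 << 16.
 * Adds never carry into the neighboring bucket as long as each per-bucket
 * count stays below WQ_THACTIVE_BUCKET_MASK (65535), which wq_max_threads
 * keeps us far away from.
 */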

static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}

static inline uint8_t
_wq_bucket(thread_qos_t qos)
{
	// Map both BG and MT to the same bucket by over-shifting down and
	// clamping MT and BG together.
	switch (qos) {
	case THREAD_QOS_MAINTENANCE:
		return 0;
	default:
		return qos - 2;
	}
}
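
/*
 * Resulting mapping (assuming the usual thread_qos_t numbering where
 * MAINTENANCE is 1 and BACKGROUND is 2): MT(1)->0, BG(2)->0, UT(3)->1,
 * DF(4)->2, IN(5)->3, UI(6)->4, matching the bucket order of the
 * wq_thactive layout comment above.
 */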

#define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
	        ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))

static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}

static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
#endif
	}
}

static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	uint8_t bucket = _wq_bucket(qos);
	__builtin_assume(bucket < WORKQ_NUM_BUCKETS);
	return (wq_thactive_t)1 << (bucket * WQ_THACTIVE_BUCKET_WIDTH);
}

static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}

static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}

static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}

static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	uint8_t i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
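
/*
 * Example: called with qos == THREAD_QOS_UTILITY, the loop above starts at
 * bucket _wq_bucket(UT) == 1 and sums the active counts of every bucket at
 * or above utility. A bucket additionally bumps *busycount when more
 * threads are scheduled there than are on core and its last-blocked
 * timestamp is recent (the wq_stalled_window heuristic implemented by
 * workq_thread_is_busy).
 */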

/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
	assert(old_scheduled_count > 0);
}

/* The input qos here should be the requested QoS of the thread, not accounting
 * for any overrides */
static inline void
_wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
	assert(old_scheduled_count < UINT8_MAX);
}

#pragma mark wq_flags

static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}

static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}

bool
workq_is_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	return !wq || _wq_exiting(wq);
}


#pragma mark workqueue lock

static bool
workq_lock_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_ticket_is_acquired(&wq->wq_lock);
}

static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_lock_held(struct workqueue *wq)
{
	LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
}

static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
}

static inline void
workq_unlock(struct workqueue *wq)
{
	lck_ticket_unlock(&wq->wq_lock);
}

#pragma mark idle thread lists

#define WORKQ_POLICY_INIT(qos) \
	        (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }

static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}

static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}

static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately from policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}

static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
		return true;
	}

#if CONFIG_PREADOPT_TG
	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
		/*
		 * Ideally, we'd add a check here to see if the thread's preadopt TG is
		 * the same as the thread request's thread group and short circuit if
		 * that is the case. But in the interest of keeping the code clean and
		 * not taking the thread lock here, we're going to skip this. We will
		 * eventually short circuit once we try to set the preadoption thread
		 * group on the thread.
		 */
		return true;
	}
#endif

	return false;
}

/* Input thread must be self. Called during self override, resetting overrides
 * or while processing kevents.
 *
 * Called with workq lock held. Sometimes also with the thread mutex held.
 */
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	assert(uth == current_uthread());

	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (old_pri.qos_override != new_pri.qos_override) {
		thread_set_workq_override(get_machthread(uth), new_pri.qos_override);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}

/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread. Removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}

/* Called with the workq lock held */
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (_pthread_priority_has_sched_pri(mgr_pri)) {
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

#if CONFIG_PREADOPT_TG
	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
		/*
		 * For kqwl permanently configured with a thread group, we can safely borrow
		 * +1 ref from kqwl_preadopt_tg. A thread then takes additional +1 ref
		 * for itself via thread_set_preadopt_thread_group.
		 *
		 * In all other cases, we cannot safely read and borrow the reference from the kqwl
		 * since it can disappear from under us at any time due to the max-ing logic in
		 * kqueue_set_preadopted_thread_group.
		 *
		 * As such, we do the following dance:
		 *
		 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave
		 * behind with (NULL + QoS). At this point, we have the reference
		 * to the thread group from the kqwl.
		 * 2) Have the thread set the preadoption thread group on itself.
		 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to
		 * thread_group + QoS. ie we try to give the reference back to the kqwl.
		 * If we fail, that's because a higher QoS thread group was set on the
		 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to
		 * go back to (1).
		 */

		_Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req);

		thread_group_qos_t old_tg, new_tg;
		int ret = 0;
again:
		ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
			if ((!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) ||
			KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) {
			        os_atomic_rmw_loop_give_up(break);
			}

			/*
			 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
			 * only modify it if there is a higher QoS thread group to attach
			 */
			new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK);
		});

		if (ret) {
			/*
			 * We successfully took the ref from the kqwl so set it on the
			 * thread now
			 */
			thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));

			thread_group_qos_t thread_group_to_expect = new_tg;
			thread_group_qos_t thread_group_to_set = old_tg;

			os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
				if (old_tg != thread_group_to_expect) {
				        /*
				         * There was an intervening write to the kqwl_preadopt_tg,
				         * and it has a higher QoS than what we are working with
				         * here. Abandon our current adopted thread group and redo
				         * the full dance
				         */
				        thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
				        os_atomic_rmw_loop_give_up(goto again);
				}

				new_tg = thread_group_to_set;
			});
		} else {
			if (KQWL_HAS_PERMANENT_PREADOPTED_TG(old_tg)) {
				thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));
			} else {
				/* Nothing valid on the kqwl, just clear what's on the thread */
				thread_set_preadopt_thread_group(th, NULL);
			}
		}
	} else {
		/* Not even a kqwl, clear what's on the thread */
		thread_set_preadopt_thread_group(th, NULL);
	}
#endif
	thread_set_workq_pri(th, qos, priority, policy);
}

/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}

#pragma mark idle threads accounting and handling

static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}

static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have fewer than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly
	 * from 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}
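
/*
 * Worked example of the decay above: at idle == wq_death_max_load the delay
 * is the full wq_reduce_pool_window (5s by default). Once idle exceeds the
 * watermark by half of wq_max_constrained_threads, the delay is halved;
 * beyond wq_death_max_load + wq_max_constrained_threads the multiply is
 * skipped entirely and the delay bottoms out at
 * wq_reduce_pool_window / wq_max_constrained_threads.
 */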

static inline bool
workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
    uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}

static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}

/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, wq->wq_thidlecount, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
			workq_thread_wakeup(uth);
		}
		return;
	}

	workq_death_call_schedule(wq,
	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
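
/*
 * Note on the flow above: wq_thdying_count gates the policy so that this
 * path elects a new victim only once no thread is still in the process of
 * dying. The victim is marked UT_WORKQ_DYING and woken (unless it is
 * already awake doing idle cleanup); if the oldest killable thread hasn't
 * idled long enough yet, the reaping is deferred to the death call instead.
 */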

void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
		    wq, wq->wq_thidlecount, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening; redrive if there are pending requests.
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	thread_deallocate(get_machthread(uth));
}

static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
	workq_unlock(wq);
}

static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;

	/* A thread is never woken up as part of the cooperative pool */
	assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);

	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}

/*
 * Called by thread_create_workq_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}

/**
 * Try to add a new workqueue thread.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(proc_task(p));

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(proc_task(p), workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0);
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_creations++;
	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again;
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}

static inline bool
workq_thread_is_overcommit(struct uthread *uth)
{
	return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
}

static inline bool
workq_thread_is_nonovercommit(struct uthread *uth)
{
	return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0;
}

static inline bool
workq_thread_is_cooperative(struct uthread *uth)
{
	return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
}

static inline void
workq_thread_set_type(struct uthread *uth, uint16_t flags)
{
	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	uth->uu_workq_flags |= flags;
}


#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1

__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(proc_task(p));

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}

bool
workq_is_current_thread_updating_turnstile(struct workqueue *wq)
{
	return wq->wq_turnstile_updater == current_thread();
}

__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}

static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		TURNSTILE_INTERLOCK_HELD);
	});
}

static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_req;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap places with the oldest one we would have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly, and it's better to kill
		 * the one that's currently on core than to context switch.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}

#pragma mark thread requests

static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
}

static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
{
	return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0;
}

static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
{
	return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
}

#define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
#define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
#define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)

static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	return thread_workq_pri_for_qos(qos);
}

static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
{
	assert(!workq_tr_is_cooperative(req->tr_flags));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		return &wq->wq_special_queue;
	} else if (workq_tr_is_overcommit(req->tr_flags)) {
		return &wq->wq_overcommit_queue;
	} else {
		return &wq->wq_constrained_queue;
	}
}


/* Calculates the number of threads scheduled >= the input QoS */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos)
{
	workq_lock_held(wq);

	uint64_t num_cooperative_threads = 0;

	for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
		uint8_t bucket = _wq_bucket(cur_qos);
		num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
	}

	return num_cooperative_threads;
}

static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
{
	return workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN);
}

#if DEBUG || DEVELOPMENT
static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			return true;
		}
	}

	return false;
}
#endif
/*
 * Determines the next QoS bucket we should service in the cooperative
 * pool. This function will always return a QoS for the cooperative pool as
 * long as there are requests to be serviced.
 *
 * Unlike the other thread pools, for the cooperative thread pool the schedule
 * counts for the various buckets in the pool affect the next best request for
 * it.
 *
 * This function is called in the following contexts:
 *
 * a) When determining the best thread QoS for cooperative bucket for the
 * creator/thread reuse
 *
 * b) Once (a) has happened and thread has bound to a thread request, figuring
 * out whether the next best request for this pool has changed so that creator
 * can be scheduled.
 *
 * Returns true if the cooperative queue's best qos changed from its previous
 * value.
 */
static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
{
	workq_lock_held(wq);

	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;

	/* We determine the next best cooperative thread request based on the
	 * following:
	 *
	 * 1. Take the MAX of the following:
	 *		a) Highest qos with pending TRs such that number of scheduled
	 *		threads so far with >= qos is < wq_max_cooperative_threads
	 *		b) Highest qos bucket with pending TRs but no scheduled threads for that bucket
	 *
	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
	 * pending thread requests in the pool.
	 *
	 */
	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;

	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;

	int scheduled_count_till_qos = 0;

	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		uint8_t bucket = _wq_bucket(qos);
		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
		scheduled_count_till_qos += scheduled_count_for_bucket;

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			if (qos > highest_qos_req) {
				highest_qos_req = qos;
			}
			/*
			 * The pool isn't saturated for threads at and above this QoS, and
			 * this qos bucket has pending requests
			 */
			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
				if (qos > highest_qos_req_with_width) {
					highest_qos_req_with_width = qos;
				}
			}

			/*
			 * There are no threads scheduled for this bucket but there
			 * is work pending, give it at least 1 thread
			 */
			if (scheduled_count_for_bucket == 0) {
				if (qos > highest_qos_with_no_scheduled) {
					highest_qos_with_no_scheduled = qos;
				}
			}
		}
	}

	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
	}

#if DEBUG || DEVELOPMENT
	/* Assert that if we are showing up the next best req as UN, then there
	 * actually is no thread request in the cooperative pool buckets */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		assert(!workq_has_cooperative_thread_requests(wq));
	}
#endif

	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
}
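
/*
 * Worked example for the refresh above, with a pool target of 4: say UT and
 * BG have pending requests and the scheduled counts are UI=3, UT=1, BG=0.
 * Walking down, UT fails rule (1a) (4 threads are already scheduled at
 * >= UT) and already has a servicer, but BG has pending work with no
 * scheduled thread, so rule (1b) selects BG. Only if every pending bucket
 * already had a servicer and no bucket passed the width check would rule
 * (2) fall back to the highest pending QoS outright.
 */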

/*
 * Returns whether or not the input thread (or creator thread if uth is NULL)
 * should be allowed to work as part of the cooperative pool for the <input qos>
 * bucket.
 *
 * This function is called in a bunch of places:
 *		a) Quantum expires for a thread and it is part of the cooperative pool
 *		b) When trying to pick a thread request for the creator thread to
 *		represent.
 *		c) When a thread is trying to pick a thread request to actually bind to
 *		and service.
 *
 * Called with workq lock held.
 */

#define WQ_COOPERATIVE_POOL_UNSATURATED 1
#define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
#define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3

static bool
workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
    bool may_start_timer)
{
	workq_lock_held(wq);

	bool exclude_thread_as_scheduled = false;
	bool passed_admissions = false;
	uint8_t bucket = _wq_bucket(qos);

	if (uth && workq_thread_is_cooperative(uth)) {
		exclude_thread_as_scheduled = true;
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
	}

	/*
	 * We have not saturated the pool yet, let this thread continue
	 */
	uint64_t total_cooperative_threads;
	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_POOL_UNSATURATED);
		goto out;
	}

	/*
	 * Without this thread, nothing is servicing the bucket which has pending
	 * work
	 */
	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
	if (bucket_scheduled == 0 &&
	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
		goto out;
	}

1560 	/*
1561 	 * If the number of threads at QoS buckets >= the input QoS exceeds the max
1562 	 * we want for the pool, deny this thread
1563 	 */
1564 	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos);
1565 	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
1566 	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
1567 	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);
1568 
1569 	if (!passed_admissions && may_start_timer) {
1570 		workq_schedule_delayed_thread_creation(wq, 0);
1571 	}
1572 
1573 out:
1574 	if (exclude_thread_as_scheduled) {
1575 		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
1576 	}
1577 	return passed_admissions;
1578 }
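
/*
 * Illustrative summary of the three admission outcomes above, assuming a
 * hypothetical pool max of 4:
 *
 *   1. WQ_COOPERATIVE_POOL_UNSATURATED: 3 cooperative threads scheduled in
 *      total; a thread at any QoS is admitted (3 < 4).
 *   2. WQ_COOPERATIVE_BUCKET_UNSERVICED: 4 threads scheduled, none at BG,
 *      and BG has queued requests; a BG thread is admitted anyway.
 *   3. WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS: 4 threads scheduled at
 *      QoS >= UT; a new UT thread is denied (4 >= 4), and with
 *      may_start_timer set, the delayed thread call is armed so the
 *      decision is revisited later.
 *
 * Note the exclude_thread_as_scheduled dance: a thread already in the
 * cooperative pool is taken out of the scheduled counts for the duration
 * of the check so it does not count against itself, then added back on
 * exit.
 */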
1579 
1580 /*
1581  * returns true if the best request for the pool changed as a result of
1582  * enqueuing this thread request.
1583  */
1584 static bool
1585 workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
1586 {
1587 	assert(req->tr_state == WORKQ_TR_STATE_NEW);
1588 
1589 	req->tr_state = WORKQ_TR_STATE_QUEUED;
1590 	wq->wq_reqcount += req->tr_count;
1591 
1592 	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1593 		assert(wq->wq_event_manager_threadreq == NULL);
1594 		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
1595 		assert(req->tr_count == 1);
1596 		wq->wq_event_manager_threadreq = req;
1597 		return true;
1598 	}
1599 
1600 	if (workq_threadreq_is_cooperative(req)) {
1601 		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1602 		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1603 
1604 		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1605 		STAILQ_INSERT_TAIL(bucket, req, tr_link);
1606 
1607 		return _wq_cooperative_queue_refresh_best_req_qos(wq);
1608 	}
1609 
1610 	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);
1611 
1612 	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
1613 	    workq_priority_for_req(req), false);
1614 
1615 	if (priority_queue_insert(q, &req->tr_entry)) {
1616 		if (workq_threadreq_is_nonovercommit(req)) {
1617 			_wq_thactive_refresh_best_constrained_req_qos(wq);
1618 		}
1619 		return true;
1620 	}
1621 	return false;
1622 }
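
/*
 * Caller sketch (illustrative; this mirrors what workq_reqthreads() below
 * already does rather than introducing new code): requests are allocated,
 * initialized to WORKQ_TR_STATE_NEW, and enqueued under the workq lock,
 * with the return value deciding whether the creator must be redriven:
 *
 *	workq_lock_spin(wq);
 *	if (workq_threadreq_enqueue(wq, req)) {
 *		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
 *	}
 *	workq_unlock(wq);
 */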
1623 
1624 /*
1625  * Returns true if one of the following is true (so as to update the creator
1626  * if needed):
1627  *
1628  * (a) the next highest request of the pool we dequeued the request from changed
1629  * (b) the next highest request of the pool the current thread used to be a
1630  * part of changed
1631  *
1632  * For the overcommit, special and constrained pools, the next highest QoS for
1633  * each pool is just a MAX over pending requests, so tracking (a) is sufficient.
1634  *
1635  * But for the cooperative thread pool, the next highest QoS for the pool also
1636  * depends on the schedule counts in the pool. So if the current thread was
1637  * cooperative in its previous logical run, i.e. (b), then that can also affect
1638  * the cooperative pool's next best QoS request.
1639  */
1640 static bool
1641 workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
1642     bool cooperative_sched_count_changed)
1643 {
1644 	wq->wq_reqcount--;
1645 
1646 	bool next_highest_request_changed = false;
1647 
1648 	if (--req->tr_count == 0) {
1649 		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
1650 			assert(wq->wq_event_manager_threadreq == req);
1651 			assert(req->tr_count == 0);
1652 			wq->wq_event_manager_threadreq = NULL;
1653 
1654 			/* If a cooperative thread was the one which picked up the manager
1655 			 * thread request, we need to reevaluate the cooperative pool
1656 			 * anyway.
1657 			 */
1658 			if (cooperative_sched_count_changed) {
1659 				_wq_cooperative_queue_refresh_best_req_qos(wq);
1660 			}
1661 			return true;
1662 		}
1663 
1664 		if (workq_threadreq_is_cooperative(req)) {
1665 			assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
1666 			assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
1667 			/* Account for the fact that BG and MT are coalesced when
1668 			 * calculating best request for cooperative pool
1669 			 */
1670 			assert(_wq_bucket(req->tr_qos) == _wq_bucket(wq->wq_cooperative_queue_best_req_qos));
1671 
1672 			struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
1673 			__assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);
1674 
1675 			assert(head == req);
1676 			STAILQ_REMOVE_HEAD(bucket, tr_link);
1677 
1678 			/*
1679 			 * If the request we're dequeueing is cooperative, then the sched
1680 			 * counts definitely changed.
1681 			 */
1682 			assert(cooperative_sched_count_changed);
1683 		}
1684 
1685 		/*
1686 		 * We want to do the cooperative pool refresh after dequeueing a
1687 		 * cooperative thread request if any (to combine both effects into 1
1688 		 * refresh operation)
1689 		 */
1690 		if (cooperative_sched_count_changed) {
1691 			next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
1692 		}
1693 
1694 		if (!workq_threadreq_is_cooperative(req)) {
1695 			/*
1696 			 * All other types of requests are enqueued in priority queues
1697 			 */
1698 
1699 			if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
1700 			    &req->tr_entry)) {
1701 				next_highest_request_changed |= true;
1702 				if (workq_threadreq_is_nonovercommit(req)) {
1703 					_wq_thactive_refresh_best_constrained_req_qos(wq);
1704 				}
1705 			}
1706 		}
1707 	}
1708 
1709 	return next_highest_request_changed;
1710 }
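
/*
 * Worked example for case (b) above (illustrative): a thread whose previous
 * logical run was cooperative at UT picks up the manager thread request.
 * The manager request lives outside the cooperative queues, but the UT
 * scheduled count just dropped, so a queued UT cooperative request may have
 * regained width; passing cooperative_sched_count_changed forces the
 * refresh that notices this.
 */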
1711 
1712 static void
1713 workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
1714 {
1715 	req->tr_state = WORKQ_TR_STATE_CANCELED;
1716 	if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
1717 		kqueue_threadreq_cancel(p, req);
1718 	} else {
1719 		zfree(workq_zone_threadreq, req);
1720 	}
1721 }
1722 
1723 #pragma mark workqueue thread creation thread calls
1724 
1725 static inline bool
1726 workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
1727     uint32_t fail_mask)
1728 {
1729 	uint32_t old_flags, new_flags;
1730 
1731 	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
1732 		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
1733 		        os_atomic_rmw_loop_give_up(return false);
1734 		}
1735 		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
1736 		        new_flags = old_flags | pend;
1737 		} else {
1738 		        new_flags = old_flags | sched;
1739 		}
1740 	});
1741 
1742 	return (old_flags & WQ_PROC_SUSPENDED) == 0;
1743 }
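
/*
 * State transitions performed by this prepost (illustrative), e.g. for the
 * delayed call with sched = WQ_DELAYED_CALL_SCHEDULED and
 * pend = WQ_DELAYED_CALL_PENDED:
 *
 *   - exiting, already scheduled/pended, or fail_mask hit: give up and
 *     return false;
 *   - WQ_PROC_SUSPENDED set: record the pend bit and return false so the
 *     thread call is not armed; workq_proc_resumed() later converts the
 *     pend back into a real schedule;
 *   - otherwise: set the sched bit and return true so the caller arms the
 *     thread call.
 */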
1744 
1745 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1746 
1747 static bool
1748 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
1749 {
1750 	assert(!preemption_enabled());
1751 
1752 	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
1753 	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
1754 	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
1755 		return false;
1756 	}
1757 
1758 	uint64_t now = mach_absolute_time();
1759 
1760 	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
1761 		/* do not change the window */
1762 	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
1763 		wq->wq_timer_interval *= 2;
1764 		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
1765 			wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
1766 		}
1767 	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
1768 		wq->wq_timer_interval /= 2;
1769 		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
1770 			wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
1771 		}
1772 	}
1773 
1774 	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1775 	    _wq_flags(wq), wq->wq_timer_interval);
1776 
1777 	thread_call_t call = wq->wq_delayed_call;
1778 	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
1779 	uint64_t deadline = now + wq->wq_timer_interval;
1780 	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
1781 		panic("delayed_call was already enqueued");
1782 	}
1783 	return true;
1784 }
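
/*
 * Backoff example (illustrative, hypothetical numbers): with a stalled
 * window of 200us and a max interval of 50ms, a workqueue whose delayed
 * call keeps firing within its current interval doubles it (200us, 400us,
 * 800us, ... capped at the max), while a call that has been quiet for more
 * than twice the interval halves it, floored back at the stalled window.
 * The RESTART flag (used on process resume) deliberately leaves the window
 * untouched.
 */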
1785 
1786 static void
1787 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1788 {
1789 	assert(!preemption_enabled());
1790 
1791 	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1792 	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
1793 		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1794 		    _wq_flags(wq), 0);
1795 
1796 		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1797 		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1798 			panic("immediate_call was already enqueued");
1799 		}
1800 	}
1801 }
1802 
1803 void
1804 workq_proc_suspended(struct proc *p)
1805 {
1806 	struct workqueue *wq = proc_get_wqptr(p);
1807 
1808 	if (wq) {
1809 		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1810 	}
1811 }
1812 
1813 void
1814 workq_proc_resumed(struct proc *p)
1815 {
1816 	struct workqueue *wq = proc_get_wqptr(p);
1817 	uint32_t wq_flags;
1818 
1819 	if (!wq) {
1820 		return;
1821 	}
1822 
1823 	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
1824 	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
1825 	if ((wq_flags & WQ_EXITING) == 0) {
1826 		disable_preemption();
1827 		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1828 			workq_schedule_immediate_thread_creation(wq);
1829 		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1830 			workq_schedule_delayed_thread_creation(wq,
1831 			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1832 		}
1833 		enable_preemption();
1834 	}
1835 }
1836 
1837 /**
1838  * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1839  */
1840 static bool
1841 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1842 {
1843 	uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1844 	if (now <= lastblocked_ts) {
1845 		/*
1846 		 * Because the update of the timestamp when a thread blocks
1847 		 * isn't serialized against us looking at it (i.e. we don't hold
1848 		 * the workq lock), it's possible to have a timestamp that matches
1849 		 * the current time or that even looks to be in the future relative
1850 		 * to when we grabbed the current time...
1851 		 *
1852 		 * Just treat this as a busy thread since it must have just blocked.
1853 		 */
1854 		return true;
1855 	}
1856 	return (now - lastblocked_ts) < wq_stalled_window.abstime;
1857 }
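
/*
 * Example (illustrative, hypothetical numbers): with a stalled window of
 * 200us, a thread that blocked 50us ago reads as busy, while one that
 * blocked 1ms ago reads as stalled and admission checks may discount it.
 * The now <= lastblocked_ts branch conservatively treats racy or "future"
 * timestamps as just-blocked.
 */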
1858 
1859 static void
1860 workq_add_new_threads_call(void *_p, void *flags)
1861 {
1862 	proc_t p = _p;
1863 	struct workqueue *wq = proc_get_wqptr(p);
1864 	uint32_t my_flag = (uint32_t)(uintptr_t)flags;
1865 
1866 	/*
1867 	 * workq_exit() will set the workqueue to NULL before
1868 	 * it cancels thread calls.
1869 	 */
1870 	if (!wq) {
1871 		return;
1872 	}
1873 
1874 	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
1875 	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));
1876 
1877 	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
1878 	    wq->wq_nthreads, wq->wq_thidlecount);
1879 
1880 	workq_lock_spin(wq);
1881 
1882 	wq->wq_thread_call_last_run = mach_absolute_time();
1883 	os_atomic_andnot(&wq->wq_flags, my_flag, release);
1884 
1885 	/* This can drop the workqueue lock, and take it again */
1886 	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
1887 
1888 	workq_unlock(wq);
1889 
1890 	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
1891 	    wq->wq_nthreads, wq->wq_thidlecount);
1892 }
1893 
1894 #pragma mark thread state tracking
1895 
1896 static void
1897 workq_sched_callback(int type, thread_t thread)
1898 {
1899 	thread_ro_t tro = get_thread_ro(thread);
1900 	struct uthread *uth = get_bsdthread_info(thread);
1901 	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
1902 	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
1903 	wq_thactive_t old_thactive;
1904 	bool start_timer = false;
1905 
1906 	if (qos == WORKQ_THREAD_QOS_MANAGER) {
1907 		return;
1908 	}
1909 
1910 	switch (type) {
1911 	case SCHED_CALL_BLOCK:
1912 		old_thactive = _wq_thactive_dec(wq, qos);
1913 		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1914 
1915 		/*
1916 		 * Remember the timestamp of the last thread that blocked in this
1917 		 * bucket, it is used by admission checks to ignore one thread
1918 		 * being inactive if this timestamp is recent enough.
1919 		 *
1920 		 * If we collide with another thread trying to update the
1921 		 * last_blocked (really unlikely since another thread would have to
1922 		 * get scheduled and then block after we start down this path), it's
1923 		 * not a problem.  Either timestamp is adequate, so no need to retry
1924 		 */
1925 		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
1926 		    thread_last_run_time(thread), relaxed);
1927 
1928 		if (req_qos == THREAD_QOS_UNSPECIFIED) {
1929 			/*
1930 			 * No pending request at the moment we could unblock, move on.
1931 			 */
1932 		} else if (qos < req_qos) {
1933 			/*
1934 			 * The blocking thread is at a lower QoS than the highest currently
1935 			 * pending constrained request, nothing has to be redriven
1936 			 */
1937 		} else {
1938 			uint32_t max_busycount, old_req_count;
1939 			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
1940 			    req_qos, NULL, &max_busycount);
1941 			/*
1942 			 * If it is possible that may_start_constrained_thread had refused
1943 			 * admission due to being over the max concurrency, we may need to
1944 			 * spin up a new thread.
1945 			 *
1946 			 * We take into account the maximum number of busy threads
1947 			 * that can affect may_start_constrained_thread as looking at the
1948 			 * actual number may_start_constrained_thread will see is racy.
1949 			 *
1950 			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
1951 			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
1952 			 */
1953 			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
1954 			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
1955 				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
1956 			}
1957 		}
1958 		if (__improbable(kdebug_enable)) {
1959 			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1960 			    old_thactive, qos, NULL, NULL);
1961 			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
1962 			    old - 1, qos | (req_qos << 8),
1963 			    wq->wq_reqcount << 1 | start_timer);
1964 		}
1965 		break;
1966 
1967 	case SCHED_CALL_UNBLOCK:
1968 		/*
1969 		 * we cannot take the workqueue_lock here...
1970 		 * an UNBLOCK can occur from a timer event which
1971 		 * is run from an interrupt context... if the workqueue_lock
1972 		 * is already held by this processor, we'll deadlock...
1973 		 * the thread lock for the thread being UNBLOCKED
1974 		 * is also held
1975 		 */
1976 		old_thactive = _wq_thactive_inc(wq, qos);
1977 		if (__improbable(kdebug_enable)) {
1978 			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
1979 			    old_thactive, qos, NULL, NULL);
1980 			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
1981 			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
1982 			    old + 1, qos | (req_qos << 8),
1983 			    wq->wq_threads_scheduled);
1984 		}
1985 		break;
1986 	}
1987 }
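
/*
 * Redrive window example expanding on the NCPU = 4 comment above
 * (illustrative): with req_qos = IN, old_req_count = 3 and
 * max_busycount = 2, conc = 4 satisfies 3 <= 4 <= 5, i.e. the blocking
 * thread may have been what held may_start_constrained_thread at its
 * limit, so the delayed thread call is scheduled rather than risking a
 * stranded constrained request.
 */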
1988 
1989 #pragma mark workq lifecycle
1990 
1991 void
1992 workq_reference(struct workqueue *wq)
1993 {
1994 	os_ref_retain(&wq->wq_refcnt);
1995 }
1996 
1997 static void
1998 workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
1999     __assert_only mpsc_daemon_queue_t dq)
2000 {
2001 	struct workqueue *wq;
2002 	struct turnstile *ts;
2003 
2004 	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
2005 	assert(dq == &workq_deallocate_queue);
2006 
2007 	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
2008 	assert(ts);
2009 	turnstile_cleanup();
2010 	turnstile_deallocate(ts);
2011 
2012 	lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
2013 	zfree(workq_zone_workqueue, wq);
2014 }
2015 
2016 static void
2017 workq_deallocate(struct workqueue *wq)
2018 {
2019 	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
2020 		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
2021 		    &workq_deallocate_queue);
2022 	}
2023 }
2024 
2025 void
2026 workq_deallocate_safe(struct workqueue *wq)
2027 {
2028 	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
2029 		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
2030 		    MPSC_QUEUE_DISABLE_PREEMPTION);
2031 	}
2032 }
2033 
2034 /**
2035  * Setup per-process state for the workqueue.
2036  */
2037 int
2038 workq_open(struct proc *p, __unused struct workq_open_args *uap,
2039     __unused int32_t *retval)
2040 {
2041 	struct workqueue *wq;
2042 	int error = 0;
2043 
2044 	if ((p->p_lflag & P_LREGISTER) == 0) {
2045 		return EINVAL;
2046 	}
2047 
2048 	if (wq_init_constrained_limit) {
2049 		uint32_t limit, num_cpus = ml_wait_max_cpus();
2050 
2051 		/*
2052 		 * Set up the limit for the constrained pool.
2053 		 * This is a virtual pool in that we don't
2054 		 * maintain it on separate idle and run lists.
2055 		 */
2056 		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;
2057 
2058 		if (limit > wq_max_constrained_threads) {
2059 			wq_max_constrained_threads = limit;
2060 		}
2061 
2062 		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
2063 			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
2064 		}
2065 		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
2066 			wq_max_threads = CONFIG_THREAD_MAX - 20;
2067 		}
2068 
2069 		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;
2070 
2071 		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
2072 			wq_max_parallelism[_wq_bucket(qos)] =
2073 			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
2074 		}
2075 
2076 		wq_max_cooperative_threads = num_cpus;
2077 
2078 		wq_init_constrained_limit = 0;
2079 	}
2080 
2081 	if (proc_get_wqptr(p) == NULL) {
2082 		if (proc_init_wqptr_or_wait(p) == FALSE) {
2083 			assert(proc_get_wqptr(p) != NULL);
2084 			goto out;
2085 		}
2086 
2087 		wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);
2088 
2089 		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);
2090 
2091 		// Start the event manager at the priority hinted at by the policy engine
2092 		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
2093 		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
2094 		wq->wq_event_manager_priority = (uint32_t)pp;
2095 		wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
2096 		wq->wq_proc = p;
2097 		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
2098 		    TURNSTILE_WORKQS);
2099 
2100 		TAILQ_INIT(&wq->wq_thrunlist);
2101 		TAILQ_INIT(&wq->wq_thnewlist);
2102 		TAILQ_INIT(&wq->wq_thidlelist);
2103 		priority_queue_init(&wq->wq_overcommit_queue);
2104 		priority_queue_init(&wq->wq_constrained_queue);
2105 		priority_queue_init(&wq->wq_special_queue);
2106 		for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
2107 			STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
2108 		}
2109 
2110 		/* We are only using the delayed thread call for the constrained pool,
2111 		 * which can't have work at >= UI QoS, so a UI QoS thread call is
2112 		 * sufficient.
2113 		 */
2114 		wq->wq_delayed_call = thread_call_allocate_with_qos(
2115 			workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
2116 			THREAD_CALL_OPTIONS_ONCE);
2117 		wq->wq_immediate_call = thread_call_allocate_with_options(
2118 			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
2119 			THREAD_CALL_OPTIONS_ONCE);
2120 		wq->wq_death_call = thread_call_allocate_with_options(
2121 			workq_kill_old_threads_call, wq,
2122 			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
2123 
2124 		lck_ticket_init(&wq->wq_lock, &workq_lck_grp);
2125 
2126 		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
2127 		    VM_KERNEL_ADDRHIDE(wq), 0, 0);
2128 		proc_set_wqptr(p, wq);
2129 	}
2130 out:
2131 
2132 	return error;
2133 }
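
/*
 * Sizing example (illustrative, hypothetical 8-CPU machine): the
 * constrained limit becomes 8 * WORKQUEUE_CONSTRAINED_FACTOR threads
 * (unless the sysctl was already raised higher), wq_death_max_load is
 * fls(8) + 1 = 5, and the cooperative pool is capped at 8 threads.
 * wq_max_threads is clamped so the per-bucket thactive counters
 * (WQ_THACTIVE_BUCKET_HALF) and the global thread limit cannot be
 * exceeded.
 */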
2134 
2135 /*
2136  * Routine:	workq_mark_exiting
2137  *
2138  * Function:	Mark the work queue such that new threads will not be added to the
2139  *		work queue after we return.
2140  *
2141  * Conditions:	Called against the current process.
2142  */
2143 void
2144 workq_mark_exiting(struct proc *p)
2145 {
2146 	struct workqueue *wq = proc_get_wqptr(p);
2147 	uint32_t wq_flags;
2148 	workq_threadreq_t mgr_req;
2149 
2150 	if (!wq) {
2151 		return;
2152 	}
2153 
2154 	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);
2155 
2156 	workq_lock_spin(wq);
2157 
2158 	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
2159 	if (__improbable(wq_flags & WQ_EXITING)) {
2160 		panic("workq_mark_exiting called twice");
2161 	}
2162 
2163 	/*
2164 	 * Opportunistically try to cancel thread calls that are likely in flight.
2165 	 * workq_exit() will do the proper cleanup.
2166 	 */
2167 	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
2168 		thread_call_cancel(wq->wq_immediate_call);
2169 	}
2170 	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
2171 		thread_call_cancel(wq->wq_delayed_call);
2172 	}
2173 	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
2174 		thread_call_cancel(wq->wq_death_call);
2175 	}
2176 
2177 	mgr_req = wq->wq_event_manager_threadreq;
2178 	wq->wq_event_manager_threadreq = NULL;
2179 	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
2180 	wq->wq_creator = NULL;
2181 	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
2182 
2183 	workq_unlock(wq);
2184 
2185 	if (mgr_req) {
2186 		kqueue_threadreq_cancel(p, mgr_req);
2187 	}
2188 	/*
2189 	 * No one touches the priority queues once WQ_EXITING is set.
2190 	 * It is hence safe to do the tear down without holding any lock.
2191 	 */
2192 	priority_queue_destroy(&wq->wq_overcommit_queue,
2193 	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2194 		workq_threadreq_destroy(p, e);
2195 	});
2196 	priority_queue_destroy(&wq->wq_constrained_queue,
2197 	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2198 		workq_threadreq_destroy(p, e);
2199 	});
2200 	priority_queue_destroy(&wq->wq_special_queue,
2201 	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
2202 		workq_threadreq_destroy(p, e);
2203 	});
2204 
2205 	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
2206 }
2207 
2208 /*
2209  * Routine:	workq_exit
2210  *
2211  * Function:	clean up the work queue structure(s) now that there are no threads
2212  *		left running inside the work queue (except possibly current_thread).
2213  *
2214  * Conditions:	Called by the last thread in the process.
2215  *		Called against current process.
2216  */
2217 void
2218 workq_exit(struct proc *p)
2219 {
2220 	struct workqueue *wq;
2221 	struct uthread *uth, *tmp;
2222 
2223 	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
2224 	if (wq != NULL) {
2225 		thread_t th = current_thread();
2226 
2227 		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);
2228 
2229 		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
2230 			/*
2231 			 * <rdar://problem/40111515> Make sure we will no longer call the
2232 			 * sched call, if we ever block this thread, which the cancel_wait
2233 			 * below can do.
2234 			 */
2235 			thread_sched_call(th, NULL);
2236 		}
2237 
2238 		/*
2239 		 * Thread calls are always scheduled by the proc itself or under the
2240 		 * workqueue spinlock if WQ_EXITING is not yet set.
2241 		 *
2242 		 * Either way, when this runs, the proc has no threads left beside
2243 		 * the one running this very code, so we know no thread call can be
2244 		 * dispatched anymore.
2245 		 */
2246 		thread_call_cancel_wait(wq->wq_delayed_call);
2247 		thread_call_cancel_wait(wq->wq_immediate_call);
2248 		thread_call_cancel_wait(wq->wq_death_call);
2249 		thread_call_free(wq->wq_delayed_call);
2250 		thread_call_free(wq->wq_immediate_call);
2251 		thread_call_free(wq->wq_death_call);
2252 
2253 		/*
2254 		 * Clean up workqueue data structures for threads that exited and
2255 		 * didn't get a chance to clean up after themselves.
2256 		 *
2257 		 * idle/new threads should have been interrupted and died on their own
2258 		 */
2259 		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
2260 			thread_t mth = get_machthread(uth);
2261 			thread_sched_call(mth, NULL);
2262 			thread_deallocate(mth);
2263 		}
2264 		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
2265 		assert(TAILQ_EMPTY(&wq->wq_thidlelist));
2266 
2267 		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
2268 		    VM_KERNEL_ADDRHIDE(wq), 0, 0);
2269 
2270 		workq_deallocate(wq);
2271 
2272 		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
2273 	}
2274 }
2275 
2276 
2277 #pragma mark bsd thread control
2278 
2279 bool
2280 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2281 {
2282 	return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2283 	       (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER);
2284 }
2285 
2286 static bool
2287 _pthread_priority_to_policy(pthread_priority_t priority,
2288     thread_qos_policy_data_t *data)
2289 {
2290 	data->qos_tier = _pthread_priority_thread_qos(priority);
2291 	data->tier_importance = _pthread_priority_relpri(priority);
2292 	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2293 	    data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2294 		return false;
2295 	}
2296 	return true;
2297 }
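
/*
 * Decoding example (illustrative): a priority built with
 * _pthread_priority_make_from_thread_qos(THREAD_QOS_USER_INITIATED, -5, 0)
 * round-trips here into { .qos_tier = THREAD_QOS_USER_INITIATED,
 * .tier_importance = -5 }. A positive relpri, a relpri below
 * THREAD_QOS_MIN_TIER_IMPORTANCE, or an unspecified QoS is rejected.
 */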
2298 
2299 static int
2300 bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
2301     mach_port_name_t voucher, enum workq_set_self_flags flags)
2302 {
2303 	struct uthread *uth = get_bsdthread_info(th);
2304 	struct workqueue *wq = proc_get_wqptr(p);
2305 
2306 	kern_return_t kr;
2307 	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
2308 	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);
2309 
2310 	assert(th == current_thread());
2311 	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
2312 		if (!is_wq_thread) {
2313 			unbind_rv = EINVAL;
2314 			goto qos;
2315 		}
2316 
2317 		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
2318 			unbind_rv = EINVAL;
2319 			goto qos;
2320 		}
2321 
2322 		workq_threadreq_t kqr = uth->uu_kqr_bound;
2323 		if (kqr == NULL) {
2324 			unbind_rv = EALREADY;
2325 			goto qos;
2326 		}
2327 
2328 		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2329 			unbind_rv = EINVAL;
2330 			goto qos;
2331 		}
2332 
2333 		kqueue_threadreq_unbind(p, kqr);
2334 	}
2335 
2336 qos:
2337 	if (flags & (WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_QOS_OVERRIDE_FLAG)) {
2338 		assert(flags & WORKQ_SET_SELF_QOS_FLAG);
2339 
2340 		thread_qos_policy_data_t new_policy;
2341 		thread_qos_t qos_override = THREAD_QOS_UNSPECIFIED;
2342 
2343 		if (!_pthread_priority_to_policy(priority, &new_policy)) {
2344 			qos_rv = EINVAL;
2345 			goto voucher;
2346 		}
2347 
2348 		if (flags & WORKQ_SET_SELF_QOS_OVERRIDE_FLAG) {
2349 			/*
2350 			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is set, we definitely
2351 			 * should have an override QoS in the pthread_priority_t and we should
2352 			 * only come into this path for cooperative thread requests
2353 			 */
2354 			if (!_pthread_priority_has_override_qos(priority) ||
2355 			    !_pthread_priority_is_cooperative(priority)) {
2356 				qos_rv = EINVAL;
2357 				goto voucher;
2358 			}
2359 			qos_override = _pthread_priority_thread_override_qos(priority);
2360 		} else {
2361 			/*
2362 			 * If the WORKQ_SET_SELF_QOS_OVERRIDE_FLAG is not set, we definitely
2363 			 * should not have an override QoS in the pthread_priority_t
2364 			 */
2365 			if (_pthread_priority_has_override_qos(priority)) {
2366 				qos_rv = EINVAL;
2367 				goto voucher;
2368 			}
2369 		}
2370 
2371 		if (!is_wq_thread) {
2372 			/*
2373 			 * Threads opted out of QoS can't change QoS
2374 			 */
2375 			if (!thread_has_qos_policy(th)) {
2376 				qos_rv = EPERM;
2377 				goto voucher;
2378 			}
2379 		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
2380 		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
2381 			/*
2382 			 * Workqueue manager threads or threads above UI can't change QoS
2383 			 */
2384 			qos_rv = EINVAL;
2385 			goto voucher;
2386 		} else {
2387 			/*
2388 			 * For workqueue threads, possibly adjust buckets and redrive thread
2389 			 * requests.
2390 			 *
2391 			 * Transitions allowed:
2392 			 *
2393 			 * overcommit --> non-overcommit
2394 			 * overcommit --> overcommit
2395 			 * non-overcommit --> non-overcommit
2396 			 * non-overcommit --> overcommit (to be deprecated later)
2397 			 * cooperative --> cooperative
2398 			 *
2399 			 * All other transitions aren't allowed so reject them.
2400 			 */
2401 			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
2402 				qos_rv = EINVAL;
2403 				goto voucher;
2404 			} else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
2405 				qos_rv = EINVAL;
2406 				goto voucher;
2407 			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
2408 				qos_rv = EINVAL;
2409 				goto voucher;
2410 			}
2411 
2412 			struct uu_workq_policy old_pri, new_pri;
2413 			bool force_run = false;
2414 
2415 			if (qos_override) {
2416 				/*
2417 				 * We're in the case of a thread clarifying that it is, e.g., not IN
2418 				 * req QoS but rather UT req QoS with an IN override. However, this can
2419 				 * race with a concurrent override happening to the thread via
2420 				 * workq_thread_add_dispatch_override so this needs to be
2421 				 * synchronized with the thread mutex.
2422 				 */
2423 				thread_mtx_lock(th);
2424 			}
2425 
2426 			workq_lock_spin(wq);
2427 
2428 			old_pri = new_pri = uth->uu_workq_pri;
2429 			new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;
2430 
2431 			if (old_pri.qos_override < qos_override) {
2432 				/*
2433 				 * Since this can race with a concurrent override via
2434 				 * workq_thread_add_dispatch_override, only adjust override value if we
2435 				 * are higher - this is a saturating function.
2436 				 *
2437 				 * We should not be changing the final override values; we should simply
2438 				 * be redistributing the current value with a different breakdown of req
2439 				 * vs override QoS - assert to that effect. Therefore, buckets should
2440 				 * not change.
2441 				 */
2442 				new_pri.qos_override = qos_override;
2443 				assert(workq_pri_override(new_pri) == workq_pri_override(old_pri));
2444 				assert(workq_pri_bucket(new_pri) == workq_pri_bucket(old_pri));
2445 			}
2446 
2447 			/* Adjust schedule counts for various types of transitions */
2448 
2449 			/* overcommit -> non-overcommit */
2450 			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
2451 				workq_thread_set_type(uth, 0);
2452 				wq->wq_constrained_threads_scheduled++;
2453 
2454 				/* non-overcommit -> overcommit */
2455 			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
2456 				workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
2457 				force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);
2458 
2459 				/* cooperative -> cooperative */
2460 			} else if (workq_thread_is_cooperative(uth)) {
2461 				_wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_req);
2462 				_wq_cooperative_queue_scheduled_count_inc(wq, new_pri.qos_req);
2463 
2464 				/* We're changing schedule counts within the cooperative pool, so we
2465 				 * need to refresh the best cooperative QoS logic again */
2466 				force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
2467 			}
2468 
2469 			/*
2470 			 * This will set up an override on the thread if any and will also call
2471 			 * schedule_creator if needed
2472 			 */
2473 			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
2474 			workq_unlock(wq);
2475 
2476 			if (qos_override) {
2477 				thread_mtx_unlock(th);
2478 			}
2479 
2480 			if (workq_thread_is_overcommit(uth)) {
2481 				thread_disarm_workqueue_quantum(th);
2482 			} else {
2483 				/* If the thread changed QoS buckets, the quantum duration
2484 				 * may have changed too */
2485 				thread_arm_workqueue_quantum(th);
2486 			}
2487 		}
2488 
2489 		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
2490 		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
2491 		if (kr != KERN_SUCCESS) {
2492 			qos_rv = EINVAL;
2493 		}
2494 	}
2495 
2496 voucher:
2497 	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
2498 		kr = thread_set_voucher_name(voucher);
2499 		if (kr != KERN_SUCCESS) {
2500 			voucher_rv = ENOENT;
2501 			goto fixedpri;
2502 		}
2503 	}
2504 
2505 fixedpri:
2506 	if (qos_rv) {
2507 		goto done;
2508 	}
2509 	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
2510 		thread_extended_policy_data_t extpol = {.timeshare = 0};
2511 
2512 		if (is_wq_thread) {
2513 			/* Not allowed on workqueue threads */
2514 			fixedpri_rv = ENOTSUP;
2515 			goto done;
2516 		}
2517 
2518 		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
2519 		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
2520 		if (kr != KERN_SUCCESS) {
2521 			fixedpri_rv = EINVAL;
2522 			goto done;
2523 		}
2524 	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
2525 		thread_extended_policy_data_t extpol = {.timeshare = 1};
2526 
2527 		if (is_wq_thread) {
2528 			/* Not allowed on workqueue threads */
2529 			fixedpri_rv = ENOTSUP;
2530 			goto done;
2531 		}
2532 
2533 		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
2534 		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
2535 		if (kr != KERN_SUCCESS) {
2536 			fixedpri_rv = EINVAL;
2537 			goto done;
2538 		}
2539 	}
2540 
2541 done:
2542 	if (qos_rv && voucher_rv) {
2543 		/* Both failed, give that a unique error. */
2544 		return EBADMSG;
2545 	}
2546 
2547 	if (unbind_rv) {
2548 		return unbind_rv;
2549 	}
2550 
2551 	if (qos_rv) {
2552 		return qos_rv;
2553 	}
2554 
2555 	if (voucher_rv) {
2556 		return voucher_rv;
2557 	}
2558 
2559 	if (fixedpri_rv) {
2560 		return fixedpri_rv;
2561 	}
2562 
2563 
2564 	return 0;
2565 }
2566 
2567 static int
2568 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2569     pthread_priority_t pp, user_addr_t resource)
2570 {
2571 	thread_qos_t qos = _pthread_priority_thread_qos(pp);
2572 	if (qos == THREAD_QOS_UNSPECIFIED) {
2573 		return EINVAL;
2574 	}
2575 
2576 	thread_t th = port_name_to_thread(kport,
2577 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2578 	if (th == THREAD_NULL) {
2579 		return ESRCH;
2580 	}
2581 
2582 	int rv = proc_thread_qos_add_override(proc_task(p), th, 0, qos, TRUE,
2583 	    resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2584 
2585 	thread_deallocate(th);
2586 	return rv;
2587 }
2588 
2589 static int
2590 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2591     user_addr_t resource)
2592 {
2593 	thread_t th = port_name_to_thread(kport,
2594 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2595 	if (th == THREAD_NULL) {
2596 		return ESRCH;
2597 	}
2598 
2599 	int rv = proc_thread_qos_remove_override(proc_task(p), th, 0, resource,
2600 	    THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2601 
2602 	thread_deallocate(th);
2603 	return rv;
2604 }
2605 
2606 static int
2607 workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
2608     pthread_priority_t pp, user_addr_t ulock_addr)
2609 {
2610 	struct uu_workq_policy old_pri, new_pri;
2611 	struct workqueue *wq = proc_get_wqptr(p);
2612 
2613 	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
2614 	if (qos_override == THREAD_QOS_UNSPECIFIED) {
2615 		return EINVAL;
2616 	}
2617 
2618 	thread_t thread = port_name_to_thread(kport,
2619 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2620 	if (thread == THREAD_NULL) {
2621 		return ESRCH;
2622 	}
2623 
2624 	struct uthread *uth = get_bsdthread_info(thread);
2625 	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2626 		thread_deallocate(thread);
2627 		return EPERM;
2628 	}
2629 
2630 	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
2631 	    wq, thread_tid(thread), 1, pp);
2632 
2633 	thread_mtx_lock(thread);
2634 
2635 	if (ulock_addr) {
2636 		uint32_t val;
2637 		int rc;
2638 		/*
2639 		 * Workaround lack of explicit support for 'no-fault copyin'
2640 		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
2641 		 */
2642 		disable_preemption();
2643 		rc = copyin_atomic32(ulock_addr, &val);
2644 		enable_preemption();
2645 		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
2646 			goto out;
2647 		}
2648 	}
2649 
2650 	workq_lock_spin(wq);
2651 
2652 	old_pri = uth->uu_workq_pri;
2653 	if (old_pri.qos_override >= qos_override) {
2654 		/* Nothing to do */
2655 	} else if (thread == current_thread()) {
2656 		new_pri = old_pri;
2657 		new_pri.qos_override = qos_override;
2658 		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2659 	} else {
2660 		uth->uu_workq_pri.qos_override = qos_override;
2661 		if (qos_override > workq_pri_override(old_pri)) {
2662 			thread_set_workq_override(thread, qos_override);
2663 		}
2664 	}
2665 
2666 	workq_unlock(wq);
2667 
2668 out:
2669 	thread_mtx_unlock(thread);
2670 	thread_deallocate(thread);
2671 	return 0;
2672 }
2673 
2674 static int
2675 workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2676 {
2677 	struct uu_workq_policy old_pri, new_pri;
2678 	struct workqueue *wq = proc_get_wqptr(p);
2679 	struct uthread *uth = get_bsdthread_info(thread);
2680 
2681 	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2682 		return EPERM;
2683 	}
2684 
2685 	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);
2686 
2687 	/*
2688 	 * workq_thread_add_dispatch_override takes the thread mutex before doing the
2689 	 * copyin to validate the drainer and apply the override. We need to do the
2690 	 * same here. See rdar://84472518
2691 	 */
2692 	thread_mtx_lock(thread);
2693 
2694 	workq_lock_spin(wq);
2695 	old_pri = new_pri = uth->uu_workq_pri;
2696 	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2697 	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2698 	workq_unlock(wq);
2699 
2700 	thread_mtx_unlock(thread);
2701 	return 0;
2702 }
2703 
2704 static int
2705 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2706 {
2707 	if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2708 		// If the thread isn't a workqueue thread, don't set the
2709 		// kill_allowed bit; however, we still need to return 0
2710 		// instead of an error code since this code is executed
2711 		// on the abort path which needs to not depend on the
2712 		// pthread_t (returning an error depends on pthread_t via
2713 		// cerror_nocancel)
2714 		return 0;
2715 	}
2716 	struct uthread *uth = get_bsdthread_info(thread);
2717 	uth->uu_workq_pthread_kill_allowed = enable;
2718 	return 0;
2719 }
2720 
2721 static int
2722 workq_allow_sigmask(proc_t p, sigset_t mask)
2723 {
2724 	if (mask & workq_threadmask) {
2725 		return EINVAL;
2726 	}
2727 
2728 	proc_lock(p);
2729 	p->p_workq_allow_sigmask |= mask;
2730 	proc_unlock(p);
2731 
2732 	return 0;
2733 }
2734 
2735 static int
2736 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2737     int *retval)
2738 {
2739 	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2740 	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2741 	static_assert(QOS_PARALLELISM_REALTIME ==
2742 	    _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2743 	static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2744 	    _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2745 
2746 	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2747 		return EINVAL;
2748 	}
2749 
2750 	/* No units are present */
2751 	if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2752 		return ENOTSUP;
2753 	}
2754 
2755 	if (flags & QOS_PARALLELISM_REALTIME) {
2756 		if (qos) {
2757 			return EINVAL;
2758 		}
2759 	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2760 		return EINVAL;
2761 	}
2762 
2763 	*retval = qos_max_parallelism(qos, flags);
2764 	return 0;
2765 }
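
/*
 * Usage sketch (illustrative; the wrapper is libpthread SPI, not defined in
 * this file): pthread_qos_max_parallelism() funnels into this handler via
 * BSDTHREAD_CTL_QOS_MAX_PARALLELISM. A realtime query must pass qos == 0
 * with QOS_PARALLELISM_REALTIME set, while a QoS query must pass a valid
 * tier and typically asks for the logical CPU count.
 */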
2766 
2767 static int
2768 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2769     unsigned long flags, uint64_t value1, __unused uint64_t value2)
2770 {
2771 	uint32_t apply_worker_index;
2772 	kern_return_t kr;
2773 
2774 	switch (flags) {
2775 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2776 		apply_worker_index = (uint32_t)value1;
2777 		kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2778 		/*
2779 		 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2780 		 * cluster which it was not eligible to execute on.
2781 		 */
2782 		return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2783 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2784 		kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2785 		return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2786 	default:
2787 		return EINVAL;
2788 	}
2789 }
2790 
2791 #define ENSURE_UNUSED(arg) \
2792 	        ({ if ((arg) != 0) { return EINVAL; } })
2793 
2794 int
2795 bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
2796 {
2797 	switch (uap->cmd) {
2798 	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
2799 		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
2800 		           (pthread_priority_t)uap->arg2, uap->arg3);
2801 	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
2802 		ENSURE_UNUSED(uap->arg3);
2803 		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
2804 		           (user_addr_t)uap->arg2);
2805 
2806 	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
2807 		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
2808 		           (pthread_priority_t)uap->arg2, uap->arg3);
2809 	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
2810 		return workq_thread_reset_dispatch_override(p, current_thread());
2811 
2812 	case BSDTHREAD_CTL_SET_SELF:
2813 		return bsdthread_set_self(p, current_thread(),
2814 		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
2815 		           (enum workq_set_self_flags)uap->arg3);
2816 
2817 	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
2818 		ENSURE_UNUSED(uap->arg3);
2819 		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
2820 		           (unsigned long)uap->arg2, retval);
2821 	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
2822 		ENSURE_UNUSED(uap->arg2);
2823 		ENSURE_UNUSED(uap->arg3);
2824 		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
2825 	case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
2826 		return bsdthread_dispatch_apply_attr(p, current_thread(),
2827 		           (unsigned long)uap->arg1, (uint64_t)uap->arg2,
2828 		           (uint64_t)uap->arg3);
2829 	case BSDTHREAD_CTL_WORKQ_ALLOW_SIGMASK:
2830 		return workq_allow_sigmask(p, (int)uap->arg1);
2831 	case BSDTHREAD_CTL_SET_QOS:
2832 	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
2833 	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
2834 		/* no longer supported */
2835 		return ENOTSUP;
2836 
2837 	default:
2838 		return EINVAL;
2839 	}
2840 }
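
/*
 * Dispatch example (illustrative; _pthread_set_properties_self is
 * libpthread SPI, named here for orientation only): a worker updating its
 * own QoS and voucher lands here as BSDTHREAD_CTL_SET_SELF with
 * arg1 = pthread_priority_t, arg2 = voucher port name, and
 * arg3 = WORKQ_SET_SELF_QOS_FLAG | WORKQ_SET_SELF_VOUCHER_FLAG, with the
 * per-feature errors prioritized by bsdthread_set_self() above.
 */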
2841 
2842 #pragma mark workqueue thread manipulation
2843 
2844 static void __dead2
2845 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2846     struct uthread *uth, uint32_t setup_flags);
2847 
2848 static void __dead2
2849 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2850     struct uthread *uth, uint32_t setup_flags);
2851 
2852 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2853 
2854 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2855 static inline uint64_t
2856 workq_trace_req_id(workq_threadreq_t req)
2857 {
2858 	struct kqworkloop *kqwl;
2859 	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2860 		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2861 		return kqwl->kqwl_dynamicid;
2862 	}
2863 
2864 	return VM_KERNEL_ADDRHIDE(req);
2865 }
2866 #endif
2867 
2868 /**
2869  * Entry point for libdispatch to ask for threads
2870  */
2871 static int
2872 workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
2873 {
2874 	thread_qos_t qos = _pthread_priority_thread_qos(pp);
2875 	struct workqueue *wq = proc_get_wqptr(p);
2876 	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
2877 	int ret = 0;
2878 
2879 	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
2880 	    qos == THREAD_QOS_UNSPECIFIED) {
2881 		ret = EINVAL;
2882 		goto exit;
2883 	}
2884 
2885 	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
2886 	    wq, reqcount, pp, cooperative);
2887 
2888 	workq_threadreq_t req = zalloc(workq_zone_threadreq);
2889 	priority_queue_entry_init(&req->tr_entry);
2890 	req->tr_state = WORKQ_TR_STATE_NEW;
2891 	req->tr_qos   = qos;
2892 	workq_tr_flags_t tr_flags = 0;
2893 
2894 	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
2895 		tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
2896 		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
2897 	}
2898 
2899 	if (cooperative) {
2900 		tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
2901 		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
2902 
2903 		if (reqcount > 1) {
2904 			ret = ENOTSUP;
2905 			goto free_and_exit;
2906 		}
2907 	}
2908 
2909 	/* A thread request cannot be both overcommit and cooperative */
2910 	if (workq_tr_is_cooperative(tr_flags) &&
2911 	    workq_tr_is_overcommit(tr_flags)) {
2912 		ret = EINVAL;
2913 		goto free_and_exit;
2914 	}
2915 	req->tr_flags = tr_flags;
2916 
2917 	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
2918 	    wq, workq_trace_req_id(req), req->tr_qos, reqcount);
2919 
2920 	workq_lock_spin(wq);
2921 	do {
2922 		if (_wq_exiting(wq)) {
2923 			goto unlock_and_exit;
2924 		}
2925 
2926 		/*
2927 		 * When userspace is asking for parallelism, wake up to (reqcount - 1)
2928 		 * threads without pacing, to inform the scheduler of that workload.
2929 		 *
2930 		 * The remaining requests, or the ones that failed the admission checks,
2931 		 * are enqueued and go through the regular creator codepath.
2932 		 *
2933 		 * If there aren't enough threads, add one, but re-evaluate everything
2934 		 * as conditions may now have changed.
2935 		 */
2936 		unpaced = reqcount - 1;
2937 
2938 		if (reqcount > 1) {
2939 			/* We don't handle asking for parallelism on the cooperative
2940 			 * workqueue just yet */
2941 			assert(!workq_threadreq_is_cooperative(req));
2942 
2943 			if (workq_threadreq_is_nonovercommit(req)) {
2944 				unpaced = workq_constrained_allowance(wq, qos, NULL, false);
2945 				if (unpaced >= reqcount - 1) {
2946 					unpaced = reqcount - 1;
2947 				}
2948 			}
2949 		}
2950 
2951 		/*
2952 		 * This path does not currently handle custom workloop parameters
2953 		 * when creating threads for parallelism.
2954 		 */
2955 		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));
2956 
2957 		/*
2958 		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
2959 		 */
2960 		while (unpaced > 0 && wq->wq_thidlecount) {
2961 			struct uthread *uth;
2962 			bool needs_wakeup;
2963 			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;
2964 
2965 			if (workq_tr_is_overcommit(req->tr_flags)) {
2966 				uu_flags |= UT_WORKQ_OVERCOMMIT;
2967 			}
2968 
2969 			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);
2970 
2971 			_wq_thactive_inc(wq, qos);
2972 			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
2973 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
2974 			wq->wq_fulfilled++;
2975 
2976 			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
2977 			uth->uu_save.uus_workq_park_data.thread_request = req;
2978 			if (needs_wakeup) {
2979 				workq_thread_wakeup(uth);
2980 			}
2981 			unpaced--;
2982 			reqcount--;
2983 		}
2984 	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
2985 	    workq_add_new_idle_thread(p, wq));
2986 
2987 	if (_wq_exiting(wq)) {
2988 		goto unlock_and_exit;
2989 	}
2990 
2991 	req->tr_count = (uint16_t)reqcount;
2992 	if (workq_threadreq_enqueue(wq, req)) {
2993 		/* This can drop the workqueue lock, and take it again */
2994 		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
2995 	}
2996 	workq_unlock(wq);
2997 	return 0;
2998 
2999 unlock_and_exit:
3000 	workq_unlock(wq);
3001 free_and_exit:
3002 	zfree(workq_zone_threadreq, req);
3003 exit:
3004 	return ret;
3005 }
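
/*
 * Pacing example (illustrative): for reqcount = 8 non-overcommit requests
 * at UT, up to min(7, constrained allowance) idle threads are woken (or
 * created, while under wq_max_threads) without pacing; whatever remains is
 * enqueued as a single request with tr_count equal to the leftover count,
 * to be paced out by the creator. Cooperative requests are limited to
 * reqcount == 1 above, so they always take the enqueue path.
 */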
3006 
3007 bool
3008 workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
3009     struct turnstile *workloop_ts, thread_qos_t qos,
3010     workq_kern_threadreq_flags_t flags)
3011 {
3012 	struct workqueue *wq = proc_get_wqptr_fast(p);
3013 	struct uthread *uth = NULL;
3014 
3015 	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));
3016 
3017 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3018 		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
3019 		qos = thread_workq_qos_for_pri(trp.trp_pri);
3020 		if (qos == THREAD_QOS_UNSPECIFIED) {
3021 			qos = WORKQ_THREAD_QOS_ABOVEUI;
3022 		}
3023 	}
3024 
3025 	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
3026 	priority_queue_entry_init(&req->tr_entry);
3027 	req->tr_count = 1;
3028 	req->tr_state = WORKQ_TR_STATE_NEW;
3029 	req->tr_qos   = qos;
3030 
3031 	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
3032 	    workq_trace_req_id(req), qos, 1);
3033 
3034 	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
3035 		/*
3036 		 * we're called back synchronously from the context of
3037 		 * kqueue_threadreq_unbind from within workq_thread_return();
3038 		 * we can try to match up this thread with this request!
3039 		 */
3040 		uth = current_uthread();
3041 		assert(uth->uu_kqr_bound == NULL);
3042 	}
3043 
3044 	workq_lock_spin(wq);
3045 	if (_wq_exiting(wq)) {
3046 		req->tr_state = WORKQ_TR_STATE_IDLE;
3047 		workq_unlock(wq);
3048 		return false;
3049 	}
3050 
3051 	if (uth && workq_threadreq_admissible(wq, uth, req)) {
3052 		/* This is the case of the rebind - we were about to park and unbind
3053 		 * when more events came, so keep the binding.
3054 		 */
3055 		assert(uth != wq->wq_creator);
3056 
3057 		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
3058 			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
3059 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
3060 		}
3061 		/*
3062 		 * We're called from workq_kern_threadreq_initiate()
3063 		 * due to an unbind, with the kq req held.
3064 		 */
3065 		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
3066 		    workq_trace_req_id(req), req->tr_flags, 0);
3067 		wq->wq_fulfilled++;
3068 
3069 		kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
3070 	} else {
3071 		if (workloop_ts) {
3072 			workq_perform_turnstile_operation_locked(wq, ^{
3073 				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
3074 				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
3075 				turnstile_update_inheritor_complete(workloop_ts,
3076 				TURNSTILE_INTERLOCK_HELD);
3077 			});
3078 		}
3079 
3080 		bool reevaluate_creator_thread_group = false;
3081 #if CONFIG_PREADOPT_TG
3082 		reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3083 #endif
3084 		/* We enqueued the highest priority item or we may need to reevaluate if
3085 		 * the creator needs a thread group pre-adoption */
3086 		if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
3087 			workq_schedule_creator(p, wq, flags);
3088 		}
3089 	}
3090 
3091 	workq_unlock(wq);
3092 
3093 	return true;
3094 }
3095 
3096 void
3097 workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
3098     thread_qos_t qos, workq_kern_threadreq_flags_t flags)
3099 {
3100 	struct workqueue *wq = proc_get_wqptr_fast(p);
3101 	bool make_overcommit = false;
3102 
3103 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
3104 		/* Requests outside-of-QoS shouldn't accept modify operations */
3105 		return;
3106 	}
3107 
3108 	workq_lock_spin(wq);
3109 
3110 	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3111 	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));
3112 
3113 	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3114 		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
3115 		workq_unlock(wq);
3116 		return;
3117 	}
3118 
3119 	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
3120 		/* TODO (rokhinip): We come into this code path for kqwl thread
3121 		 * requests. kqwl requests cannot be cooperative.
3122 		 */
3123 		assert(!workq_threadreq_is_cooperative(req));
3124 
3125 		make_overcommit = workq_threadreq_is_nonovercommit(req);
3126 	}
3127 
3128 	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
3129 		workq_unlock(wq);
3130 		return;
3131 	}
3132 
3133 	assert(req->tr_count == 1);
3134 	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3135 		panic("Invalid thread request (%p) state %d", req, req->tr_state);
3136 	}
3137 
3138 	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
3139 	    workq_trace_req_id(req), qos, 0);
3140 
3141 	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
3142 	workq_threadreq_t req_max;
3143 
3144 	/*
3145 	 * Stage 1: Dequeue the request from its priority queue.
3146 	 *
3147 	 * If we dequeue the root item of the constrained priority queue,
3148 	 * maintain the best constrained request qos invariant.
3149 	 */
3150 	if (priority_queue_remove(pq, &req->tr_entry)) {
3151 		if (workq_threadreq_is_nonovercommit(req)) {
3152 			_wq_thactive_refresh_best_constrained_req_qos(wq);
3153 		}
3154 	}
3155 
3156 	/*
3157 	 * Stage 2: Apply changes to the thread request
3158 	 *
3159 	 * If the item will not become the root of the priority queue it belongs to,
3160 	 * we just need to wait in line: enqueue and return quickly.
3161 	 */
3162 	if (__improbable(make_overcommit)) {
3163 		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
3164 		pq = workq_priority_queue_for_req(wq, req);
3165 	}
3166 	req->tr_qos = qos;
3167 
3168 	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
3169 	if (req_max && req_max->tr_qos >= qos) {
3170 		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
3171 		    workq_priority_for_req(req), false);
3172 		priority_queue_insert(pq, &req->tr_entry);
3173 		workq_unlock(wq);
3174 		return;
3175 	}
3176 
3177 	/*
3178 	 * Stage 3: Reevaluate whether we should run the thread request.
3179 	 *
3180 	 * Pretend the thread request is new again:
3181 	 * - adjust wq_reqcount to not count it anymore.
3182 	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
3183 	 *   properly attempts a synchronous bind)
3184 	 */
3185 	wq->wq_reqcount--;
3186 	req->tr_state = WORKQ_TR_STATE_NEW;
3187 
3188 	/* We enqueued the highest priority item, or the request got a new thread
3189 	 * group and we may need to reevaluate the creator's thread group pre-adoption */
3190 	bool reevaluate_creator_tg = false;
3191 
3192 #if CONFIG_PREADOPT_TG
3193 	reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
3194 #endif
3195 
3196 	if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
3197 		workq_schedule_creator(p, wq, flags);
3198 	}
3199 	workq_unlock(wq);
3200 }
3201 
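/*
 * A minimal standalone sketch of the modify path above, assuming nothing from
 * the kernel: it is a "dequeue, rekey, conditionally re-enqueue" pattern on a
 * max priority queue.  All names here (toy_req, toy_max, toy_modify) are
 * hypothetical, and a linear scan stands in for the priority_queue primitives.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct toy_req {
	int qos;                /* request priority */
	struct toy_req *next;   /* naive singly-linked "queue" */
};

/* Return the current maximum (NULL when the queue is empty). */
static struct toy_req *
toy_max(struct toy_req *head)
{
	struct toy_req *best = NULL;
	for (struct toy_req *r = head; r; r = r->next) {
		if (best == NULL || r->qos > best->qos) {
			best = r;
		}
	}
	return best;
}

/*
 * Change req's priority.  Returns true when the modified request became the
 * new head-of-line (the kernel would then reevaluate whether to run it),
 * false when it merely waits in line at its new priority.  With a real heap
 * the request would be removed before the rekey and re-inserted afterwards.
 */
static bool
toy_modify(struct toy_req *head, struct toy_req *req, int new_qos)
{
	req->qos = new_qos;                   /* stage 2: apply the change */
	return toy_max(head) == req;          /* stage 3: reevaluate        */
}

int
main(void)
{
	struct toy_req c = { .qos = 2, .next = NULL };
	struct toy_req b = { .qos = 5, .next = &c };
	struct toy_req a = { .qos = 3, .next = &b };

	/* Raising 'c' above everything else forces a reevaluation. */
	printf("became head: %d\n", toy_modify(&a, &c, 7));   /* 1 */
	/* A smaller bump just waits in line. */
	printf("became head: %d\n", toy_modify(&a, &a, 4));   /* 0 */
	return 0;
}
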
3202 void
3203 workq_kern_threadreq_lock(struct proc *p)
3204 {
3205 	workq_lock_spin(proc_get_wqptr_fast(p));
3206 }
3207 
3208 void
3209 workq_kern_threadreq_unlock(struct proc *p)
3210 {
3211 	workq_unlock(proc_get_wqptr_fast(p));
3212 }
3213 
3214 void
3215 workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
3216     thread_t owner, struct turnstile *wl_ts,
3217     turnstile_update_flags_t flags)
3218 {
3219 	struct workqueue *wq = proc_get_wqptr_fast(p);
3220 	turnstile_inheritor_t inheritor;
3221 
3222 	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
3223 	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
3224 	workq_lock_held(wq);
3225 
3226 	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
3227 		kqueue_threadreq_bind(p, req, req->tr_thread,
3228 		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
3229 		return;
3230 	}
3231 
3232 	if (_wq_exiting(wq)) {
3233 		inheritor = TURNSTILE_INHERITOR_NULL;
3234 	} else {
3235 		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
3236 			panic("Invalid thread request (%p) state %d", req, req->tr_state);
3237 		}
3238 
3239 		if (owner) {
3240 			inheritor = owner;
3241 			flags |= TURNSTILE_INHERITOR_THREAD;
3242 		} else {
3243 			inheritor = wq->wq_turnstile;
3244 			flags |= TURNSTILE_INHERITOR_TURNSTILE;
3245 		}
3246 	}
3247 
3248 	workq_perform_turnstile_operation_locked(wq, ^{
3249 		turnstile_update_inheritor(wl_ts, inheritor, flags);
3250 	});
3251 }
3252 
3253 void
3254 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3255 {
3256 	struct workqueue *wq = proc_get_wqptr_fast(p);
3257 
3258 	workq_lock_spin(wq);
3259 	workq_schedule_creator(p, wq, flags);
3260 	workq_unlock(wq);
3261 }
3262 
3263 /*
3264  * Always called at AST by the thread on itself
3265  *
3266  * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
3267  * on what the thread should do next. The TSD value is always set by the thread
3268  * on itself in the kernel and cleared either by userspace when it acks the TSD
3269  * value and takes action, or by the thread in the kernel when the quantum
3270  * expires again.
3271  */
3272 void
3273 workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
3274 {
3275 	struct uthread *uth = get_bsdthread_info(thread);
3276 
3277 	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3278 		return;
3279 	}
3280 
3281 	if (!thread_supports_cooperative_workqueue(thread)) {
3282 		panic("Quantum expired for thread that doesn't support cooperative workqueue");
3283 	}
3284 
3285 	thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
3286 	if (qos == THREAD_QOS_UNSPECIFIED) {
3287 		panic("Thread should not have workq bucket of QoS UN");
3288 	}
3289 
3290 	assert(thread_has_expired_workqueue_quantum(thread, false));
3291 
3292 	struct workqueue *wq = proc_get_wqptr(proc);
3293 	assert(wq != NULL);
3294 
3295 	/*
3296 	 * For starters, we're just going to evaluate and see if we need to narrow
3297 	 * the pool and tell this thread to park if needed. In the future, we'll
3298 	 * evaluate and convey other workqueue state information like needing to
3299 	 * pump kevents, etc.
3300 	 */
3301 	uint64_t flags = 0;
3302 
3303 	workq_lock_spin(wq);
3304 
3305 	if (workq_thread_is_cooperative(uth)) {
3306 		if (!workq_cooperative_allowance(wq, qos, uth, false)) {
3307 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3308 		} else {
3309 			/* In the future, when we have kevent hookups for the cooperative
3310 			 * pool, we need fancier logic for what userspace should do. But
3311 			 * right now, only userspace thread requests exist - so we'll just
3312 			 * tell userspace to shuffle work items */
3313 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
3314 		}
3315 	} else if (workq_thread_is_nonovercommit(uth)) {
3316 		if (!workq_constrained_allowance(wq, qos, uth, false)) {
3317 			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
3318 		}
3319 	}
3320 	workq_unlock(wq);
3321 
3322 	WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);
3323 
3324 	kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);
3325 
3326 	/* We have conveyed to userspace what it needs to do upon quantum
3327 	 * expiry, now rearm the workqueue quantum again */
3328 	thread_arm_workqueue_quantum(get_machthread(uth));
3329 }
3330 
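/*
 * A minimal standalone sketch of the quantum-expiry policy above, assuming
 * nothing from the kernel: it reduces to a small decision table.  The flag
 * values, toy_thread, and toy_quantum_expiry_flags() are all hypothetical;
 * the real flags live in the pthread/workqueue headers.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define TOY_EXPIRY_NARROW   0x1u  /* pool is too wide: park this thread   */
#define TOY_EXPIRY_SHUFFLE  0x2u  /* keep running, but rotate work items  */

struct toy_thread {
	bool cooperative;       /* cooperative pool member?            */
	bool overcommit;        /* overcommit (uncounted) thread?      */
	bool over_allowance;    /* pool already at/over its allowance? */
};

static uint64_t
toy_quantum_expiry_flags(const struct toy_thread *th)
{
	uint64_t flags = 0;

	if (th->cooperative) {
		flags |= th->over_allowance ? TOY_EXPIRY_NARROW
		    : TOY_EXPIRY_SHUFFLE;
	} else if (!th->overcommit && th->over_allowance) {
		/* constrained (non-overcommit) thread over the limit */
		flags |= TOY_EXPIRY_NARROW;
	}
	/* overcommit threads are left alone */
	return flags;
}

int
main(void)
{
	struct toy_thread coop_ok   = { .cooperative = true };
	struct toy_thread coop_wide = { .cooperative = true, .over_allowance = true };
	struct toy_thread constr    = { .over_allowance = true };

	printf("%llx %llx %llx\n",
	    (unsigned long long)toy_quantum_expiry_flags(&coop_ok),     /* 2 */
	    (unsigned long long)toy_quantum_expiry_flags(&coop_wide),   /* 1 */
	    (unsigned long long)toy_quantum_expiry_flags(&constr));     /* 1 */
	return 0;
}
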
3331 void
3332 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3333 {
3334 	if (locked) {
3335 		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3336 	} else {
3337 		workq_schedule_immediate_thread_creation(wq);
3338 	}
3339 }
3340 
3341 static int
3342 workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
3343     struct workqueue *wq)
3344 {
3345 	thread_t th = current_thread();
3346 	struct uthread *uth = get_bsdthread_info(th);
3347 	workq_threadreq_t kqr = uth->uu_kqr_bound;
3348 	workq_threadreq_param_t trp = { };
3349 	int nevents = uap->affinity, error;
3350 	user_addr_t eventlist = uap->item;
3351 
3352 	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3353 	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
3354 		return EINVAL;
3355 	}
3356 
3357 	if (eventlist && nevents && kqr == NULL) {
3358 		return EINVAL;
3359 	}
3360 
3361 	/*
3362 	 * Reset signal mask on the workqueue thread to default state,
3363 	 * but do not touch any signals that are marked for preservation.
3364 	 */
3365 	sigset_t resettable = uth->uu_sigmask & ~p->p_workq_allow_sigmask;
3366 	if (resettable != (sigset_t)~workq_threadmask) {
3367 		proc_lock(p);
3368 		uth->uu_sigmask |= ~workq_threadmask & ~p->p_workq_allow_sigmask;
3369 		proc_unlock(p);
3370 	}
3371 
3372 	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
3373 		/*
3374 		 * Ensure we store the threadreq param before unbinding
3375 		 * the kqr from this thread.
3376 		 */
3377 		trp = kqueue_threadreq_workloop_param(kqr);
3378 	}
3379 
3380 	/*
3381 	 * Freeze the base pri while we decide the fate of this thread.
3382 	 *
3383 	 * Either:
3384 	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
3385 	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
3386 	 */
3387 	thread_freeze_base_pri(th);
3388 
3389 	if (kqr) {
3390 		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
3391 		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
3392 			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
3393 		} else {
3394 			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
3395 		}
3396 		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
3397 			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
3398 		} else {
3399 			if (workq_thread_is_overcommit(uth)) {
3400 				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
3401 			}
3402 			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
3403 				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
3404 			} else {
3405 				upcall_flags |= uth->uu_workq_pri.qos_req |
3406 				    WQ_FLAG_THREAD_PRIO_QOS;
3407 			}
3408 		}
3409 		error = pthread_functions->workq_handle_stack_events(p, th,
3410 		    get_task_map(proc_task(p)), uth->uu_workq_stackaddr,
3411 		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
3412 		if (error) {
3413 			assert(uth->uu_kqr_bound == kqr);
3414 			return error;
3415 		}
3416 
3417 		// pthread is supposed to pass KEVENT_FLAG_PARKING here
3418 		// which should cause the above call to either:
3419 		// - not return
3420 		// - return an error
3421 		// - return 0 and have unbound properly
3422 		assert(uth->uu_kqr_bound == NULL);
3423 	}
3424 
3425 	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);
3426 
3427 	thread_sched_call(th, NULL);
3428 	thread_will_park_or_terminate(th);
3429 #if CONFIG_WORKLOOP_DEBUG
3430 	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
3431 #endif
3432 
3433 	workq_lock_spin(wq);
3434 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3435 	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
3436 	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
3437 	    WQ_SETUP_CLEAR_VOUCHER);
3438 	__builtin_unreachable();
3439 }
3440 
3441 /**
3442  * Multiplexed call to interact with the workqueue mechanism
3443  */
3444 int
3445 workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
3446 {
3447 	int options = uap->options;
3448 	int arg2 = uap->affinity;
3449 	int arg3 = uap->prio;
3450 	struct workqueue *wq = proc_get_wqptr(p);
3451 	int error = 0;
3452 
3453 	if ((p->p_lflag & P_LREGISTER) == 0) {
3454 		return EINVAL;
3455 	}
3456 
3457 	switch (options) {
3458 	case WQOPS_QUEUE_NEWSPISUPP: {
3459 		/*
3460 		 * arg2 = offset of serialno into dispatch queue
3461 		 * arg3 = kevent support
3462 		 */
3463 		int offset = arg2;
3464 		if (arg3 & 0x01) {
3465 			// If we get here, then userspace has indicated support for kevent delivery.
3466 		}
3467 
3468 		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
3469 		break;
3470 	}
3471 	case WQOPS_QUEUE_REQTHREADS: {
3472 		/*
3473 		 * arg2 = number of threads to start
3474 		 * arg3 = priority
3475 		 */
3476 		error = workq_reqthreads(p, arg2, arg3, false);
3477 		break;
3478 	}
3479 	/* For requesting threads for the cooperative pool */
3480 	case WQOPS_QUEUE_REQTHREADS2: {
3481 		/*
3482 		 * arg2 = number of threads to start
3483 		 * arg3 = priority
3484 		 */
3485 		error = workq_reqthreads(p, arg2, arg3, true);
3486 		break;
3487 	}
3488 	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
3489 		/*
3490 		 * arg2 = priority for the manager thread
3491 		 *
3492 		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
3493 		 * the low bits of the value contains a scheduling priority
3494 		 * instead of a QOS value
3495 		 */
3496 		pthread_priority_t pri = arg2;
3497 
3498 		if (wq == NULL) {
3499 			error = EINVAL;
3500 			break;
3501 		}
3502 
3503 		/*
3504 		 * Normalize the incoming priority so that it is ordered numerically.
3505 		 */
3506 		if (_pthread_priority_has_sched_pri(pri)) {
3507 			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
3508 			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
3509 		} else {
3510 			thread_qos_t qos = _pthread_priority_thread_qos(pri);
3511 			int relpri = _pthread_priority_relpri(pri);
3512 			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
3513 			    qos == THREAD_QOS_UNSPECIFIED) {
3514 				error = EINVAL;
3515 				break;
3516 			}
3517 			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
3518 		}
3519 
3520 		/*
3521 		 * If userspace passes a scheduling priority, that wins over any QoS.
3522 	 * Userspace should take care not to lower the priority this way.
3523 		 */
3524 		workq_lock_spin(wq);
3525 		if (wq->wq_event_manager_priority < (uint32_t)pri) {
3526 			wq->wq_event_manager_priority = (uint32_t)pri;
3527 		}
3528 		workq_unlock(wq);
3529 		break;
3530 	}
3531 	case WQOPS_THREAD_KEVENT_RETURN:
3532 	case WQOPS_THREAD_WORKLOOP_RETURN:
3533 	case WQOPS_THREAD_RETURN: {
3534 		error = workq_thread_return(p, uap, wq);
3535 		break;
3536 	}
3537 
3538 	case WQOPS_SHOULD_NARROW: {
3539 		/*
3540 		 * arg2 = priority to test
3541 		 * arg3 = unused
3542 		 */
3543 		thread_t th = current_thread();
3544 		struct uthread *uth = get_bsdthread_info(th);
3545 		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
3546 		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
3547 			error = EINVAL;
3548 			break;
3549 		}
3550 
3551 		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
3552 		if (qos == THREAD_QOS_UNSPECIFIED) {
3553 			error = EINVAL;
3554 			break;
3555 		}
3556 		workq_lock_spin(wq);
3557 		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
3558 		workq_unlock(wq);
3559 
3560 		*retval = should_narrow;
3561 		break;
3562 	}
3563 	case WQOPS_SETUP_DISPATCH: {
3564 		/*
3565 		 * item = pointer to workq_dispatch_config structure
3566 		 * arg2 = sizeof(item)
3567 		 */
3568 		struct workq_dispatch_config cfg;
3569 		bzero(&cfg, sizeof(cfg));
3570 
3571 		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
3572 		if (error) {
3573 			break;
3574 		}
3575 
3576 		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
3577 		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
3578 			error = ENOTSUP;
3579 			break;
3580 		}
3581 
3582 		/* Load fields from version 1 */
3583 		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;
3584 
3585 		/* Load fields from version 2 */
3586 		if (cfg.wdc_version >= 2) {
3587 			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
3588 		}
3589 
3590 		break;
3591 	}
3592 	default:
3593 		error = EINVAL;
3594 		break;
3595 	}
3596 
3597 	return error;
3598 }
3599 
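/*
 * A minimal standalone sketch of the WQOPS_SET_EVENT_MANAGER_PRIORITY
 * normalization above, assuming nothing from the kernel: both encodings are
 * reduced so that a plain unsigned compare picks the stronger request.  The
 * bit layout below (TOY_SCHED_PRI_FLAG and friends) is hypothetical; it only
 * mimics the idea that the scheduling-priority flag is a high bit, so any
 * sched-pri encoded value numerically beats any bare QoS encoded value.
 */
#include <stdint.h>
#include <stdio.h>

#define TOY_SCHED_PRI_FLAG  0x80000000u   /* high bit: value carries a sched pri */
#define TOY_SCHED_PRI_MASK  0x0000ffffu
#define TOY_QOS_MASK        0x0000ff00u   /* bare QoS class bits */

static uint32_t
toy_normalize(uint32_t pri)
{
	if (pri & TOY_SCHED_PRI_FLAG) {
		/* keep only the sched pri and the flag that marks it as such */
		return pri & (TOY_SCHED_PRI_MASK | TOY_SCHED_PRI_FLAG);
	}
	/* bare QoS: strip every other flag so values order numerically */
	return pri & TOY_QOS_MASK;
}

int
main(void)
{
	uint32_t current = 0;
	uint32_t updates[] = {
		0x00002100u,                  /* some QoS class              */
		TOY_SCHED_PRI_FLAG | 47u,     /* explicit sched pri 47       */
		0x00002500u,                  /* higher QoS, still loses     */
	};

	for (unsigned i = 0; i < sizeof(updates) / sizeof(updates[0]); i++) {
		uint32_t pri = toy_normalize(updates[i]);
		if (current < pri) {          /* monotonic: only ever raised */
			current = pri;
		}
		printf("manager pri now 0x%08x\n", current);
	}
	return 0;
}
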
3600 /*
3601  * We have no work to do, park ourselves on the idle list.
3602  *
3603  * Consumes the workqueue lock and does not return.
3604  */
3605 __attribute__((noreturn, noinline))
3606 static void
3607 workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
3608     uint32_t setup_flags)
3609 {
3610 	assert(uth == current_uthread());
3611 	assert(uth->uu_kqr_bound == NULL);
3612 	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return
3613 
3614 	workq_thread_reset_cpupercent(NULL, uth);
3615 
3616 #if CONFIG_PREADOPT_TG
3617 	/* Clear the preadoption thread group on the thread.
3618 	 *
3619 	 * Case 1:
3620 	 *		Creator thread which never picked up a thread request. We set a
3621 	 *		preadoption thread group on creator threads but if it never picked
3622 	 *		up a thread request and didn't go to userspace, then the thread will
3623 	 *		park with a preadoption thread group but no explicitly adopted
3624 	 *		voucher or work interval.
3625 	 *
3626 	 *		We drop the preadoption thread group here before proceeding to park.
3627 	 *		Note - we may get preempted when we drop the workq lock below.
3628 	 *
3629 	 * Case 2:
3630 	 *		Thread picked up a thread request and bound to it and returned back
3631 	 *		from userspace and is parking. At this point, preadoption thread
3632 	 *		group should be NULL since the thread has unbound from the thread
3633 	 *		request. So this operation should be a no-op.
3634 	 */
3635 	thread_set_preadopt_thread_group(get_machthread(uth), NULL);
3636 #endif
3637 
3638 	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
3639 	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
3640 		workq_unlock(wq);
3641 
3642 		/*
3643 		 * workq_push_idle_thread() will unset `has_stack`
3644 		 * if it wants us to free the stack before parking.
3645 		 */
3646 		if (!uth->uu_save.uus_workq_park_data.has_stack) {
3647 			pthread_functions->workq_markfree_threadstack(p,
3648 			    get_machthread(uth), get_task_map(proc_task(p)),
3649 			    uth->uu_workq_stackaddr);
3650 		}
3651 
3652 		/*
3653 		 * When we remove the voucher from the thread, we may lose our importance
3654 		 * causing us to get preempted, so we do this after putting the thread on
3655 		 * the idle list.  Then, when we get our importance back we'll be able to
3656 		 * use this thread from e.g. the kevent call out to deliver a boosting
3657 		 * message.
3658 		 *
3659 		 * Note that setting the voucher to NULL will not clear the preadoption
3660 		 * thread since this thread could have become the creator again and
3661 		 * perhaps acquired a preadoption thread group.
3662 		 */
3663 		__assert_only kern_return_t kr;
3664 		kr = thread_set_voucher_name(MACH_PORT_NULL);
3665 		assert(kr == KERN_SUCCESS);
3666 
3667 		workq_lock_spin(wq);
3668 		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
3669 		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
3670 	}
3671 
3672 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
3673 
3674 	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
3675 		/*
3676 		 * While we'd dropped the lock to unset our voucher, someone came
3677 		 * around and made us runnable.  But because we weren't waiting on the
3678 		 * event their thread_wakeup() was ineffectual.  To correct for that,
3679 		 * we just run the continuation ourselves.
3680 		 */
3681 		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
3682 		__builtin_unreachable();
3683 	}
3684 
3685 	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
3686 		workq_unpark_for_death_and_unlock(p, wq, uth,
3687 		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
3688 		__builtin_unreachable();
3689 	}
3690 
3691 	/* Disarm the workqueue quantum since the thread is now idle */
3692 	thread_disarm_workqueue_quantum(get_machthread(uth));
3693 
3694 	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
3695 	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
3696 	workq_unlock(wq);
3697 	thread_block(workq_unpark_continue);
3698 	__builtin_unreachable();
3699 }
3700 
3701 static inline bool
3702 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3703 {
3704 	/*
3705 	 * There's an event manager request and either:
3706 	 * - no event manager currently running
3707 	 * - we are re-using the event manager
3708 	 */
3709 	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3710 	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3711 }
3712 
3713 static uint32_t
3714 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3715     struct uthread *uth, bool may_start_timer)
3716 {
3717 	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3718 	uint32_t count = 0;
3719 
3720 	uint32_t max_count = wq->wq_constrained_threads_scheduled;
3721 	if (uth && workq_thread_is_nonovercommit(uth)) {
3722 		/*
3723 		 * don't count the current thread as scheduled
3724 		 */
3725 		assert(max_count > 0);
3726 		max_count--;
3727 	}
3728 	if (max_count >= wq_max_constrained_threads) {
3729 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3730 		    wq->wq_constrained_threads_scheduled,
3731 		    wq_max_constrained_threads);
3732 		/*
3733 		 * we need 1 or more constrained threads to return to the kernel before
3734 		 * we can dispatch additional work
3735 		 */
3736 		return 0;
3737 	}
3738 	max_count -= wq_max_constrained_threads;
3739 
3740 	/*
3741 	 * Compute a metric for how many threads are active.  We find the
3742 	 * highest priority request outstanding and then add up the number of active
3743 	 * threads in that and all higher-priority buckets.  We'll also add any
3744 	 * "busy" threads which are not currently active but blocked recently enough
3745 	 * that we can't be sure that they won't be unblocked soon and start
3746 	 * being active again.
3747 	 *
3748 	 * We'll then compare this metric to our max concurrency to decide whether
3749 	 * to add a new thread.
3750 	 */
3751 
3752 	uint32_t busycount, thactive_count;
3753 
3754 	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3755 	    at_qos, &busycount, NULL);
3756 
3757 	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3758 	    at_qos <= uth->uu_workq_pri.qos_bucket) {
3759 		/*
3760 		 * Don't count this thread as currently active, but only if it's not
3761 		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3762 		 * managers.
3763 		 */
3764 		assert(thactive_count > 0);
3765 		thactive_count--;
3766 	}
3767 
3768 	count = wq_max_parallelism[_wq_bucket(at_qos)];
3769 	if (count > thactive_count + busycount) {
3770 		count -= thactive_count + busycount;
3771 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
3772 		    thactive_count, busycount);
3773 		return MIN(count, max_count);
3774 	} else {
3775 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
3776 		    thactive_count, busycount);
3777 	}
3778 
3779 	if (may_start_timer) {
3780 		/*
3781 		 * If this is called from the add timer, we won't have another timer
3782 		 * fire when the thread exits the "busy" state, so rearm the timer.
3783 		 */
3784 		workq_schedule_delayed_thread_creation(wq, 0);
3785 	}
3786 
3787 	return 0;
3788 }
3789 
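/*
 * A minimal standalone sketch of the admission math above, assuming nothing
 * from the kernel: "how much parallelism is left at this QoS once active and
 * recently-busy threads are accounted for, capped by the remaining
 * constrained-thread budget".  toy_constrained_allowance() and its parameters
 * are hypothetical stand-ins for the workqueue state.
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t
toy_constrained_allowance(uint32_t max_parallelism,   /* NCPU-ish limit for the bucket  */
    uint32_t thactive, uint32_t busy,                  /* active + recently blocked      */
    uint32_t constrained_scheduled, uint32_t constrained_limit)
{
	if (constrained_scheduled >= constrained_limit) {
		return 0;       /* a constrained thread must return first */
	}
	uint32_t budget = constrained_limit - constrained_scheduled;

	if (max_parallelism <= thactive + busy) {
		return 0;       /* every core is (or may soon be) occupied */
	}
	uint32_t room = max_parallelism - (thactive + busy);

	return room < budget ? room : budget;
}

int
main(void)
{
	/* 8-wide machine, 5 active, 2 busy, plenty of constrained budget left */
	printf("%u\n", toy_constrained_allowance(8, 5, 2, 10, 64));  /* 1 */
	/* same machine, constrained budget exhausted */
	printf("%u\n", toy_constrained_allowance(8, 5, 2, 64, 64));  /* 0 */
	return 0;
}
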
3790 static bool
3791 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
3792     workq_threadreq_t req)
3793 {
3794 	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
3795 		return workq_may_start_event_mgr_thread(wq, uth);
3796 	}
3797 	if (workq_threadreq_is_cooperative(req)) {
3798 		return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
3799 	}
3800 	if (workq_threadreq_is_nonovercommit(req)) {
3801 		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
3802 	}
3803 
3804 	return true;
3805 }
3806 
3807 /*
3808  * Called from the context of selecting thread requests for threads returning
3809  * from userspace or creator thread
3810  */
3811 static workq_threadreq_t
3812 workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
3813 {
3814 	workq_lock_held(wq);
3815 
3816 	/*
3817 	 * If the current thread is cooperative, we need to exclude it from the
3818 	 * cooperative schedule count since this thread is looking for a new
3819 	 * request. A change in the schedule count for the cooperative pool
3820 	 * therefore requires us to reevaluate the next best request for it.
3821 	 */
3822 	if (uth && workq_thread_is_cooperative(uth)) {
3823 		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_req);
3824 
3825 		(void) _wq_cooperative_queue_refresh_best_req_qos(wq);
3826 
3827 		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_req);
3828 	} else {
3829 		/*
3830 		 * The old value that was already precomputed should be safe to use -
3831 		 * so assert that the best req QoS doesn't change in
3832 		 * this case
3833 		 */
3834 		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
3835 	}
3836 
3837 	thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;
3838 
3839 	/* There are no eligible requests in the cooperative pool */
3840 	if (qos == THREAD_QOS_UNSPECIFIED) {
3841 		return NULL;
3842 	}
3843 	assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
3844 	assert(qos != WORKQ_THREAD_QOS_MANAGER);
3845 
3846 	uint8_t bucket = _wq_bucket(qos);
3847 	assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));
3848 
3849 	return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
3850 }
3851 
3852 static workq_threadreq_t
3853 workq_threadreq_select_for_creator(struct workqueue *wq)
3854 {
3855 	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
3856 	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
3857 	uint8_t pri = 0;
3858 
3859 	/*
3860 	 * Compute the best priority request, and ignore the turnstile for now
3861 	 */
3862 
3863 	req_pri = priority_queue_max(&wq->wq_special_queue,
3864 	    struct workq_threadreq_s, tr_entry);
3865 	if (req_pri) {
3866 		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
3867 		    &req_pri->tr_entry);
3868 	}
3869 
3870 	/*
3871 	 * Handle the manager thread request. The special queue might yield
3872 	 * a higher priority, but the manager always beats the QoS world.
3873 	 */
3874 
3875 	req_mgr = wq->wq_event_manager_threadreq;
3876 	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
3877 		uint32_t mgr_pri = wq->wq_event_manager_priority;
3878 
3879 		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
3880 			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
3881 		} else {
3882 			mgr_pri = thread_workq_pri_for_qos(
3883 				_pthread_priority_thread_qos(mgr_pri));
3884 		}
3885 
3886 		return mgr_pri >= pri ? req_mgr : req_pri;
3887 	}
3888 
3889 	/*
3890 	 * Compute the best QoS Request, and check whether it beats the "pri" one
3891 	 *
3892 	 * Start by comparing the overcommit and the cooperative pool
3893 	 */
3894 	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
3895 	    struct workq_threadreq_s, tr_entry);
3896 	if (req_qos) {
3897 		qos = req_qos->tr_qos;
3898 	}
3899 
3900 	req_tmp = workq_cooperative_queue_best_req(wq, NULL);
3901 	if (req_tmp && qos <= req_tmp->tr_qos) {
3902 		/*
3903 		 * Cooperative TR is better between overcommit and cooperative.  Note
3904 		 * that if qos is same between overcommit and cooperative, we choose
3905 		 * cooperative.
3906 		 *
3907 		 * Pick cooperative pool if it passes the admissions check
3908 		 */
3909 		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3910 			req_qos = req_tmp;
3911 			qos = req_qos->tr_qos;
3912 		}
3913 	}
3914 
3915 	/*
3916 	 * Compare the best QoS so far - either from overcommit or from cooperative
3917 	 * pool - and compare it with the constrained pool
3918 	 */
3919 	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
3920 	    struct workq_threadreq_s, tr_entry);
3921 
3922 	if (req_tmp && qos < req_tmp->tr_qos) {
3923 		/*
3924 		 * Constrained pool is best in QoS between overcommit, cooperative
3925 		 * and constrained. Now check how it fairs against the priority case
3926 		 * and constrained. Now check how it fares against the priority case
3927 		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
3928 			return req_pri;
3929 		}
3930 
3931 		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
3932 			/*
3933 			 * If the constrained thread request is the best one and passes
3934 			 * the admission check, pick it.
3935 			 */
3936 			return req_tmp;
3937 		}
3938 	}
3939 
3940 	/*
3941 	 * Compare the best of the QoS world with the priority
3942 	 */
3943 	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
3944 		return req_pri;
3945 	}
3946 
3947 	if (req_qos) {
3948 		return req_qos;
3949 	}
3950 
3951 	/*
3952 	 * If we had no eligible request but we have a turnstile push,
3953 	 * it must be a non-overcommit thread request that failed
3954 	 * the admission check.
3955 	 *
3956 	 * Just fake a BG thread request so that if the push stops the creator
3957 	 * priority just drops to 4.
3958 	 */
3959 	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
3960 		static struct workq_threadreq_s workq_sync_push_fake_req = {
3961 			.tr_qos = THREAD_QOS_BACKGROUND,
3962 		};
3963 
3964 		return &workq_sync_push_fake_req;
3965 	}
3966 
3967 	return NULL;
3968 }
3969 
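/*
 * A minimal standalone sketch of the tie-break rules in the selection above,
 * with the admission checks omitted: on equal QoS the cooperative request
 * wins over overcommit, while the constrained request must be strictly better
 * to be considered.  toy_pick() and the pool tags are hypothetical.
 */
#include <stdio.h>

enum toy_pool { TOY_OVERCOMMIT, TOY_COOPERATIVE, TOY_CONSTRAINED };

static enum toy_pool
toy_pick(int oc_qos, int coop_qos, int constr_qos)
{
	enum toy_pool best = TOY_OVERCOMMIT;
	int qos = oc_qos;

	if (coop_qos >= qos) {          /* ties go to the cooperative pool   */
		best = TOY_COOPERATIVE;
		qos = coop_qos;
	}
	if (constr_qos > qos) {         /* constrained must strictly beat it */
		best = TOY_CONSTRAINED;
	}
	return best;
}

int
main(void)
{
	printf("%d\n", toy_pick(4, 4, 4));   /* 1: cooperative wins the tie */
	printf("%d\n", toy_pick(4, 3, 4));   /* 0: constrained only ties    */
	printf("%d\n", toy_pick(4, 3, 5));   /* 2: strictly better wins     */
	return 0;
}
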
3970 /*
3971  * Returns true if this caused a change in the schedule counts of the
3972  * cooperative pool
3973  */
3974 static bool
3975 workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
3976     struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
3977 {
3978 	workq_lock_held(wq);
3979 
3980 	/*
3981 	 * Row: thread type
3982 	 * Column: Request type
3983 	 *
3984 	 *					overcommit		non-overcommit		cooperative
3985 	 * overcommit			X				case 1				case 2
3986 	 * cooperative		case 3				case 4				case 5
3987 	 * non-overcommit	case 6					X				case 7
3988 	 *
3989 	 * Move the thread to the right bucket depending on what state it currently
3990 	 * has and what state the thread req it picks, is going to have.
3991 	 *
3992 	 * Note that the creator thread is an overcommit thread.
3993 	 */
3994 	thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_req;
3995 
3996 	/*
3997 	 * Anytime a cooperative bucket's schedule count changes, we need to
3998 	 * potentially refresh the next best QoS for that pool when we determine
3999 	 * the next request for the creator
4000 	 */
4001 	bool cooperative_pool_sched_count_changed = false;
4002 
4003 	if (workq_thread_is_overcommit(uth)) {
4004 		if (workq_tr_is_nonovercommit(tr_flags)) {
4005 			// Case 1: thread is overcommit, req is non-overcommit
4006 			wq->wq_constrained_threads_scheduled++;
4007 		} else if (workq_tr_is_cooperative(tr_flags)) {
4008 			// Case 2: thread is overcommit, req is cooperative
4009 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
4010 			cooperative_pool_sched_count_changed = true;
4011 		}
4012 	} else if (workq_thread_is_cooperative(uth)) {
4013 		if (workq_tr_is_overcommit(tr_flags)) {
4014 			// Case 3: thread is cooperative, req is overcommit
4015 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
4016 		} else if (workq_tr_is_nonovercommit(tr_flags)) {
4017 			// Case 4: thread is cooperative, req is non-overcommit
4018 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
4019 			wq->wq_constrained_threads_scheduled++;
4020 		} else {
4021 			// Case 5: thread is cooperative, req is also cooperative
4022 			assert(workq_tr_is_cooperative(tr_flags));
4023 			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
4024 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
4025 		}
4026 		cooperative_pool_sched_count_changed = true;
4027 	} else {
4028 		if (workq_tr_is_overcommit(tr_flags)) {
4029 			// Case 6: Thread is non-overcommit, req is overcommit
4030 			wq->wq_constrained_threads_scheduled--;
4031 		} else if (workq_tr_is_cooperative(tr_flags)) {
4032 			// Case 7: Thread is non-overcommit, req is cooperative
4033 			wq->wq_constrained_threads_scheduled--;
4034 			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
4035 			cooperative_pool_sched_count_changed = true;
4036 		}
4037 	}
4038 
4039 	return cooperative_pool_sched_count_changed;
4040 }
4041 
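/*
 * A minimal standalone sketch of the seven cases above, assuming nothing from
 * the kernel: each case just moves one unit of "scheduled" accounting between
 * buckets.  toy_move() and its types are hypothetical, and the per-QoS
 * cooperative buckets are collapsed into a single counter here.
 */
#include <stdbool.h>
#include <stdio.h>

enum toy_kind { TOY_OVERCOMMIT, TOY_CONSTRAINED, TOY_COOPERATIVE };

struct toy_counts {
	unsigned constrained;
	unsigned cooperative;
};

/* Returns true when the cooperative count changed (the kernel then needs to
 * refresh the cooperative pool's best-request QoS). */
static bool
toy_move(struct toy_counts *c, enum toy_kind from, enum toy_kind to)
{
	bool coop_changed = false;

	if (from == TOY_CONSTRAINED) {
		c->constrained--;
	} else if (from == TOY_COOPERATIVE) {
		c->cooperative--;
		coop_changed = true;
	}
	if (to == TOY_CONSTRAINED) {
		c->constrained++;
	} else if (to == TOY_COOPERATIVE) {
		c->cooperative++;
		coop_changed = true;
	}
	return coop_changed;
}

int
main(void)
{
	struct toy_counts c = { .constrained = 1, .cooperative = 1 };

	/* case 4: a cooperative thread picks up a constrained request */
	printf("%d\n", toy_move(&c, TOY_COOPERATIVE, TOY_CONSTRAINED));   /* 1 */
	printf("constrained=%u cooperative=%u\n", c.constrained, c.cooperative);
	return 0;
}
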
4042 static workq_threadreq_t
4043 workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
4044 {
4045 	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
4046 	uintptr_t proprietor;
4047 	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
4048 	uint8_t pri = 0;
4049 
4050 	if (uth == wq->wq_creator) {
4051 		uth = NULL;
4052 	}
4053 
4054 	/*
4055 	 * Compute the best priority request (special or turnstile)
4056 	 */
4057 
4058 	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
4059 	    &proprietor);
4060 	if (pri) {
4061 		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
4062 		req_pri = &kqwl->kqwl_request;
4063 		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
4064 			panic("Invalid thread request (%p) state %d",
4065 			    req_pri, req_pri->tr_state);
4066 		}
4067 	} else {
4068 		req_pri = NULL;
4069 	}
4070 
4071 	req_tmp = priority_queue_max(&wq->wq_special_queue,
4072 	    struct workq_threadreq_s, tr_entry);
4073 	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
4074 	    &req_tmp->tr_entry)) {
4075 		req_pri = req_tmp;
4076 		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
4077 		    &req_tmp->tr_entry);
4078 	}
4079 
4080 	/*
4081 	 * Handle the manager thread request. The special queue might yield
4082 	 * a higher priority, but the manager always beats the QoS world.
4083 	 */
4084 
4085 	req_mgr = wq->wq_event_manager_threadreq;
4086 	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
4087 		uint32_t mgr_pri = wq->wq_event_manager_priority;
4088 
4089 		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
4090 			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
4091 		} else {
4092 			mgr_pri = thread_workq_pri_for_qos(
4093 				_pthread_priority_thread_qos(mgr_pri));
4094 		}
4095 
4096 		return mgr_pri >= pri ? req_mgr : req_pri;
4097 	}
4098 
4099 	/*
4100 	 * Compute the best QoS Request, and check whether it beats the "pri" one
4101 	 */
4102 
4103 	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
4104 	    struct workq_threadreq_s, tr_entry);
4105 	if (req_qos) {
4106 		qos = req_qos->tr_qos;
4107 	}
4108 
4109 	req_tmp = workq_cooperative_queue_best_req(wq, uth);
4110 	if (req_tmp && qos <= req_tmp->tr_qos) {
4111 		/*
4112 		 * Cooperative TR is better between overcommit and cooperative.  Note
4113 		 * that if qos is same between overcommit and cooperative, we choose
4114 		 * cooperative.
4115 		 *
4116 		 * Pick cooperative pool if it passes the admissions check
4117 		 */
4118 		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
4119 			req_qos = req_tmp;
4120 			qos = req_qos->tr_qos;
4121 		}
4122 	}
4123 
4124 	/*
4125 	 * Compare the best QoS so far - either from overcommit or from cooperative
4126 	 * pool - and compare it with the constrained pool
4127 	 */
4128 	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
4129 	    struct workq_threadreq_s, tr_entry);
4130 
4131 	if (req_tmp && qos < req_tmp->tr_qos) {
4132 		/*
4133 		 * Constrained pool is best in QoS between overcommit, cooperative
4134 		 * and constrained. Now check how it fairs against the priority case
4135 		 * and constrained. Now check how it fares against the priority case
4136 		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
4137 			return req_pri;
4138 		}
4139 
4140 		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
4141 			/*
4142 			 * If the constrained thread request is the best one and passes
4143 			 * the admission check, pick it.
4144 			 */
4145 			return req_tmp;
4146 		}
4147 	}
4148 
4149 	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
4150 		return req_pri;
4151 	}
4152 
4153 	return req_qos;
4154 }
4155 
4156 /*
4157  * The creator is an anonymous thread, used to make other threads, that is
4158  * counted as scheduled but otherwise has no scheduler callback set and is
4159  * not tracked as active.
4160  *
4161  * When more requests are added or an existing one is hurried along,
4162  * a creator is elected and set up, or the existing one is overridden accordingly.
4163  *
4164  * While this creator is in flight, because no request has been dequeued,
4165  * already running threads have a chance at stealing thread requests avoiding
4166  * useless context switches, and the creator once scheduled may not find any
4167  * work to do and will then just park again.
4168  *
4169  * The creator serves the dual purpose of informing the scheduler of work that
4170  * The creator serves the dual purpose of informing the scheduler of work that
4171  * hasn't been materialized as threads yet, and of acting as a natural pacing
4172  * mechanism for thread creation.
4173  * By being anonymous (and not bound to anything) it means that thread requests
4174  * can be stolen from this creator by threads already on core yielding more
4175  * efficient scheduling and reduced context switches.
4176  */
4177 static void
4178 workq_schedule_creator(proc_t p, struct workqueue *wq,
4179     workq_kern_threadreq_flags_t flags)
4180 {
4181 	workq_threadreq_t req;
4182 	struct uthread *uth;
4183 	bool needs_wakeup;
4184 
4185 	workq_lock_held(wq);
4186 	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);
4187 
4188 again:
4189 	uth = wq->wq_creator;
4190 
4191 	if (!wq->wq_reqcount) {
4192 		/*
4193 		 * There is no thread request left.
4194 		 *
4195 		 * If there is a creator, leave everything in place, so that it cleans
4196 		 * up itself in workq_push_idle_thread().
4197 		 *
4198 		 * Else, make sure the turnstile state is reset to no inheritor.
4199 		 */
4200 		if (uth == NULL) {
4201 			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4202 		}
4203 		return;
4204 	}
4205 
4206 	req = workq_threadreq_select_for_creator(wq);
4207 	if (req == NULL) {
4208 		/*
4209 		 * There isn't a thread request that passes the admission check.
4210 		 *
4211 		 * If there is a creator, do not touch anything, the creator will sort
4212 		 * it out when it runs.
4213 		 *
4214 		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
4215 		 * code calls us if anything changes.
4216 		 */
4217 		if (uth == NULL) {
4218 			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
4219 		}
4220 		return;
4221 	}
4222 
4223 
4224 	if (uth) {
4225 		/*
4226 		 * We need to maybe override the creator we already have
4227 		 */
4228 		if (workq_thread_needs_priority_change(req, uth)) {
4229 			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4230 			    wq, 1, uthread_tid(uth), req->tr_qos);
4231 			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4232 		}
4233 		assert(wq->wq_inheritor == get_machthread(uth));
4234 	} else if (wq->wq_thidlecount) {
4235 		/*
4236 		 * We need to unpark a creator thread
4237 		 */
4238 		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
4239 		    &needs_wakeup);
4240 		/* Always reset the priorities on the newly chosen creator */
4241 		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4242 		workq_turnstile_update_inheritor(wq, get_machthread(uth),
4243 		    TURNSTILE_INHERITOR_THREAD);
4244 		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
4245 		    wq, 2, uthread_tid(uth), req->tr_qos);
4246 		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4247 		uth->uu_save.uus_workq_park_data.yields = 0;
4248 		if (needs_wakeup) {
4249 			workq_thread_wakeup(uth);
4250 		}
4251 	} else {
4252 		/*
4253 		 * We need to allocate a thread...
4254 		 */
4255 		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
4256 			/* out of threads, just go away */
4257 			flags = WORKQ_THREADREQ_NONE;
4258 		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
4259 			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
4260 		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
4261 			/* This can drop the workqueue lock, and take it again */
4262 			workq_schedule_immediate_thread_creation(wq);
4263 		} else if (workq_add_new_idle_thread(p, wq)) {
4264 			goto again;
4265 		} else {
4266 			workq_schedule_delayed_thread_creation(wq, 0);
4267 		}
4268 
4269 		/*
4270 		 * If the current thread is the inheritor:
4271 		 *
4272 		 * If we set the AST, then the thread will stay the inheritor until
4273 		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
4274 		 * and calls workq_push_idle_thread().
4275 		 *
4276 		 * Else, the responsibility of the thread creation is with a thread-call
4277 		 * and we need to clear the inheritor.
4278 		 */
4279 		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
4280 		    wq->wq_inheritor == current_thread()) {
4281 			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
4282 		}
4283 	}
4284 }
4285 
4286 /**
4287  * Same as workq_unpark_select_threadreq_or_park_and_unlock,
4288  * but do not allow early binds.
4289  *
4290  * Called with the base pri frozen, will unfreeze it.
4291  */
4292 __attribute__((noreturn, noinline))
4293 static void
4294 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4295     struct uthread *uth, uint32_t setup_flags)
4296 {
4297 	workq_threadreq_t req = NULL;
4298 	bool is_creator = (wq->wq_creator == uth);
4299 	bool schedule_creator = false;
4300 
4301 	if (__improbable(_wq_exiting(wq))) {
4302 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
4303 		goto park;
4304 	}
4305 
4306 	if (wq->wq_reqcount == 0) {
4307 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
4308 		goto park;
4309 	}
4310 
4311 	req = workq_threadreq_select(wq, uth);
4312 	if (__improbable(req == NULL)) {
4313 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
4314 		goto park;
4315 	}
4316 
4317 	struct uu_workq_policy old_pri = uth->uu_workq_pri;
4318 	uint8_t tr_flags = req->tr_flags;
4319 	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);
4320 
4321 	/*
4322 	 * Attempt to setup ourselves as the new thing to run, moving all priority
4323 	 * pushes to ourselves.
4324 	 *
4325 	 * If the current thread is the creator, then the fact that we are presently
4326 	 * running is proof that we'll do something useful, so keep going.
4327 	 *
4328 	 * For other cases, peek at the AST to know whether the scheduler wants
4329 	 * to preempt us, if yes, park instead, and move the thread request
4330 	 * turnstile back to the workqueue.
4331 	 */
4332 	if (req_ts) {
4333 		workq_perform_turnstile_operation_locked(wq, ^{
4334 			turnstile_update_inheritor(req_ts, get_machthread(uth),
4335 			TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
4336 			turnstile_update_inheritor_complete(req_ts,
4337 			TURNSTILE_INTERLOCK_HELD);
4338 		});
4339 	}
4340 
4341 	/* Accounting changes of the aggregate thscheduled_count and thactive, which
4342 	 * have to be paired with the workq_thread_reset_pri below so that
4343 	 * uth->uu_workq_pri matches thactive.
4344 	 *
4345 	 * This is undone when the thread parks */
4346 	if (is_creator) {
4347 		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
4348 		    uth->uu_save.uus_workq_park_data.yields);
4349 		wq->wq_creator = NULL;
4350 		_wq_thactive_inc(wq, req->tr_qos);
4351 		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
4352 	} else if (old_pri.qos_bucket != req->tr_qos) {
4353 		_wq_thactive_move(wq, old_pri.qos_bucket, req->tr_qos);
4354 	}
4355 	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
4356 
4357 	/*
4358 	 * Make relevant accounting changes for pool specific counts.
4359 	 *
4360 	 * The schedule counts changing can affect what the next best request
4361 	 * for cooperative thread pool is if this request is dequeued.
4362 	 */
4363 	bool cooperative_sched_count_changed =
4364 	    workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
4365 	    old_pri.qos_req, tr_flags);
4366 
4367 	if (workq_tr_is_overcommit(tr_flags)) {
4368 		workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
4369 	} else if (workq_tr_is_cooperative(tr_flags)) {
4370 		workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
4371 	} else {
4372 		workq_thread_set_type(uth, 0);
4373 	}
4374 
4375 	if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
4376 		if (req_ts) {
4377 			workq_perform_turnstile_operation_locked(wq, ^{
4378 				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
4379 				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
4380 				turnstile_update_inheritor_complete(req_ts,
4381 				TURNSTILE_INTERLOCK_HELD);
4382 			});
4383 		}
4384 		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
4385 		goto park_thawed;
4386 	}
4387 
4388 	/*
4389 	 * We passed all checks, dequeue the request, bind to it, and set it up
4390 	 * to return to user.
4391 	 */
4392 	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4393 	    workq_trace_req_id(req), tr_flags, 0);
4394 	wq->wq_fulfilled++;
4395 	schedule_creator = workq_threadreq_dequeue(wq, req,
4396 	    cooperative_sched_count_changed);
4397 
4398 	workq_thread_reset_cpupercent(req, uth);
4399 
4400 	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4401 		kqueue_threadreq_bind_prepost(p, req, uth);
4402 		req = NULL;
4403 	} else if (req->tr_count > 0) {
4404 		req = NULL;
4405 	}
4406 
4407 	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4408 		uth->uu_workq_flags ^= UT_WORKQ_NEW;
4409 		setup_flags |= WQ_SETUP_FIRST_USE;
4410 	}
4411 
4412 	/* If one of the following is true, call workq_schedule_creator (which also
4413 	 * adjusts priority of existing creator):
4414 	 *
4415 	 *	  - We are the creator currently so the wq may need a new creator
4416 	 *	  - The request we're binding to is the highest priority one, existing
4417 	 *	  creator's priority might need to be adjusted to reflect the next
4418 	 *	  highest TR
4419 	 */
4420 	if (is_creator || schedule_creator) {
4421 		/* This can drop the workqueue lock, and take it again */
4422 		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
4423 	}
4424 
4425 	workq_unlock(wq);
4426 
4427 	if (req) {
4428 		zfree(workq_zone_threadreq, req);
4429 	}
4430 
4431 	/*
4432 	 * Run Thread, Run!
4433 	 */
4434 	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
4435 	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
4436 		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
4437 	} else if (workq_tr_is_overcommit(tr_flags)) {
4438 		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
4439 	} else if (workq_tr_is_cooperative(tr_flags)) {
4440 		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
4441 	}
4442 	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
4443 		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
4444 		assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
4445 	}
4446 
4447 	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
4448 		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
4449 	}
4450 	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
4451 
4452 	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
4453 		kqueue_threadreq_bind_commit(p, get_machthread(uth));
4454 	} else {
4455 #if CONFIG_PREADOPT_TG
4456 		/*
4457 		 * The thread may have a preadopt thread group on it already because it
4458 		 * got tagged with it as a creator thread. So we need to make sure to
4459 		 * clear that since we don't have preadoption for anonymous thread
4460 		 * requests
4461 		 */
4462 		thread_set_preadopt_thread_group(get_machthread(uth), NULL);
4463 #endif
4464 	}
4465 
4466 	workq_setup_and_run(p, uth, setup_flags);
4467 	__builtin_unreachable();
4468 
4469 park:
4470 	thread_unfreeze_base_pri(get_machthread(uth));
4471 park_thawed:
4472 	workq_park_and_unlock(p, wq, uth, setup_flags);
4473 }
4474 
4475 /**
4476  * Runs a thread request on a thread
4477  *
4478  * - if thread is THREAD_NULL, will find a thread and run the request there.
4479  *   Otherwise, the thread must be the current thread.
4480  *
4481  * - if req is NULL, will find the highest priority request and run that.  If
4482  *   it is not NULL, it must be a threadreq object in state NEW.  If it can not
4483  *   be run immediately, it will be enqueued and moved to state QUEUED.
4484  *
4485  *   Either way, the thread request object serviced will be moved to state
4486  *   BINDING and attached to the uthread.
4487  *
4488  * Should be called with the workqueue lock held.  Will drop it.
4489  * Should be called with the base pri not frozen.
4490  */
4491 __attribute__((noreturn, noinline))
4492 static void
4493 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
4494     struct uthread *uth, uint32_t setup_flags)
4495 {
4496 	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
4497 		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
4498 			setup_flags |= WQ_SETUP_FIRST_USE;
4499 		}
4500 		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
4501 		/*
4502 		 * This pointer is possibly freed and only used for tracing purposes.
4503 		 */
4504 		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
4505 		workq_unlock(wq);
4506 		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
4507 		    VM_KERNEL_ADDRHIDE(req), 0, 0);
4508 		(void)req;
4509 
4510 		workq_setup_and_run(p, uth, setup_flags);
4511 		__builtin_unreachable();
4512 	}
4513 
4514 	thread_freeze_base_pri(get_machthread(uth));
4515 	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
4516 }
4517 
4518 static bool
4519 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4520 {
4521 	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4522 
4523 	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4524 		return false;
4525 	}
4526 
4527 	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4528 	if (wq->wq_fulfilled == snapshot) {
4529 		return false;
4530 	}
4531 
4532 	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4533 	if (wq->wq_fulfilled - snapshot > conc) {
4534 		/* we fulfilled more than NCPU requests since being dispatched */
4535 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4536 		    wq->wq_fulfilled, snapshot);
4537 		return true;
4538 	}
4539 
4540 	for (uint8_t i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4541 		cnt += wq->wq_thscheduled_count[i];
4542 	}
4543 	if (conc <= cnt) {
4544 		/* We fulfilled requests and have more than NCPU scheduled threads */
4545 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4546 		    wq->wq_fulfilled, snapshot);
4547 		return true;
4548 	}
4549 
4550 	return false;
4551 }
4552 
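/*
 * A minimal standalone sketch of the yield check above, assuming nothing from
 * the kernel: compare the progress made since the creator was dispatched
 * against the available concurrency.  toy_creator_should_yield() is a
 * hypothetical stand-in; "conc" plays the role of wq_max_parallelism for the
 * bucket and "scheduled" the role of the summed wq_thscheduled_count.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static bool
toy_creator_should_yield(uint32_t fulfilled, uint32_t snapshot,
    uint32_t scheduled, uint32_t conc)
{
	if (fulfilled == snapshot) {
		return false;             /* nothing happened since we were woken */
	}
	if (fulfilled - snapshot > conc) {
		return true;              /* more than NCPU requests fulfilled    */
	}
	return scheduled >= conc;         /* enough threads are already out      */
}

int
main(void)
{
	printf("%d\n", toy_creator_should_yield(100, 100, 2, 8));  /* 0 */
	printf("%d\n", toy_creator_should_yield(110, 100, 2, 8));  /* 1 */
	printf("%d\n", toy_creator_should_yield(104, 100, 9, 8));  /* 1 */
	printf("%d\n", toy_creator_should_yield(104, 100, 2, 8));  /* 0 */
	return 0;
}
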
4553 /**
4554  * parked thread wakes up
4555  */
4556 __attribute__((noreturn, noinline))
4557 static void
4558 workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
4559 {
4560 	thread_t th = current_thread();
4561 	struct uthread *uth = get_bsdthread_info(th);
4562 	proc_t p = current_proc();
4563 	struct workqueue *wq = proc_get_wqptr_fast(p);
4564 
4565 	workq_lock_spin(wq);
4566 
4567 	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
4568 		/*
4569 		 * If the number of threads we have out are able to keep up with the
4570 		 * If the number of threads we have out is able to keep up with the
4571 		 * userspace.
4572 		 */
4573 		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
4574 		uth->uu_save.uus_workq_park_data.yields++;
4575 		workq_unlock(wq);
4576 		thread_yield_with_continuation(workq_unpark_continue, NULL);
4577 		__builtin_unreachable();
4578 	}
4579 
4580 	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
4581 		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
4582 		__builtin_unreachable();
4583 	}
4584 
4585 	if (__probable(wr == THREAD_AWAKENED)) {
4586 		/*
4587 		 * We were set running, but for the purposes of dying.
4588 		 */
4589 		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
4590 		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
4591 	} else {
4592 		/*
4593 		 * workaround for <rdar://problem/38647347>,
4594 		 * in case we do hit userspace, make sure calling
4595 		 * workq_thread_terminate() does the right thing here,
4596 		 * and if we never call it, that workq_exit() will too because it sees
4597 		 * this thread on the runlist.
4598 		 */
4599 		assert(wr == THREAD_INTERRUPTED);
4600 		wq->wq_thdying_count++;
4601 		uth->uu_workq_flags |= UT_WORKQ_DYING;
4602 	}
4603 
4604 	workq_unpark_for_death_and_unlock(p, wq, uth,
4605 	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
4606 	__builtin_unreachable();
4607 }
4608 
4609 __attribute__((noreturn, noinline))
4610 static void
4611 workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
4612 {
4613 	thread_t th = get_machthread(uth);
4614 	vm_map_t vmap = get_task_map(proc_task(p));
4615 
4616 	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
4617 		/*
4618 		 * For preemption reasons, we want to reset the voucher as late as
4619 		 * possible, so we do it in two places:
4620 		 *   - Just before parking (i.e. in workq_park_and_unlock())
4621 		 *   - Prior to doing the setup for the next workitem (i.e. here)
4622 		 *
4623 		 * Those two places are sufficient to ensure we always reset it before
4624 		 * it goes back out to user space, but be careful to not break that
4625 		 * guarantee.
4626 		 *
4627 		 * Note that setting the voucher to NULL will not clear the preadoption
4628 		 * thread group on this thread
4629 		 */
4630 		__assert_only kern_return_t kr;
4631 		kr = thread_set_voucher_name(MACH_PORT_NULL);
4632 		assert(kr == KERN_SUCCESS);
4633 	}
4634 
4635 	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
4636 	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
4637 		upcall_flags |= WQ_FLAG_THREAD_REUSE;
4638 	}
4639 
4640 	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
4641 		/*
4642 		 * For threads that have an outside-of-QoS thread priority, indicate
4643 		 * to userspace that setting QoS should only affect the TSD and not
4644 		 * change QOS in the kernel.
4645 		 */
4646 		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
4647 	} else {
4648 		/*
4649 		 * Put the QoS class value into the lower bits of the reuse_thread
4650 		 * register, this is where the thread priority used to be stored
4651 		 * anyway.
4652 		 */
4653 		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
4654 		    WQ_FLAG_THREAD_PRIO_QOS;
4655 	}
4656 
4657 	if (uth->uu_workq_thport == MACH_PORT_NULL) {
4658 		/* convert_thread_to_port_pinned() consumes a reference */
4659 		thread_reference(th);
4660 		/* Convert to immovable/pinned thread port, but port is not pinned yet */
4661 		ipc_port_t port = convert_thread_to_port_pinned(th);
4662 		/* Atomically, pin and copy out the port */
4663 		uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(proc_task(p)));
4664 	}
4665 
4666 	/* Thread has been set up to run, arm its next workqueue quantum or disarm
4667 	 * if it is no longer supporting that */
4668 	if (thread_supports_cooperative_workqueue(th)) {
4669 		thread_arm_workqueue_quantum(th);
4670 	} else {
4671 		thread_disarm_workqueue_quantum(th);
4672 	}
4673 
4674 	/*
4675 	 * Call out to pthread, this sets up the thread, pulls in kevent structs
4676 	 * onto the stack, sets up the thread state and then returns to userspace.
4677 	 */
4678 	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
4679 	    proc_get_wqptr_fast(p), 0, 0, 0);
4680 
4681 	if (workq_thread_is_cooperative(uth)) {
4682 		thread_sched_call(th, NULL);
4683 	} else {
4684 		thread_sched_call(th, workq_sched_callback);
4685 	}
4686 
4687 	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
4688 	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);
4689 
4690 	__builtin_unreachable();
4691 }
4692 
4693 #pragma mark misc
4694 
4695 int
4696 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
4697 {
4698 	struct workqueue *wq = proc_get_wqptr(p);
4699 	int error = 0;
4700 	int     activecount;
4701 
4702 	if (wq == NULL) {
4703 		return EINVAL;
4704 	}
4705 
4706 	/*
4707 	 * This is sometimes called from interrupt context by the kperf sampler.
4708 	 * In that case, it's not safe to spin trying to take the lock since we
4709 	 * might already hold it.  So, we just try-lock it and error out if it's
4710 	 * already held.  Since this is just a debugging aid, and all our callers
4711 	 * are able to handle an error, that's fine.
4712 	 */
4713 	bool locked = workq_lock_try(wq);
4714 	if (!locked) {
4715 		return EBUSY;
4716 	}
4717 
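	/*
	 * Sum the active threads across all regular QoS buckets, then add one
	 * if the manager bucket is active.
	 */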
4718 	wq_thactive_t act = _wq_thactive(wq);
4719 	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
4720 	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
4721 	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
4722 		activecount++;
4723 	}
4724 	pwqinfo->pwq_nthreads = wq->wq_nthreads;
4725 	pwqinfo->pwq_runthreads = activecount;
4726 	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
4727 	pwqinfo->pwq_state = 0;
4728 
4729 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4730 		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4731 	}
4732 
4733 	if (wq->wq_nthreads >= wq_max_threads) {
4734 		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4735 	}
4736 
4737 	workq_unlock(wq);
4738 	return error;
4739 }
4740 
4741 boolean_t
4742 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
4743     boolean_t *exceeded_constrained)
4744 {
4745 	proc_t p = v;
4746 	struct proc_workqueueinfo pwqinfo;
4747 	int err;
4748 
4749 	assert(p != NULL);
4750 	assert(exceeded_total != NULL);
4751 	assert(exceeded_constrained != NULL);
4752 
4753 	err = fill_procworkqueue(p, &pwqinfo);
4754 	if (err) {
4755 		return FALSE;
4756 	}
4757 	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
4758 		return FALSE;
4759 	}
4760 
4761 	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
4762 	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
4763 
4764 	return TRUE;
4765 }
4766 
4767 uint32_t
4768 workqueue_get_pwq_state_kdp(void * v)
4769 {
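	/*
	 * The WQ_* pwq_state bits must line up with the corresponding kTaskWq*
	 * flags: the kTaskWq* values are the same bits shifted left by 17.
	 */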
4770 	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
4771 	    kTaskWqExceededConstrainedThreadLimit);
4772 	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
4773 	    kTaskWqExceededTotalThreadLimit);
4774 	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
4775 	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
4776 	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
4777 
4778 	if (v == NULL) {
4779 		return 0;
4780 	}
4781 
4782 	proc_t p = v;
4783 	struct workqueue *wq = proc_get_wqptr(p);
4784 
4785 	if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
4786 		return 0;
4787 	}
4788 
4789 	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
4790 
4791 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4792 		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4793 	}
4794 
4795 	if (wq->wq_nthreads >= wq_max_threads) {
4796 		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4797 	}
4798 
4799 	return pwq_state;
4800 }
4801 
4802 void
4803 workq_init(void)
4804 {
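	/*
	 * Convert the microsecond-based tunable windows into absolute-time
	 * intervals up front, so later comparisons can use abstime directly.
	 */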
4805 	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
4806 	    NSEC_PER_USEC, &wq_stalled_window.abstime);
4807 	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
4808 	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
4809 	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
4810 	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);
4811 
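	/*
	 * Register the queue that workq uses to defer thread deallocations to
	 * the thread-deallocate daemon.
	 */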
4812 	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
4813 	    workq_deallocate_queue_invoke);
4814 }
4815