xref: /xnu-8019.80.24/bsd/pthread/pthread_workqueue.c (revision a325d9c4a84054e40bbe985afedcb50ab80993ea)
1 /*
2  * Copyright (c) 2000-2020 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /* Copyright (c) 1995-2018 Apple, Inc. All Rights Reserved */
29 
30 #include <sys/cdefs.h>
31 
32 #include <kern/assert.h>
33 #include <kern/ast.h>
34 #include <kern/clock.h>
35 #include <kern/cpu_data.h>
36 #include <kern/kern_types.h>
37 #include <kern/policy_internal.h>
38 #include <kern/processor.h>
39 #include <kern/sched_prim.h>    /* for thread_exception_return */
40 #include <kern/task.h>
41 #include <kern/thread.h>
42 #include <kern/thread_group.h>
43 #include <kern/zalloc.h>
44 #include <mach/kern_return.h>
45 #include <mach/mach_param.h>
46 #include <mach/mach_port.h>
47 #include <mach/mach_types.h>
48 #include <mach/mach_vm.h>
49 #include <mach/sync_policy.h>
50 #include <mach/task.h>
51 #include <mach/thread_act.h> /* for thread_resume */
52 #include <mach/thread_policy.h>
53 #include <mach/thread_status.h>
54 #include <mach/vm_prot.h>
55 #include <mach/vm_statistics.h>
56 #include <machine/atomic.h>
57 #include <machine/machine_routines.h>
58 #include <machine/smp.h>
59 #include <vm/vm_map.h>
60 #include <vm/vm_protos.h>
61 
62 #include <sys/eventvar.h>
63 #include <sys/kdebug.h>
64 #include <sys/kernel.h>
65 #include <sys/lock.h>
66 #include <sys/param.h>
67 #include <sys/proc_info.h>      /* for fill_procworkqueue */
68 #include <sys/proc_internal.h>
69 #include <sys/pthread_shims.h>
70 #include <sys/resourcevar.h>
71 #include <sys/signalvar.h>
72 #include <sys/sysctl.h>
73 #include <sys/sysproto.h>
74 #include <sys/systm.h>
75 #include <sys/ulock.h> /* for ulock_owner_value_to_port_name */
76 
77 #include <pthread/bsdthread_private.h>
78 #include <pthread/workqueue_syscalls.h>
79 #include <pthread/workqueue_internal.h>
80 #include <pthread/workqueue_trace.h>
81 
82 #include <os/log.h>
83 
84 static void workq_unpark_continue(void *uth, wait_result_t wr) __dead2;
85 static void workq_schedule_creator(proc_t p, struct workqueue *wq,
86     workq_kern_threadreq_flags_t flags);
87 
88 static bool workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
89     workq_threadreq_t req);
90 
91 static uint32_t workq_constrained_allowance(struct workqueue *wq,
92     thread_qos_t at_qos, struct uthread *uth, bool may_start_timer);
93 
94 static bool _wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq);
95 
96 static bool workq_thread_is_busy(uint64_t cur_ts,
97     _Atomic uint64_t *lastblocked_tsp);
98 
99 static int workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
100 
101 static bool
102 workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags);
103 
104 static inline void
105 workq_lock_spin(struct workqueue *wq);
106 
107 static inline void
108 workq_unlock(struct workqueue *wq);
109 
110 #pragma mark globals
111 
112 struct workq_usec_var {
113 	uint32_t usecs;
114 	uint64_t abstime;
115 };
116 
117 #define WORKQ_SYSCTL_USECS(var, init) \
118 	        static struct workq_usec_var var = { .usecs = init }; \
119 	        SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
120 	                        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &var, 0, \
121 	                        workq_sysctl_handle_usecs, "I", "")
122 
123 static LCK_GRP_DECLARE(workq_lck_grp, "workq");
124 os_refgrp_decl(static, workq_refgrp, "workq", NULL);
125 
126 static ZONE_DECLARE(workq_zone_workqueue, "workq.wq",
127     sizeof(struct workqueue), ZC_NONE);
128 static ZONE_DECLARE(workq_zone_threadreq, "workq.threadreq",
129     sizeof(struct workq_threadreq_s), ZC_CACHING);
130 
131 static struct mpsc_daemon_queue workq_deallocate_queue;
132 
133 WORKQ_SYSCTL_USECS(wq_stalled_window, WQ_STALLED_WINDOW_USECS);
134 WORKQ_SYSCTL_USECS(wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
135 WORKQ_SYSCTL_USECS(wq_max_timer_interval, WQ_MAX_TIMER_INTERVAL_USECS);
136 static uint32_t wq_max_threads              = WORKQUEUE_MAXTHREADS;
137 static uint32_t wq_max_constrained_threads  = WORKQUEUE_MAXTHREADS / 8;
138 static uint32_t wq_init_constrained_limit   = 1;
139 static uint16_t wq_death_max_load;
140 static uint32_t wq_max_parallelism[WORKQ_NUM_QOS_BUCKETS];
141 
142 /*
143  * This is not a hard limit but the max size we want to aim to hit across the
144  * entire cooperative pool. We can oversubscribe the pool due to non-cooperative
145  * workers and the max we will oversubscribe the pool by, is a total of
146  * wq_max_cooperative_threads * WORKQ_NUM_QOS_BUCKETS.
147  */
148 static uint32_t wq_max_cooperative_threads;
149 
150 static inline uint32_t
wq_cooperative_queue_max_size(struct workqueue * wq)151 wq_cooperative_queue_max_size(struct workqueue *wq)
152 {
153 	return wq->wq_cooperative_queue_has_limited_max_size ? 1 : wq_max_cooperative_threads;
154 }
155 
156 #pragma mark sysctls
157 
/*
 * Shared handler for the WORKQ_SYSCTL_USECS knobs: the tunable is expressed
 * in microseconds, and on every successful write we also cache the matching
 * absolutetime interval so hot paths never convert units.
 */
static int
workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		/* read-only access, or the write failed: nothing to recompute */
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}
171 
172 SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
173     &wq_max_threads, 0, "");
174 
175 SYSCTL_INT(_kern, OID_AUTO, wq_max_constrained_threads, CTLFLAG_RW | CTLFLAG_LOCKED,
176     &wq_max_constrained_threads, 0, "");
177 
/*
 * kern.wq_limit_cooperative_threads handler: lets the calling process opt its
 * cooperative pool into a stricter per-QoS maximum size.  Only permitted while
 * the workqueue is still "new" (no threads and no requests yet).
 */
static int
wq_limit_cooperative_threads_for_proc SYSCTL_HANDLER_ARGS
{
#pragma unused(arg1, arg2, oidp)
	int input_pool_size = 0;
	int changed;
	int error = 0;

	error = sysctl_io_number(req, 0, sizeof(int), &input_pool_size, &changed);
	if (error || !changed) {
		/* read, or write of the current value: nothing to do */
		return error;
	}

#define WQ_COOPERATIVE_POOL_SIZE_DEFAULT 0
#define WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS -1
/* Not available currently, but sysctl interface is designed to allow these
 * extra parameters:
 *		WQ_COOPERATIVE_POOL_SIZE_STRICT : -2 (across all bucket)
 *		WQ_COOPERATIVE_POOL_SIZE_CUSTOM : [1, 512]
 */

	if (input_pool_size != WQ_COOPERATIVE_POOL_SIZE_DEFAULT
	    && input_pool_size != WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS) {
		error = EINVAL;
		goto out;
	}

	proc_t p = req->p;
	struct workqueue *wq = proc_get_wqptr(p);

	if (wq != NULL) {
		workq_lock_spin(wq);
		if (wq->wq_reqcount > 0 || wq->wq_nthreads > 0) {
			// Hackily enforce that the workqueue is still new (no requests or
			// threads)
			error = ENOTSUP;
		} else {
			wq->wq_cooperative_queue_has_limited_max_size = (input_pool_size == WQ_COOPERATIVE_POOL_SIZE_STRICT_PER_QOS);
		}
		workq_unlock(wq);
	} else {
		/* This process has no workqueue, calling this sysctl makes no sense */
		return ENOTSUP;
	}

out:
	return error;
}
226 
227 SYSCTL_PROC(_kern, OID_AUTO, wq_limit_cooperative_threads,
228     CTLFLAG_ANYBODY | CTLFLAG_MASKED | CTLFLAG_WR | CTLFLAG_LOCKED | CTLTYPE_INT, 0, 0,
229     wq_limit_cooperative_threads_for_proc,
230     "I", "Modify the max pool size of the cooperative pool");
231 
232 #pragma mark p_wqptr
233 
234 #define WQPTR_IS_INITING_VALUE ((struct workqueue *)~(uintptr_t)0)
235 
/*
 * Raw load of the process' workqueue pointer.  May return the
 * WQPTR_IS_INITING_VALUE sentinel while initialization is in flight;
 * callers that can't cope with that use proc_get_wqptr() instead.
 */
static struct workqueue *
proc_get_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_wqptr, relaxed);
}
241 
242 struct workqueue *
proc_get_wqptr(struct proc * p)243 proc_get_wqptr(struct proc *p)
244 {
245 	struct workqueue *wq = proc_get_wqptr_fast(p);
246 	return wq == WQPTR_IS_INITING_VALUE ? NULL : wq;
247 }
248 
/*
 * Publish the workqueue pointer.  The release barrier makes sure the fully
 * initialized workqueue is visible before the pointer itself; if the previous
 * value was the initing sentinel, wake up any thread blocked on it in
 * proc_init_wqptr_or_wait().
 */
static void
proc_set_wqptr(struct proc *p, struct workqueue *wq)
{
	wq = os_atomic_xchg(&p->p_wqptr, wq, release);
	if (wq == WQPTR_IS_INITING_VALUE) {
		/* waiters assert_wait() on &p->p_wqptr under the proc lock */
		proc_lock(p);
		thread_wakeup(&p->p_wqptr);
		proc_unlock(p);
	}
}
259 
/*
 * Try to become the thread that initializes the process' workqueue.
 *
 * Returns true when the caller claimed initialization (p_wqptr went from NULL
 * to the initing sentinel); the caller must eventually proc_set_wqptr() the
 * result.  Returns false when another thread already initialized it, or was
 * mid-initialization — in the latter case we block until it finishes.
 */
static bool
proc_init_wqptr_or_wait(struct proc *p)
{
	struct workqueue *wq;

	proc_lock(p);
	wq = os_atomic_load(&p->p_wqptr, relaxed);

	if (wq == NULL) {
		/* we won the race: mark initialization as ours */
		os_atomic_store(&p->p_wqptr, WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq == WQPTR_IS_INITING_VALUE) {
		/* somebody else is initializing: sleep until proc_set_wqptr() wakes us */
		assert_wait(&p->p_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
283 
/*
 * Wait event a parked workqueue thread sleeps on: the address of its own
 * uu_workq_stackaddr field, which is unique per thread.
 */
static inline event_t
workq_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}
289 
/* Wake a parked workqueue thread from its per-thread park event. */
static inline void
workq_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_parked_wait_event(uth), get_machthread(uth));
}
295 
296 #pragma mark wq_thactive
297 
298 #if defined(__LP64__)
299 // Layout is:
300 //   127 - 115 : 13 bits of zeroes
301 //   114 - 112 : best QoS among all pending constrained requests
302 //   111 -   0 : MGR, AUI, UI, IN, DF, UT, BG+MT buckets every 16 bits
303 #define WQ_THACTIVE_BUCKET_WIDTH 16
304 #define WQ_THACTIVE_QOS_SHIFT    (7 * WQ_THACTIVE_BUCKET_WIDTH)
305 #else
306 // Layout is:
307 //   63 - 61 : best QoS among all pending constrained requests
308 //   60      : Manager bucket (0 or 1)
309 //   59 -  0 : AUI, UI, IN, DF, UT, BG+MT buckets every 10 bits
310 #define WQ_THACTIVE_BUCKET_WIDTH 10
311 #define WQ_THACTIVE_QOS_SHIFT    (6 * WQ_THACTIVE_BUCKET_WIDTH + 1)
312 #endif
313 #define WQ_THACTIVE_BUCKET_MASK  ((1U << WQ_THACTIVE_BUCKET_WIDTH) - 1)
314 #define WQ_THACTIVE_BUCKET_HALF  (1U << (WQ_THACTIVE_BUCKET_WIDTH - 1))
315 
316 static_assert(sizeof(wq_thactive_t) * CHAR_BIT - WQ_THACTIVE_QOS_SHIFT >= 3,
317     "Make sure we have space to encode a QoS");
318 
/* Snapshot of the packed per-QoS active thread counters (see layout above). */
static inline wq_thactive_t
_wq_thactive(struct workqueue *wq)
{
	return os_atomic_load_wide(&wq->wq_thactive, relaxed);
}
324 
325 static inline int
_wq_bucket(thread_qos_t qos)326 _wq_bucket(thread_qos_t qos)
327 {
328 	// Map both BG and MT to the same bucket by over-shifting down and
329 	// clamping MT and BG together.
330 	switch (qos) {
331 	case THREAD_QOS_MAINTENANCE:
332 		return 0;
333 	default:
334 		return qos - 2;
335 	}
336 }
337 
338 #define WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(tha) \
339 	        ((thread_qos_t)((tha) >> WQ_THACTIVE_QOS_SHIFT))
340 
/*
 * Best QoS among all pending constrained requests, read with a plain
 * (non-atomic) load — see the comment below for why that is safe.
 */
static inline thread_qos_t
_wq_thactive_best_constrained_req_qos(struct workqueue *wq)
{
	// Avoid expensive atomic operations: the three bits we're loading are in
	// a single byte, and always updated under the workqueue lock
	wq_thactive_t v = *(wq_thactive_t *)&wq->wq_thactive;
	return WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(v);
}
349 
/*
 * Recompute the best constrained-request QoS from the head of the constrained
 * priority queue and, if it changed, fold the delta into the QoS bits of
 * wq_thactive.  Updates to these bits are serialized under the workqueue
 * lock (see comment below), which the caller must hold.
 */
static void
_wq_thactive_refresh_best_constrained_req_qos(struct workqueue *wq)
{
	thread_qos_t old_qos, new_qos;
	workq_threadreq_t req;

	req = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);
	new_qos = req ? req->tr_qos : THREAD_QOS_UNSPECIFIED;
	old_qos = _wq_thactive_best_constrained_req_qos(wq);
	if (old_qos != new_qos) {
		/* adding (new - old) << shift rewrites just the QoS field */
		long delta = (long)new_qos - (long)old_qos;
		wq_thactive_t v = (wq_thactive_t)delta << WQ_THACTIVE_QOS_SHIFT;
		/*
		 * We can do an atomic add relative to the initial load because updates
		 * to this qos are always serialized under the workqueue lock.
		 */
		v = os_atomic_add(&wq->wq_thactive, v, relaxed);
#ifdef __LP64__
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, (uint64_t)v,
		    (uint64_t)(v >> 64), 0);
#else
		WQ_TRACE_WQ(TRACE_wq_thactive_update, wq, v, 0, 0);
#endif
	}
}
376 
/* The addend that bumps the active count of `qos`'s bucket by one. */
static inline wq_thactive_t
_wq_thactive_offset_for_qos(thread_qos_t qos)
{
	return (wq_thactive_t)1 << (_wq_bucket(qos) * WQ_THACTIVE_BUCKET_WIDTH);
}
382 
/*
 * Increment the active count for `qos`'s bucket; returns the value of
 * wq_thactive as it was *before* the increment.
 */
static inline wq_thactive_t
_wq_thactive_inc(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_add_orig(&wq->wq_thactive, v, relaxed);
}
389 
/*
 * Decrement the active count for `qos`'s bucket; returns the value of
 * wq_thactive as it was *before* the decrement.
 */
static inline wq_thactive_t
_wq_thactive_dec(struct workqueue *wq, thread_qos_t qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(qos);
	return os_atomic_sub_orig(&wq->wq_thactive, v, relaxed);
}
396 
/*
 * Move one active thread from `old_qos`'s bucket to `new_qos`'s bucket in a
 * single atomic add, and mirror the move in the scheduled counts.  The
 * wq_thscheduled_count updates are not atomic, so the caller holds the
 * workqueue lock.
 */
static inline void
_wq_thactive_move(struct workqueue *wq,
    thread_qos_t old_qos, thread_qos_t new_qos)
{
	wq_thactive_t v = _wq_thactive_offset_for_qos(new_qos) -
	    _wq_thactive_offset_for_qos(old_qos);
	os_atomic_add(&wq->wq_thactive, v, relaxed);
	wq->wq_thscheduled_count[_wq_bucket(old_qos)]--;
	wq->wq_thscheduled_count[_wq_bucket(new_qos)]++;
}
407 
/*
 * Count active threads in every bucket at or above `qos`, given a snapshot
 * `v` of wq_thactive.  Optionally reports through `busycount` how many of
 * those buckets have a scheduled-but-inactive thread that blocked recently
 * (per workq_thread_is_busy), and through `max_busycount` the number of
 * buckets examined (i.e. the ceiling for *busycount).
 */
static inline uint32_t
_wq_thactive_aggregate_downto_qos(struct workqueue *wq, wq_thactive_t v,
    thread_qos_t qos, uint32_t *busycount, uint32_t *max_busycount)
{
	uint32_t count = 0, active;
	uint64_t curtime;

	assert(WORKQ_THREAD_QOS_MIN <= qos && qos <= WORKQ_THREAD_QOS_MAX);

	if (busycount) {
		curtime = mach_absolute_time();
		*busycount = 0;
	}
	if (max_busycount) {
		*max_busycount = THREAD_QOS_LAST - qos;
	}

	/* shift the snapshot so that `qos`'s bucket is in the low bits */
	int i = _wq_bucket(qos);
	v >>= i * WQ_THACTIVE_BUCKET_WIDTH;
	for (; i < WORKQ_NUM_QOS_BUCKETS; i++, v >>= WQ_THACTIVE_BUCKET_WIDTH) {
		active = v & WQ_THACTIVE_BUCKET_MASK;
		count += active;

		/* scheduled > active means some thread in this bucket is blocked */
		if (busycount && wq->wq_thscheduled_count[i] > active) {
			if (workq_thread_is_busy(curtime, &wq->wq_lastblocked_ts[i])) {
				/*
				 * We only consider the last blocked thread for a given bucket
				 * as busy because we don't want to take the list lock in each
				 * sched callback. However this is an approximation that could
				 * contribute to thread creation storms.
				 */
				(*busycount)++;
			}
		}
	}

	return count;
}
446 
/* Drop one from the cooperative scheduled count of `qos`'s bucket (must be > 0). */
static inline void
_wq_cooperative_queue_scheduled_count_dec(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]--;
	assert(old_scheduled_count > 0);
}
453 
/* Add one to the cooperative scheduled count of `qos`'s bucket (checked against overflow). */
static inline void
_wq_cooperative_queue_scheduled_count_inc(struct workqueue *wq, thread_qos_t qos)
{
	__assert_only uint8_t old_scheduled_count = wq->wq_cooperative_queue_scheduled_count[_wq_bucket(qos)]++;
	assert(old_scheduled_count < UINT8_MAX);
}
460 
461 #pragma mark wq_flags
462 
/* Relaxed snapshot of the wq_flags bits. */
static inline uint32_t
_wq_flags(struct workqueue *wq)
{
	return os_atomic_load(&wq->wq_flags, relaxed);
}
468 
/* Whether the workqueue has started tearing down (WQ_EXITING set). */
static inline bool
_wq_exiting(struct workqueue *wq)
{
	return _wq_flags(wq) & WQ_EXITING;
}
474 
475 bool
workq_is_exiting(struct proc * p)476 workq_is_exiting(struct proc *p)
477 {
478 	struct workqueue *wq = proc_get_wqptr(p);
479 	return !wq || _wq_exiting(wq);
480 }
481 
482 
483 #pragma mark workqueue lock
484 
/* Debugger-safe (kdp) check of whether the workqueue ticket lock is held. */
static bool
workq_lock_is_acquired_kdp(struct workqueue *wq)
{
	return kdp_lck_ticket_is_acquired(&wq->wq_lock);
}
490 
/* Acquire the workqueue ticket lock. */
static inline void
workq_lock_spin(struct workqueue *wq)
{
	lck_ticket_lock(&wq->wq_lock, &workq_lck_grp);
}
496 
/* Assert (debug builds) that the current thread owns the workqueue lock. */
static inline void
workq_lock_held(struct workqueue *wq)
{
	LCK_TICKET_ASSERT_OWNED(&wq->wq_lock);
}
502 
/* Non-blocking attempt to take the workqueue lock; true on success. */
static inline bool
workq_lock_try(struct workqueue *wq)
{
	return lck_ticket_lock_try(&wq->wq_lock, &workq_lck_grp);
}
508 
/* Release the workqueue ticket lock. */
static inline void
workq_unlock(struct workqueue *wq)
{
	lck_ticket_unlock(&wq->wq_lock);
}
514 
515 #pragma mark idle thread lists
516 
517 #define WORKQ_POLICY_INIT(qos) \
518 	        (struct uu_workq_policy){ .qos_req = qos, .qos_bucket = qos }
519 
/* Highest of the policy's requested, max, and override QoS components. */
static inline thread_qos_t
workq_pri_bucket(struct uu_workq_policy req)
{
	return MAX(MAX(req.qos_req, req.qos_max), req.qos_override);
}
525 
/*
 * Effective QoS the thread runs at: the pri bucket, but never lower than the
 * bucket it is currently accounted against.
 */
static inline thread_qos_t
workq_pri_override(struct uu_workq_policy req)
{
	return MAX(workq_pri_bucket(req), req.qos_bucket);
}
531 
/*
 * Whether the workloop scheduling parameters (TRP_PRIORITY / TRP_POLICY) the
 * thread has currently applied differ from what thread request `req` asks
 * for.  CPU percent limits are deliberately excluded (see comment below);
 * they are applied separately by workq_thread_reset_cpupercent().
 */
static inline bool
workq_thread_needs_params_change(workq_threadreq_t req, struct uthread *uth)
{
	workq_threadreq_param_t cur_trp, req_trp = { };

	cur_trp.trp_value = uth->uu_save.uus_workq_park_data.workloop_params;
	if (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		req_trp = kqueue_threadreq_workloop_param(req);
	}

	/*
	 * CPU percent flags are handled separately to policy changes, so ignore
	 * them for all of these checks.
	 */
	uint16_t cur_flags = (cur_trp.trp_flags & ~TRP_CPUPERCENT);
	uint16_t req_flags = (req_trp.trp_flags & ~TRP_CPUPERCENT);

	if (!req_flags && !cur_flags) {
		/* neither side has params: nothing to change */
		return false;
	}

	if (req_flags != cur_flags) {
		return true;
	}

	if ((req_flags & TRP_PRIORITY) && req_trp.trp_pri != cur_trp.trp_pri) {
		return true;
	}

	if ((req_flags & TRP_POLICY) && req_trp.trp_pol != cur_trp.trp_pol) {
		return true;
	}

	return false;
}
567 
/*
 * Whether binding `uth` to `req` requires re-evaluating the thread's
 * scheduling properties: its workloop params changed, the request QoS
 * differs from the thread's current effective override, or (with
 * CONFIG_PREADOPT_TG) a preadoption thread group may need to be applied.
 */
static inline bool
workq_thread_needs_priority_change(workq_threadreq_t req, struct uthread *uth)
{
	if (workq_thread_needs_params_change(req, uth)) {
		return true;
	}

	if (req->tr_qos != workq_pri_override(uth->uu_workq_pri)) {
		return true;
	}

#if CONFIG_PREADOPT_TG
	thread_group_qos_t tg = kqr_preadopt_thread_group(req);
	if (KQWL_HAS_VALID_PREADOPTED_TG(tg)) {
		/*
		 * Ideally, we'd add check here to see if thread's preadopt TG is same
		 * as the thread requests's thread group and short circuit if that is
		 * the case. But in the interest of keeping the code clean and not
		 * taking the thread lock here, we're going to skip this. We will
		 * eventually shortcircuit once we try to set the preadoption thread
		 * group on the thread.
		 */
		return true;
	}
#endif

	return false;
}
596 
/*
 * Apply a policy change for `uth`: move its active-count accounting between
 * thactive buckets, push the new kernel workq override if the effective
 * priority changed, and possibly redrive the creator thread when the change
 * could unblock a pending request.  Called with the workqueue lock held
 * (it calls _wq_thactive_move and workq_schedule_creator).
 */
static void
workq_thread_update_bucket(proc_t p, struct workqueue *wq, struct uthread *uth,
    struct uu_workq_policy old_pri, struct uu_workq_policy new_pri,
    bool force_run)
{
	thread_qos_t old_bucket = old_pri.qos_bucket;
	thread_qos_t new_bucket = workq_pri_bucket(new_pri);

	if (old_bucket != new_bucket) {
		_wq_thactive_move(wq, old_bucket, new_bucket);
	}

	new_pri.qos_bucket = new_bucket;
	uth->uu_workq_pri = new_pri;

	if (workq_pri_override(old_pri) != new_bucket) {
		thread_set_workq_override(get_machthread(uth), new_bucket);
	}

	if (wq->wq_reqcount && (old_bucket > new_bucket || force_run)) {
		int flags = WORKQ_THREADREQ_CAN_CREATE_THREADS;
		if (old_bucket > new_bucket) {
			/*
			 * When lowering our bucket, we may unblock a thread request,
			 * but we can't drop our priority before we have evaluated
			 * whether this is the case, and if we ever drop the workqueue lock
			 * that would cause a priority inversion.
			 *
			 * We hence have to disallow thread creation in that case.
			 */
			flags = 0;
		}
		workq_schedule_creator(p, wq, flags);
	}
}
632 
/*
 * Sets/resets the cpu percent limits on the current thread. We can't set
 * these limits from outside of the current thread, so this function needs
 * to be called when we're executing on the intended thread, with the
 * thread request (if any) whose TRP_CPUPERCENT parameters should apply.
 */
static void
workq_thread_reset_cpupercent(workq_threadreq_t req, struct uthread *uth)
{
	assert(uth == current_uthread());
	workq_threadreq_param_t trp = { };

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	if (uth->uu_workq_flags & UT_WORKQ_CPUPERCENT) {
		/*
		 * Going through disable when we have an existing CPU percent limit
		 * set will force the ledger to refill the token bucket of the current
		 * thread. Removing any penalty applied by previous thread use.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_DISABLE, 0, 0);
		uth->uu_workq_flags &= ~UT_WORKQ_CPUPERCENT;
	}

	if (trp.trp_flags & TRP_CPUPERCENT) {
		/*
		 * NOTE(review): trp_refillms is named as milliseconds yet is scaled
		 * by NSEC_PER_SEC here (yielding ms * 1e9 ns). Confirm the intended
		 * unit of the refill interval passed to thread_set_cpulimit() —
		 * NSEC_PER_MSEC would be the natural scale for a millisecond field.
		 */
		thread_set_cpulimit(THREAD_CPULIMIT_BLOCK, trp.trp_cpupercent,
		    (uint64_t)trp.trp_refillms * NSEC_PER_SEC);
		uth->uu_workq_flags |= UT_WORKQ_CPUPERCENT;
	}
}
664 
/*
 * Reset the scheduling properties of a workqueue thread so that they match
 * the thread request `req` it is about to service (or the cleanup QoS when
 * `req` is NULL).  Handles the manager pseudo-QoS, explicit priority/policy
 * workloop params, and (with CONFIG_PREADOPT_TG) preadoption thread groups.
 * When `unpark` is true, also stashes in the park data the values that will
 * be reported to userspace when the thread comes up.
 *
 * Called with the workq lock held.
 */
static void
workq_thread_reset_pri(struct workqueue *wq, struct uthread *uth,
    workq_threadreq_t req, bool unpark)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = req ? req->tr_qos : WORKQ_THREAD_QOS_CLEANUP;
	workq_threadreq_param_t trp = { };
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	if (req && (req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS)) {
		trp = kqueue_threadreq_workloop_param(req);
	}

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	uth->uu_workq_flags &= ~UT_WORKQ_OUTSIDE_QOS;

	if (unpark) {
		uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
		// qos sent out to userspace (may differ from uu_workq_pri on param threads)
		uth->uu_save.uus_workq_park_data.qos = qos;
	}

	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;
		assert(trp.trp_value == 0); // manager qos and thread policy don't mix

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			/* userspace requested a fixed scheduler priority for the manager */
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
			thread_set_workq_pri(th, THREAD_QOS_UNSPECIFIED, mgr_pri,
			    POLICY_TIMESHARE);
			return;
		}

		qos = _pthread_priority_thread_qos(mgr_pri);
	} else {
		if (trp.trp_flags & TRP_PRIORITY) {
			/* explicit priority: the thread operates outside the QoS world */
			qos = THREAD_QOS_UNSPECIFIED;
			priority = trp.trp_pri;
			uth->uu_workq_flags |= UT_WORKQ_OUTSIDE_QOS;
		}

		if (trp.trp_flags & TRP_POLICY) {
			policy = trp.trp_pol;
		}
	}

#if CONFIG_PREADOPT_TG
	if (req && (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP)) {
		/*
		 * We cannot safely read and borrow the reference from the kqwl since it
		 * can disappear from under us at any time due to the max-ing logic in
		 * kqueue_set_preadopted_thread_group.
		 *
		 * As such, we do the following dance:
		 *
		 * 1) cmpxchng and steal the kqwl's preadopt thread group and leave
		 * behind with (NULL + QoS). At this point, we have the reference
		 * to the thread group from the kqwl.
		 * 2) Have the thread set the preadoption thread group on itself.
		 * 3) cmpxchng from (NULL + QoS) which we set earlier in (1), back to
		 * thread_group + QoS. ie we try to give the reference back to the kqwl.
		 * If we fail, that's because a higher QoS thread group was set on the
		 * kqwl in kqueue_set_preadopted_thread_group in which case, we need to
		 * go back to (1).
		 */

		_Atomic(struct thread_group *) * tg_loc = kqr_preadopt_thread_group_addr(req);

		thread_group_qos_t old_tg, new_tg;
		int ret = 0;
again:
		ret = os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
			if (!KQWL_HAS_VALID_PREADOPTED_TG(old_tg)) {
			        os_atomic_rmw_loop_give_up(break);
			}

			/*
			 * Leave the QoS behind - kqueue_set_preadopted_thread_group will
			 * only modify it if there is a higher QoS thread group to attach
			 */
			new_tg = (thread_group_qos_t) ((uintptr_t) old_tg & KQWL_PREADOPT_TG_QOS_MASK);
		});

		if (ret) {
			/*
			 * We successfully took the ref from the kqwl so set it on the
			 * thread now
			 */
			thread_set_preadopt_thread_group(th, KQWL_GET_PREADOPTED_TG(old_tg));

			thread_group_qos_t thread_group_to_expect = new_tg;
			thread_group_qos_t thread_group_to_set = old_tg;

			os_atomic_rmw_loop(tg_loc, old_tg, new_tg, relaxed, {
				if (old_tg != thread_group_to_expect) {
				        /*
				         * There was an intervening write to the kqwl_preadopt_tg,
				         * and it has a higher QoS than what we are working with
				         * here. Abandon our current adopted thread group and redo
				         * the full dance
				         */
				        thread_group_deallocate_safe(KQWL_GET_PREADOPTED_TG(thread_group_to_set));
				        os_atomic_rmw_loop_give_up(goto again);
				}

				new_tg = thread_group_to_set;
			});
		} else {
			/* Nothing valid on the kqwl, just clear what's on the thread */
			thread_set_preadopt_thread_group(th, NULL);
		}
	} else {
		/* Not even a kqwl, clear what's on the thread */
		thread_set_preadopt_thread_group(th, NULL);
	}
#endif
	thread_set_workq_pri(th, qos, priority, policy);
}
785 
/*
 * Called by kevent with the NOTE_WL_THREAD_REQUEST knote lock held,
 * every time a servicer is being told about a new max QoS.
 */
void
workq_thread_set_max_qos(struct proc *p, workq_threadreq_t kqr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct uthread *uth = current_uthread();
	struct workqueue *wq = proc_get_wqptr_fast(p);
	thread_qos_t qos = kqr->tr_kq_qos_index;

	if (uth->uu_workq_pri.qos_max == qos) {
		/* fast path: the max QoS did not actually change */
		return;
	}

	workq_lock_spin(wq);
	old_pri = new_pri = uth->uu_workq_pri;
	new_pri.qos_max = qos;
	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	workq_unlock(wq);
}
808 
809 #pragma mark idle threads accounting and handling
810 
/*
 * Candidate idle thread for termination: the tail of the idle list, or if
 * the tail entry has no stack, the entry just before it (which is then
 * asserted to have one).  Returns NULL when no such thread exists.
 */
static inline struct uthread *
workq_oldest_killable_idle_thread(struct workqueue *wq)
{
	struct uthread *uth = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	if (uth && !uth->uu_save.uus_workq_park_data.has_stack) {
		uth = TAILQ_PREV(uth, workq_uthread_head, uu_workq_entry);
		if (uth) {
			assert(uth->uu_save.uus_workq_park_data.has_stack);
		}
	}
	return uth;
}
824 
/*
 * How long an idle thread must sit idle before it becomes killable.  The
 * full wq_reduce_pool_window applies while the pool is small; past
 * wq_death_max_load idle threads the delay decays linearly (see comment
 * below), bottoming out at window / wq_max_constrained_threads.
 */
static inline uint64_t
workq_kill_delay_for_idle_thread(struct workqueue *wq)
{
	uint64_t delay = wq_reduce_pool_window.abstime;
	uint16_t idle = wq->wq_thidlecount;

	/*
	 * If we have less than wq_death_max_load threads, have a 5s timer.
	 *
	 * For the next wq_max_constrained_threads ones, decay linearly from
	 * from 5s to 50ms.
	 */
	if (idle <= wq_death_max_load) {
		return delay;
	}

	if (wq_max_constrained_threads > idle - wq_death_max_load) {
		delay *= (wq_max_constrained_threads - (idle - wq_death_max_load));
	}
	return delay / wq_max_constrained_threads;
}
846 
847 static inline bool
workq_should_kill_idle_thread(struct workqueue * wq,struct uthread * uth,uint64_t now)848 workq_should_kill_idle_thread(struct workqueue *wq, struct uthread *uth,
849     uint64_t now)
850 {
851 	uint64_t delay = workq_kill_delay_for_idle_thread(wq);
852 	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
853 }
854 
/*
 * Arm the delayed "kill old threads" thread call for `deadline`, unless the
 * workqueue is exiting or a call is already pending — WQ_DEATH_CALL_SCHEDULED
 * acts as a one-shot latch, cleared by workq_kill_old_threads_call() itself.
 */
static void
workq_death_call_schedule(struct workqueue *wq, uint64_t deadline)
{
	uint32_t wq_flags = os_atomic_load(&wq->wq_flags, relaxed);

	if (wq_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_NONE, wq, 1, 0, 0);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq->wq_death_call, NULL, deadline,
	    wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
877 
/*
 * `decrement` is set to the number of threads that are no longer dying:
 * - because they have been resuscitated just in time (workq_pop_idle_thread)
 * - or have been killed (workq_thread_terminate).
 *
 * If no thread is left dying, picks the next idle-thread victim: kills it
 * immediately when it has been idle past the kill delay, otherwise re-arms
 * the death call for when it will be.  Called with the workqueue lock held.
 */
static void
workq_death_policy_evaluate(struct workqueue *wq, uint16_t decrement)
{
	struct uthread *uth;

	assert(wq->wq_thdying_count >= decrement);
	if ((wq->wq_thdying_count -= decrement) > 0) {
		/* some thread is still on its way out; wait for it to be reaped */
		return;
	}

	if (wq->wq_thidlecount <= 1) {
		/* never kill the last idle thread */
		return;
	}

	if ((uth = workq_oldest_killable_idle_thread(wq)) == NULL) {
		return;
	}

	uint64_t now = mach_absolute_time();
	uint64_t delay = workq_kill_delay_for_idle_thread(wq);

	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
		/* idle long enough: mark it dying and kick it out of its park */
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, wq->wq_thidlecount, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) == 0) {
			workq_thread_wakeup(uth);
		}
		return;
	}

	/* not old enough yet: re-arm the death call for its deadline */
	workq_death_call_schedule(wq,
	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
}
918 
/*
 * Final teardown of a workqueue thread: unhooks it from the run list,
 * updates the death accounting, and drops its thread reference.
 */
void
workq_thread_terminate(struct proc *p, struct uthread *uth)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		/*
		 * This death was accounted for in wq_thdying_count; let the
		 * death policy elect the next victim if appropriate.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_END,
		    wq, wq->wq_thidlecount, 0, 0);
		workq_death_policy_evaluate(wq, 1);
	}
	if (wq->wq_nthreads-- == wq_max_threads) {
		/*
		 * We got under the thread limit again, which may have prevented
		 * thread creation from happening, redrive if there are pending requests
		 */
		if (wq->wq_reqcount) {
			workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
		}
	}
	workq_unlock(wq);

	/* NOTE(review): drops the reference taken at thread creation — confirm */
	thread_deallocate(get_machthread(uth));
}
944 
/*
 * Thread-call handler armed by workq_death_call_schedule(): clears the
 * scheduled bit and runs the death policy with no dying threads to account
 * for (decrement == 0).
 */
static void
workq_kill_old_threads_call(void *param0, void *param1 __unused)
{
	struct workqueue *wq = param0;

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_START, wq, 0, 0, 0);
	os_atomic_andnot(&wq->wq_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
	workq_death_policy_evaluate(wq, 0);
	WQ_TRACE_WQ(TRACE_wq_death_call | DBG_FUNC_END, wq, 0, 0, 0);
	workq_unlock(wq);
}
957 
/*
 * Takes a parked thread off the idle (or new) list and moves it to the run
 * list so it can service a thread request. *needs_wakeup tells the caller
 * whether it must actually wake the thread.
 */
static struct uthread *
workq_pop_idle_thread(struct workqueue *wq, uint16_t uu_flags,
    bool *needs_wakeup)
{
	struct uthread *uth;

	/* Prefer a thread that has run before over a never-started one. */
	if ((uth = TAILQ_FIRST(&wq->wq_thidlelist))) {
		TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth = TAILQ_FIRST(&wq->wq_thnewlist);
		TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING | uu_flags;

	/* A thread is never woken up as part of the cooperative pool */
	assert((uu_flags & UT_WORKQ_COOPERATIVE) == 0);

	/* Non-overcommit threads count against the constrained pool. */
	if ((uu_flags & UT_WORKQ_OVERCOMMIT) == 0) {
		wq->wq_constrained_threads_scheduled++;
	}
	wq->wq_threads_scheduled++;
	wq->wq_thidlecount--;

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/*
		 * Resuscitated just in time: clear DYING and let the death
		 * policy account for it (and possibly pick another victim).
		 */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_death_policy_evaluate(wq, 1);
		*needs_wakeup = false;
	} else if (uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) {
		/* Thread is in idle cleanup, not parked: no wakeup needed. */
		*needs_wakeup = false;
	} else {
		*needs_wakeup = true;
	}
	return uth;
}
995 
996 /*
997  * Called by thread_create_workq_waiting() during thread initialization, before
998  * assert_wait, before the thread has been started.
999  */
event_t
workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	/*
	 * Fresh workqueue thread: starts NEW at the default (legacy) QoS,
	 * with no port or stack assigned yet.
	 */
	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_PTHREAD | THREAD_TAG_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	/* Returns with the workq lock held; caller waits on the park event. */
	workq_lock_spin(proc_get_wqptr_fast(get_bsdtask_info(task)));
	return workq_parked_wait_event(uth);
}
1017 
1018 /**
1019  * Try to add a new workqueue thread.
1020  *
1021  * - called with workq lock held
1022  * - dropped and retaken around thread creation
1023  * - return with workq lock held
1024  */
static bool
workq_add_new_idle_thread(proc_t p, struct workqueue *wq)
{
	mach_vm_offset_t th_stackaddr;
	kern_return_t kret;
	thread_t th;

	/* Account for the new thread before dropping the lock; reverted on
	 * failure in the `out` path. */
	wq->wq_nthreads++;

	workq_unlock(wq);

	vm_map_t vmap = get_task_map(p->task);

	kret = pthread_functions->workq_create_threadstack(p, vmap, &th_stackaddr);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 1, 0);
		goto out;
	}

	kret = thread_create_workq_waiting(p->task, workq_unpark_continue, &th);
	if (kret != KERN_SUCCESS) {
		WQ_TRACE_WQ(TRACE_wq_thread_create_failed | DBG_FUNC_NONE, wq,
		    kret, 0, 0);
		/* Undo the stack creation before bailing out. */
		pthread_functions->workq_destroy_threadstack(p, vmap, th_stackaddr);
		goto out;
	}

	// thread_create_workq_waiting() will return with the wq lock held
	// on success, because it calls workq_thread_init_and_wq_lock() above

	struct uthread *uth = get_bsdthread_info(th);

	wq->wq_creations++;
	wq->wq_thidlecount++;
	uth->uu_workq_stackaddr = (user_addr_t)th_stackaddr;
	TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);

	WQ_TRACE_WQ(TRACE_wq_thread_create | DBG_FUNC_NONE, wq, 0, 0, 0);
	return true;

out:
	workq_lock_spin(wq);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq->wq_nthreads--;
	return false;
}
1076 
1077 static inline bool
workq_thread_is_overcommit(struct uthread * uth)1078 workq_thread_is_overcommit(struct uthread *uth)
1079 {
1080 	return (uth->uu_workq_flags & UT_WORKQ_OVERCOMMIT) != 0;
1081 }
1082 
1083 static inline bool
workq_thread_is_nonovercommit(struct uthread * uth)1084 workq_thread_is_nonovercommit(struct uthread *uth)
1085 {
1086 	return (uth->uu_workq_flags & (UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE)) == 0;
1087 }
1088 
1089 static inline bool
workq_thread_is_cooperative(struct uthread * uth)1090 workq_thread_is_cooperative(struct uthread *uth)
1091 {
1092 	return (uth->uu_workq_flags & UT_WORKQ_COOPERATIVE) != 0;
1093 }
1094 
1095 static inline void
workq_thread_set_type(struct uthread * uth,uint16_t flags)1096 workq_thread_set_type(struct uthread *uth, uint16_t flags)
1097 {
1098 	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
1099 	uth->uu_workq_flags |= flags;
1100 }
1101 
1102 
1103 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
1104 
/*
 * Sends a dying workqueue thread back to userspace one last time so it can
 * exit (WQ_SETUP_EXIT_THREAD). Called with the workq lock held; drops the
 * lock and never returns.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_for_death_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t death_flags, uint32_t setup_flags)
{
	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
	bool first_use = uth->uu_workq_flags & UT_WORKQ_NEW;

	/* Cap the thread's priority at the cleanup QoS for its exit run. */
	if (qos > WORKQ_THREAD_QOS_CLEANUP) {
		workq_thread_reset_pri(wq, uth, NULL, /*unpark*/ true);
		qos = WORKQ_THREAD_QOS_CLEANUP;
	}

	workq_thread_reset_cpupercent(NULL, uth);

	/* Idle threads live on the new list until their first run. */
	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq->wq_thidlecount--;
		if (first_use) {
			TAILQ_REMOVE(&wq->wq_thnewlist, uth, uu_workq_entry);
		} else {
			TAILQ_REMOVE(&wq->wq_thidlelist, uth, uu_workq_entry);
		}
	}
	TAILQ_INSERT_TAIL(&wq->wq_thrunlist, uth, uu_workq_entry);

	workq_unlock(wq);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t flags = WQ_FLAG_THREAD_NEWSPI | qos | WQ_FLAG_THREAD_PRIO_QOS;
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(p->task);

	if (!first_use) {
		flags |= WQ_FLAG_THREAD_REUSE;
	}

	/* Hand off to pthread shims; this does not return. */
	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, WQ_SETUP_EXIT_THREAD, flags);
	__builtin_unreachable();
}
1150 
1151 bool
workq_is_current_thread_updating_turnstile(struct workqueue * wq)1152 workq_is_current_thread_updating_turnstile(struct workqueue *wq)
1153 {
1154 	return wq->wq_turnstile_updater == current_thread();
1155 }
1156 
/*
 * Runs `operation` with wq_turnstile_updater set to the calling thread, so
 * workq_is_current_thread_updating_turnstile() can detect re-entry from
 * turnstile callouts. Caller must hold the workq lock.
 */
__attribute__((always_inline))
static inline void
workq_perform_turnstile_operation_locked(struct workqueue *wq,
    void (^operation)(void))
{
	workq_lock_held(wq);
	wq->wq_turnstile_updater = current_thread();
	operation();
	wq->wq_turnstile_updater = THREAD_NULL;
}
1167 
/*
 * Points the workqueue's turnstile at a new inheritor (thread, workqueue,
 * or NULL), pushing priority appropriately. No-op if unchanged.
 */
static void
workq_turnstile_update_inheritor(struct workqueue *wq,
    turnstile_inheritor_t inheritor,
    turnstile_update_flags_t flags)
{
	/* Avoid the turnstile round-trip when the inheritor is unchanged. */
	if (wq->wq_inheritor == inheritor) {
		return;
	}
	wq->wq_inheritor = inheritor;
	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wq->wq_turnstile, inheritor,
		flags | TURNSTILE_IMMEDIATE_UPDATE);
		turnstile_update_inheritor_complete(wq->wq_turnstile,
		TURNSTILE_INTERLOCK_HELD);
	});
}
1184 
/*
 * Parks `uth` back onto the workqueue's idle lists — or sends it straight
 * to its death — updating scheduled counts, turnstile inheritorship, and
 * the idle-thread kill policy. Called with the workq lock held; the death
 * path unlocks and does not return.
 */
static void
workq_push_idle_thread(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();
	bool is_creator = (uth == wq->wq_creator);

	if (workq_thread_is_cooperative(uth)) {
		assert(!is_creator);

		thread_qos_t thread_qos = uth->uu_workq_pri.qos_bucket;
		_wq_cooperative_queue_scheduled_count_dec(wq, thread_qos);

		/* Before we get here, we always go through
		 * workq_select_threadreq_or_park_and_unlock. If we got here, it means
		 * that we went through the logic in workq_threadreq_select which
		 * did the refresh for the next best cooperative qos while
		 * excluding the current thread - we shouldn't need to do it again.
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	} else if (workq_thread_is_nonovercommit(uth)) {
		assert(!is_creator);

		wq->wq_constrained_threads_scheduled--;
	}

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING | UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
	TAILQ_REMOVE(&wq->wq_thrunlist, uth, uu_workq_entry);
	wq->wq_threads_scheduled--;

	if (is_creator) {
		wq->wq_creator = NULL;
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 3, 0,
		    uth->uu_save.uus_workq_park_data.yields);
	}

	/* If this thread was pushing on the turnstile, hand that role over. */
	if (wq->wq_inheritor == get_machthread(uth)) {
		assert(wq->wq_creator == NULL);
		if (wq->wq_reqcount) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		} else {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}

	/* Threads that never ran go back to the new list and are done here. */
	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		assert(is_creator || (_wq_flags(wq) & WQ_EXITING));
		TAILQ_INSERT_TAIL(&wq->wq_thnewlist, uth, uu_workq_entry);
		wq->wq_thidlecount++;
		return;
	}

	if (!is_creator) {
		_wq_thactive_dec(wq, uth->uu_workq_pri.qos_bucket);
		wq->wq_thscheduled_count[_wq_bucket(uth->uu_workq_pri.qos_bucket)]--;
		uth->uu_workq_flags |= UT_WORKQ_IDLE_CLEANUP;
	}

	/* The kill policy keys off how long we have been parked. */
	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_thread(wq);
	uint16_t cur_idle = wq->wq_thidlecount;

	if (cur_idle >= wq_max_constrained_threads ||
	    (wq->wq_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_thread(wq, oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq->wq_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq->wq_thidlelist, oldest, uu_workq_entry);
		}

		WQ_TRACE_WQ(TRACE_wq_thread_terminate | DBG_FUNC_START,
		    wq, cur_idle, 0, 0);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		workq_unpark_for_death_and_unlock(p, wq, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq->wq_thidlelist, workq_uthread_head);

	cur_idle += 1;
	wq->wq_thidlecount = cur_idle;

	/*
	 * Under heavy idle load, park at the tail without keeping the stack;
	 * otherwise park at the head keeping the stack for quick reuse.
	 */
	if (cur_idle >= wq_death_max_load && tail &&
	    tail->uu_save.uus_workq_park_data.has_stack) {
		uth->uu_save.uus_workq_park_data.has_stack = false;
		TAILQ_INSERT_TAIL(&wq->wq_thidlelist, uth, uu_workq_entry);
	} else {
		uth->uu_save.uus_workq_park_data.has_stack = true;
		TAILQ_INSERT_HEAD(&wq->wq_thidlelist, uth, uu_workq_entry);
	}

	/* First thread on the idle list: arm the death call. */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_thread(wq);
		workq_death_call_schedule(wq, now + delay);
	}
}
1293 
1294 #pragma mark thread requests
1295 
1296 static inline bool
workq_tr_is_overcommit(workq_tr_flags_t tr_flags)1297 workq_tr_is_overcommit(workq_tr_flags_t tr_flags)
1298 {
1299 	return (tr_flags & WORKQ_TR_FLAG_OVERCOMMIT) != 0;
1300 }
1301 
1302 static inline bool
workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)1303 workq_tr_is_nonovercommit(workq_tr_flags_t tr_flags)
1304 {
1305 	return (tr_flags & (WORKQ_TR_FLAG_OVERCOMMIT | WORKQ_TR_FLAG_COOPERATIVE)) == 0;
1306 }
1307 
1308 static inline bool
workq_tr_is_cooperative(workq_tr_flags_t tr_flags)1309 workq_tr_is_cooperative(workq_tr_flags_t tr_flags)
1310 {
1311 	return (tr_flags & WORKQ_TR_FLAG_COOPERATIVE) != 0;
1312 }
1313 
1314 #define workq_threadreq_is_overcommit(req) workq_tr_is_overcommit((req)->tr_flags)
1315 #define workq_threadreq_is_nonovercommit(req) workq_tr_is_nonovercommit((req)->tr_flags)
1316 #define workq_threadreq_is_cooperative(req) workq_tr_is_cooperative((req)->tr_flags)
1317 
/*
 * Returns the scheduler priority a thread request should be queued at.
 */
static inline int
workq_priority_for_req(workq_threadreq_t req)
{
	thread_qos_t qos = req->tr_qos;

	/* Workloops outside the QoS domain carry an explicit priority. */
	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		assert(trp.trp_flags & TRP_PRIORITY);
		return trp.trp_pri;
	}
	/* Otherwise derive the priority from the request's QoS. */
	return thread_workq_pri_for_qos(qos);
}
1330 
1331 static inline struct priority_queue_sched_max *
workq_priority_queue_for_req(struct workqueue * wq,workq_threadreq_t req)1332 workq_priority_queue_for_req(struct workqueue *wq, workq_threadreq_t req)
1333 {
1334 	assert(!workq_tr_is_cooperative(req->tr_flags));
1335 
1336 	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
1337 		return &wq->wq_special_queue;
1338 	} else if (workq_tr_is_overcommit(req->tr_flags)) {
1339 		return &wq->wq_overcommit_queue;
1340 	} else {
1341 		return &wq->wq_constrained_queue;
1342 	}
1343 }
1344 
1345 
/* Calculates the number of threads scheduled >= the input QoS */
static uint64_t
workq_num_cooperative_threads_scheduled_to_qos(struct workqueue *wq, thread_qos_t qos)
{
	workq_lock_held(wq);

	uint64_t num_cooperative_threads = 0;

	/* Sum the per-bucket scheduled counts from the top QoS down to `qos`. */
	for (thread_qos_t cur_qos = WORKQ_THREAD_QOS_MAX; cur_qos >= qos; cur_qos--) {
		int bucket = _wq_bucket(cur_qos);
		num_cooperative_threads += wq->wq_cooperative_queue_scheduled_count[bucket];
	}

	return num_cooperative_threads;
}
1361 
1362 static uint64_t
workq_num_cooperative_threads_scheduled_total(struct workqueue * wq)1363 workq_num_cooperative_threads_scheduled_total(struct workqueue *wq)
1364 {
1365 	return workq_num_cooperative_threads_scheduled_to_qos(wq, WORKQ_THREAD_QOS_MIN);
1366 }
1367 
#if DEBUG || DEVELOPMENT
/* Debug helper: does any cooperative bucket hold a pending thread request? */
static bool
workq_has_cooperative_thread_requests(struct workqueue *wq)
{
	thread_qos_t qos = WORKQ_THREAD_QOS_MAX;

	for (; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[_wq_bucket(qos)])) {
			return true;
		}
	}
	return false;
}
#endif
1382 
1383 /*
1384  * Determines the next QoS bucket we should service next in the cooperative
1385  * pool. This function will always return a QoS for cooperative pool as long as
1386  * there are requests to be serviced.
1387  *
1388  * Unlike the other thread pools, for the cooperative thread pool the schedule
1389  * counts for the various buckets in the pool affect the next best request for
1390  * it.
1391  *
1392  * This function is called in the following contexts:
1393  *
1394  * a) When determining the best thread QoS for cooperative bucket for the
1395  * creator/thread reuse
1396  *
1397  * b) Once (a) has happened and thread has bound to a thread request, figuring
1398  * out whether the next best request for this pool has changed so that creator
1399  * can be scheduled.
1400  *
1401  * Returns true if the cooperative queue's best qos changed from previous
1402  * value.
1403  */
static bool
_wq_cooperative_queue_refresh_best_req_qos(struct workqueue *wq)
{
	workq_lock_held(wq);

	thread_qos_t old_best_req_qos = wq->wq_cooperative_queue_best_req_qos;

	/* We determine the next best cooperative thread request based on the
	 * following:
	 *
	 * 1. Take the MAX of the following:
	 *		a) Highest qos with pending TRs such that number of scheduled
	 *		threads so far with >= qos is < wq_max_cooperative_threads
	 *		b) Highest qos bucket with pending TRs but no scheduled threads for that bucket
	 *
	 * 2. If the result of (1) is UN, then we pick the highest priority amongst
	 * pending thread requests in the pool.
	 *
	 */
	thread_qos_t highest_qos_with_no_scheduled = THREAD_QOS_UNSPECIFIED;
	thread_qos_t highest_qos_req_with_width = THREAD_QOS_UNSPECIFIED;

	thread_qos_t highest_qos_req = THREAD_QOS_UNSPECIFIED;

	/* Running total of scheduled threads at-or-above the loop's QoS. */
	int scheduled_count_till_qos = 0;

	for (thread_qos_t qos = WORKQ_THREAD_QOS_MAX; qos >= WORKQ_THREAD_QOS_MIN; qos--) {
		int bucket = _wq_bucket(qos);
		uint8_t scheduled_count_for_bucket = wq->wq_cooperative_queue_scheduled_count[bucket];
		scheduled_count_till_qos += scheduled_count_for_bucket;

		if (!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
			if (qos > highest_qos_req) {
				highest_qos_req = qos;
			}
			/*
			 * The pool isn't saturated for threads at and above this QoS, and
			 * this qos bucket has pending requests
			 */
			if (scheduled_count_till_qos < wq_cooperative_queue_max_size(wq)) {
				if (qos > highest_qos_req_with_width) {
					highest_qos_req_with_width = qos;
				}
			}

			/*
			 * There are no threads scheduled for this bucket but there
			 * is work pending, give it at least 1 thread
			 */
			if (scheduled_count_for_bucket == 0) {
				if (qos > highest_qos_with_no_scheduled) {
					highest_qos_with_no_scheduled = qos;
				}
			}
		}
	}

	/* Rule (1): MAX of the two candidates computed above. */
	wq->wq_cooperative_queue_best_req_qos = MAX(highest_qos_with_no_scheduled, highest_qos_req_with_width);
	/* Rule (2): fall back to the highest pending request when (1) is UN. */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		wq->wq_cooperative_queue_best_req_qos = highest_qos_req;
	}

#if DEBUG || DEVELOPMENT
	/* Assert that if we are showing up the next best req as UN, then there
	 * actually is no thread request in the cooperative pool buckets */
	if (wq->wq_cooperative_queue_best_req_qos == THREAD_QOS_UNSPECIFIED) {
		assert(!workq_has_cooperative_thread_requests(wq));
	}
#endif

	return old_best_req_qos != wq->wq_cooperative_queue_best_req_qos;
}
1476 
1477 /*
1478  * Returns whether or not the input thread (or creator thread if uth is NULL)
1479  * should be allowed to work as part of the cooperative pool for the <input qos>
1480  * bucket.
1481  *
1482  * This function is called in a bunch of places:
1483  *		a) Quantum expires for a thread and it is part of the cooperative pool
1484  *		b) When trying to pick a thread request for the creator thread to
1485  *		represent.
1486  *		c) When a thread is trying to pick a thread request to actually bind to
1487  *		and service.
1488  *
1489  * Called with workq lock held.
1490  */
1491 
1492 #define WQ_COOPERATIVE_POOL_UNSATURATED 1
1493 #define WQ_COOPERATIVE_BUCKET_UNSERVICED 2
1494 #define WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS 3
1495 
static bool
workq_cooperative_allowance(struct workqueue *wq, thread_qos_t qos, struct uthread *uth,
    bool may_start_timer)
{
	workq_lock_held(wq);

	bool exclude_thread_as_scheduled = false;
	bool passed_admissions = false;
	int bucket = _wq_bucket(qos);

	/*
	 * Temporarily remove the input thread from the scheduled counts so
	 * the admission checks evaluate the pool without it; re-added at
	 * `out` below.
	 */
	if (uth && workq_thread_is_cooperative(uth)) {
		exclude_thread_as_scheduled = true;
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_bucket);
	}

	/*
	 * We have not saturated the pool yet, let this thread continue
	 */
	uint64_t total_cooperative_threads;
	total_cooperative_threads = workq_num_cooperative_threads_scheduled_total(wq);
	if (total_cooperative_threads < wq_cooperative_queue_max_size(wq)) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_POOL_UNSATURATED);
		goto out;
	}

	/*
	 * Without this thread, nothing is servicing the bucket which has pending
	 * work
	 */
	uint64_t bucket_scheduled = wq->wq_cooperative_queue_scheduled_count[bucket];
	if (bucket_scheduled == 0 &&
	    !STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket])) {
		passed_admissions = true;
		WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE,
		    total_cooperative_threads, qos, passed_admissions,
		    WQ_COOPERATIVE_BUCKET_UNSERVICED);
		goto out;
	}

	/*
	 * If number of threads at the QoS bucket >= input QoS exceeds the max we want
	 * for the pool, deny this thread
	 */
	uint64_t aggregate_down_to_qos = workq_num_cooperative_threads_scheduled_to_qos(wq, qos);
	passed_admissions = (aggregate_down_to_qos < wq_cooperative_queue_max_size(wq));
	WQ_TRACE(TRACE_wq_cooperative_admission | DBG_FUNC_NONE, aggregate_down_to_qos,
	    qos, passed_admissions, WQ_COOPERATIVE_POOL_SATURATED_UP_TO_QOS);

	/* Denied: kick off delayed thread creation so the work still drains. */
	if (!passed_admissions && may_start_timer) {
		workq_schedule_delayed_thread_creation(wq, 0);
	}

out:
	/* Restore the count excluded at the top. */
	if (exclude_thread_as_scheduled) {
		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_bucket);
	}
	return passed_admissions;
}
1557 
1558 /*
1559  * returns true if the best request for the pool changed as a result of
1560  * enqueuing this thread request.
1561  */
static bool
workq_threadreq_enqueue(struct workqueue *wq, workq_threadreq_t req)
{
	assert(req->tr_state == WORKQ_TR_STATE_NEW);

	req->tr_state = WORKQ_TR_STATE_QUEUED;
	wq->wq_reqcount += req->tr_count;

	/* There is a single manager request slot, not a queue. */
	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
		assert(wq->wq_event_manager_threadreq == NULL);
		assert(req->tr_flags & WORKQ_TR_FLAG_KEVENT);
		assert(req->tr_count == 1);
		wq->wq_event_manager_threadreq = req;
		return true;
	}

	/* Cooperative requests go FIFO into their QoS bucket. */
	if (workq_threadreq_is_cooperative(req)) {
		assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
		assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);

		struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
		STAILQ_INSERT_TAIL(bucket, req, tr_link);

		return _wq_cooperative_queue_refresh_best_req_qos(wq);
	}

	/* Everything else goes into one of the priority queues. */
	struct priority_queue_sched_max *q = workq_priority_queue_for_req(wq, req);

	priority_queue_entry_set_sched_pri(q, &req->tr_entry,
	    workq_priority_for_req(req), false);

	if (priority_queue_insert(q, &req->tr_entry)) {
		/* The insert changed the queue maximum: best request changed. */
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
		return true;
	}
	return false;
}
1601 
1602 /*
1603  * returns true if one of the following is true (so as to update creator if
1604  * needed):
1605  *
1606  * (a) the next highest request of the pool we dequeued the request from changed
1607  * (b) the next highest requests of the pool the current thread used to be a
1608  * part of, changed
1609  *
1610  * For overcommit, special and constrained pools, the next highest QoS for each
1611  * pool just a MAX of pending requests so tracking (a) is sufficient.
1612  *
1613  * But for cooperative thread pool, the next highest QoS for the pool depends on
1614  * schedule counts in the pool as well. So if the current thread used to be
1615  * cooperative in it's previous logical run ie (b), then that can also affect
1616  * cooperative pool's next best QoS requests.
1617  */
static bool
workq_threadreq_dequeue(struct workqueue *wq, workq_threadreq_t req,
    bool cooperative_sched_count_changed)
{
	wq->wq_reqcount--;

	bool next_highest_request_changed = false;

	/* Only unhook the request once its last count has been claimed. */
	if (--req->tr_count == 0) {
		if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
			assert(wq->wq_event_manager_threadreq == req);
			assert(req->tr_count == 0);
			wq->wq_event_manager_threadreq = NULL;

			/* If a cooperative thread was the one which picked up the manager
			 * thread request, we need to reevaluate the cooperative pool
			 * anyways.
			 */
			if (cooperative_sched_count_changed) {
				_wq_cooperative_queue_refresh_best_req_qos(wq);
			}
			return true;
		}

		if (workq_threadreq_is_cooperative(req)) {
			assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
			assert(req->tr_qos != WORKQ_THREAD_QOS_ABOVEUI);
			assert(req->tr_qos == wq->wq_cooperative_queue_best_req_qos);

			/* Cooperative requests are serviced FIFO from the head. */
			struct workq_threadreq_tailq *bucket = &wq->wq_cooperative_queue[_wq_bucket(req->tr_qos)];
			__assert_only workq_threadreq_t head = STAILQ_FIRST(bucket);

			assert(head == req);
			STAILQ_REMOVE_HEAD(bucket, tr_link);

			/*
			 * If the request we're dequeueing is cooperative, then the sched
			 * counts definitely changed.
			 */
			assert(cooperative_sched_count_changed);
		}

		/*
		 * We want to do the cooperative pool refresh after dequeueing a
		 * cooperative thread request if any (to combine both effects into 1
		 * refresh operation)
		 */
		if (cooperative_sched_count_changed) {
			next_highest_request_changed = _wq_cooperative_queue_refresh_best_req_qos(wq);
		}

		if (!workq_threadreq_is_cooperative(req)) {
			/*
			 * All other types of requests are enqueued in priority queues
			 */

			if (priority_queue_remove(workq_priority_queue_for_req(wq, req),
			    &req->tr_entry)) {
				next_highest_request_changed |= true;
				if (workq_threadreq_is_nonovercommit(req)) {
					_wq_thactive_refresh_best_constrained_req_qos(wq);
				}
			}
		}
	}

	return next_highest_request_changed;
}
1686 
/*
 * Cancels a thread request: kevent/workloop requests are handed back to the
 * kqueue subsystem; plain requests are freed to their zone.
 */
static void
workq_threadreq_destroy(proc_t p, workq_threadreq_t req)
{
	req->tr_state = WORKQ_TR_STATE_CANCELED;
	if (req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT)) {
		kqueue_threadreq_cancel(p, req);
	} else {
		zfree(workq_zone_threadreq, req);
	}
}
1697 
1698 #pragma mark workqueue thread creation thread calls
1699 
/*
 * Atomically transitions wq_flags for scheduling a thread call:
 * - gives up if the workqueue is exiting, the call is already scheduled or
 *   pended, or any bit in `fail_mask` is set;
 * - if the process is suspended, only records the pend bit (redriven in
 *   workq_proc_resumed());
 * - otherwise sets the scheduled bit.
 * Returns true when the caller should actually enter the thread call now.
 */
static inline bool
workq_thread_call_prepost(struct workqueue *wq, uint32_t sched, uint32_t pend,
    uint32_t fail_mask)
{
	uint32_t old_flags, new_flags;

	os_atomic_rmw_loop(&wq->wq_flags, old_flags, new_flags, acquire, {
		if (__improbable(old_flags & (WQ_EXITING | sched | pend | fail_mask))) {
		        os_atomic_rmw_loop_give_up(return false);
		}
		if (__improbable(old_flags & WQ_PROC_SUSPENDED)) {
		        new_flags = old_flags | pend;
		} else {
		        new_flags = old_flags | sched;
		}
	});

	return (old_flags & WQ_PROC_SUSPENDED) == 0;
}
1719 
1720 #define WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART 0x1
1721 
/*
 * Arms the delayed thread-creation call, adapting its window to how
 * recently it last ran. Returns false if the call could not be armed
 * (exiting, already scheduled/pended, immediate call in flight, or
 * process suspended).
 */
static bool
workq_schedule_delayed_thread_creation(struct workqueue *wq, int flags)
{
	assert(!preemption_enabled());

	if (!workq_thread_call_prepost(wq, WQ_DELAYED_CALL_SCHEDULED,
	    WQ_DELAYED_CALL_PENDED, WQ_IMMEDIATE_CALL_PENDED |
	    WQ_IMMEDIATE_CALL_SCHEDULED)) {
		return false;
	}

	uint64_t now = mach_absolute_time();

	/*
	 * Adapt the window: back off (x2, capped) when the last run was
	 * recent, tighten (/2, floored) when it was long ago.
	 */
	if (flags & WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART) {
		/* do not change the window */
	} else if (now - wq->wq_thread_call_last_run <= wq->wq_timer_interval) {
		wq->wq_timer_interval *= 2;
		if (wq->wq_timer_interval > wq_max_timer_interval.abstime) {
			wq->wq_timer_interval = (uint32_t)wq_max_timer_interval.abstime;
		}
	} else if (now - wq->wq_thread_call_last_run > 2 * wq->wq_timer_interval) {
		wq->wq_timer_interval /= 2;
		if (wq->wq_timer_interval < wq_stalled_window.abstime) {
			wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
		}
	}

	WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
	    _wq_flags(wq), wq->wq_timer_interval);

	thread_call_t call = wq->wq_delayed_call;
	uintptr_t arg = WQ_DELAYED_CALL_SCHEDULED;
	uint64_t deadline = now + wq->wq_timer_interval;
	if (thread_call_enter1_delayed(call, (void *)arg, deadline)) {
		panic("delayed_call was already enqueued");
	}
	return true;
}
1760 
1761 static void
workq_schedule_immediate_thread_creation(struct workqueue * wq)1762 workq_schedule_immediate_thread_creation(struct workqueue *wq)
1763 {
1764 	assert(!preemption_enabled());
1765 
1766 	if (workq_thread_call_prepost(wq, WQ_IMMEDIATE_CALL_SCHEDULED,
1767 	    WQ_IMMEDIATE_CALL_PENDED, 0)) {
1768 		WQ_TRACE_WQ(TRACE_wq_start_add_timer, wq, wq->wq_reqcount,
1769 		    _wq_flags(wq), 0);
1770 
1771 		uintptr_t arg = WQ_IMMEDIATE_CALL_SCHEDULED;
1772 		if (thread_call_enter1(wq->wq_immediate_call, (void *)arg)) {
1773 			panic("immediate_call was already enqueued");
1774 		}
1775 	}
1776 }
1777 
1778 void
workq_proc_suspended(struct proc * p)1779 workq_proc_suspended(struct proc *p)
1780 {
1781 	struct workqueue *wq = proc_get_wqptr(p);
1782 
1783 	if (wq) {
1784 		os_atomic_or(&wq->wq_flags, WQ_PROC_SUSPENDED, relaxed);
1785 	}
1786 }
1787 
1788 void
workq_proc_resumed(struct proc * p)1789 workq_proc_resumed(struct proc *p)
1790 {
1791 	struct workqueue *wq = proc_get_wqptr(p);
1792 	uint32_t wq_flags;
1793 
1794 	if (!wq) {
1795 		return;
1796 	}
1797 
1798 	wq_flags = os_atomic_andnot_orig(&wq->wq_flags, WQ_PROC_SUSPENDED |
1799 	    WQ_DELAYED_CALL_PENDED | WQ_IMMEDIATE_CALL_PENDED, relaxed);
1800 	if ((wq_flags & WQ_EXITING) == 0) {
1801 		disable_preemption();
1802 		if (wq_flags & WQ_IMMEDIATE_CALL_PENDED) {
1803 			workq_schedule_immediate_thread_creation(wq);
1804 		} else if (wq_flags & WQ_DELAYED_CALL_PENDED) {
1805 			workq_schedule_delayed_thread_creation(wq,
1806 			    WORKQ_SCHEDULE_DELAYED_THREAD_CREATION_RESTART);
1807 		}
1808 		enable_preemption();
1809 	}
1810 }
1811 
1812 /**
1813  * returns whether lastblocked_tsp is within wq_stalled_window usecs of now
1814  */
1815 static bool
workq_thread_is_busy(uint64_t now,_Atomic uint64_t * lastblocked_tsp)1816 workq_thread_is_busy(uint64_t now, _Atomic uint64_t *lastblocked_tsp)
1817 {
1818 	uint64_t lastblocked_ts = os_atomic_load_wide(lastblocked_tsp, relaxed);
1819 	if (now <= lastblocked_ts) {
1820 		/*
1821 		 * Because the update of the timestamp when a thread blocks
1822 		 * isn't serialized against us looking at it (i.e. we don't hold
1823 		 * the workq lock), it's possible to have a timestamp that matches
1824 		 * the current time or that even looks to be in the future relative
1825 		 * to when we grabbed the current time...
1826 		 *
1827 		 * Just treat this as a busy thread since it must have just blocked.
1828 		 */
1829 		return true;
1830 	}
1831 	return (now - lastblocked_ts) < wq_stalled_window.abstime;
1832 }
1833 
/*
 * Thread-call handler for both the delayed and immediate thread-creation
 * calls.  `flags` carries which of the two calls fired
 * (WQ_DELAYED_CALL_SCHEDULED or WQ_IMMEDIATE_CALL_SCHEDULED) so the
 * matching wq_flags bit can be cleared.
 */
static void
workq_add_new_threads_call(void *_p, void *flags)
{
	proc_t p = _p;
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t my_flag = (uint32_t)(uintptr_t)flags;

	/*
	 * workq_exit() will set the workqueue to NULL before
	 * it cancels thread calls.
	 */
	if (!wq) {
		return;
	}

	assert((my_flag == WQ_DELAYED_CALL_SCHEDULED) ||
	    (my_flag == WQ_IMMEDIATE_CALL_SCHEDULED));

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_START, wq, _wq_flags(wq),
	    wq->wq_nthreads, wq->wq_thidlecount);

	workq_lock_spin(wq);

	wq->wq_thread_call_last_run = mach_absolute_time();
	/* release: pairs with the acquire in workq_thread_call_prepost() */
	os_atomic_andnot(&wq->wq_flags, my_flag, release);

	/* This can drop the workqueue lock, and take it again */
	workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);

	workq_unlock(wq);

	WQ_TRACE_WQ(TRACE_wq_add_timer | DBG_FUNC_END, wq, 0,
	    wq->wq_nthreads, wq->wq_thidlecount);
}
1868 
1869 #pragma mark thread state tracking
1870 
/*
 * Scheduler callback invoked when a workqueue thread blocks or unblocks.
 * Maintains the per-QoS active-thread counts (wq_thactive) and, on block,
 * may redrive constrained thread creation.  Runs in scheduler context:
 * the workqueue lock may only be taken on the BLOCK path (see the
 * UNBLOCK comment below).
 */
static void
workq_sched_callback(int type, thread_t thread)
{
	thread_ro_t tro = get_thread_ro(thread);
	struct uthread *uth = get_bsdthread_info(thread);
	struct workqueue *wq = proc_get_wqptr(tro->tro_proc);
	thread_qos_t req_qos, qos = uth->uu_workq_pri.qos_bucket;
	wq_thactive_t old_thactive;
	bool start_timer = false;

	/* manager threads are not tracked in the per-QoS buckets */
	if (qos == WORKQ_THREAD_QOS_MANAGER) {
		return;
	}

	switch (type) {
	case SCHED_CALL_BLOCK:
		old_thactive = _wq_thactive_dec(wq, qos);
		req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);

		/*
		 * Remember the timestamp of the last thread that blocked in this
		 * bucket, it is used by admission checks to ignore one thread
		 * being inactive if this timestamp is recent enough.
		 *
		 * If we collide with another thread trying to update the
		 * last_blocked (really unlikely since another thread would have to
		 * get scheduled and then block after we start down this path), it's
		 * not a problem.  Either timestamp is adequate, so no need to retry
		 */
		os_atomic_store_wide(&wq->wq_lastblocked_ts[_wq_bucket(qos)],
		    thread_last_run_time(thread), relaxed);

		if (req_qos == THREAD_QOS_UNSPECIFIED) {
			/*
			 * No pending request at the moment we could unblock, move on.
			 */
		} else if (qos < req_qos) {
			/*
			 * The blocking thread is at a lower QoS than the highest currently
			 * pending constrained request, nothing has to be redriven
			 */
		} else {
			uint32_t max_busycount, old_req_count;
			old_req_count = _wq_thactive_aggregate_downto_qos(wq, old_thactive,
			    req_qos, NULL, &max_busycount);
			/*
			 * If it is possible that may_start_constrained_thread had refused
			 * admission due to being over the max concurrency, we may need to
			 * spin up a new thread.
			 *
			 * We take into account the maximum number of busy threads
			 * that can affect may_start_constrained_thread as looking at the
			 * actual number may_start_constrained_thread will see is racy.
			 *
			 * IOW at NCPU = 4, for IN (req_qos = 1), if the old req count is
			 * between NCPU (4) and NCPU - 2 (2) we need to redrive.
			 */
			uint32_t conc = wq_max_parallelism[_wq_bucket(qos)];
			if (old_req_count <= conc && conc <= old_req_count + max_busycount) {
				start_timer = workq_schedule_delayed_thread_creation(wq, 0);
			}
		}
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_START, wq,
			    old - 1, qos | (req_qos << 8),
			    wq->wq_reqcount << 1 | start_timer);
		}
		break;

	case SCHED_CALL_UNBLOCK:
		/*
		 * we cannot take the workqueue_lock here...
		 * an UNBLOCK can occur from a timer event which
		 * is run from an interrupt context... if the workqueue_lock
		 * is already held by this processor, we'll deadlock...
		 * the thread lock for the thread being UNBLOCKED
		 * is also held
		 */
		old_thactive = _wq_thactive_inc(wq, qos);
		if (__improbable(kdebug_enable)) {
			__unused uint32_t old = _wq_thactive_aggregate_downto_qos(wq,
			    old_thactive, qos, NULL, NULL);
			req_qos = WQ_THACTIVE_BEST_CONSTRAINED_REQ_QOS(old_thactive);
			WQ_TRACE_WQ(TRACE_wq_thread_block | DBG_FUNC_END, wq,
			    old + 1, qos | (req_qos << 8),
			    wq->wq_threads_scheduled);
		}
		break;
	}
}
1963 
1964 #pragma mark workq lifecycle
1965 
/*
 * Take an additional reference on the workqueue; paired with
 * workq_deallocate() / workq_deallocate_safe().
 */
void
workq_reference(struct workqueue *wq)
{
	os_ref_retain(&wq->wq_refcnt);
}
1971 
/*
 * Final teardown of a workqueue whose refcount hit zero: complete and free
 * its turnstile, destroy its lock, and return the structure to its zone.
 * Runs either synchronously from workq_deallocate() or from the
 * workq_deallocate_queue MPSC daemon (workq_deallocate_safe()).
 */
static void
workq_deallocate_queue_invoke(mpsc_queue_chain_t e,
    __assert_only mpsc_daemon_queue_t dq)
{
	struct workqueue *wq;
	struct turnstile *ts;

	wq = mpsc_queue_element(e, struct workqueue, wq_destroy_link);
	assert(dq == &workq_deallocate_queue);

	/* turnstile_prepare() in workq_open() guarantees ts is non-NULL here */
	turnstile_complete((uintptr_t)wq, &wq->wq_turnstile, &ts, TURNSTILE_WORKQS);
	assert(ts);
	turnstile_cleanup();
	turnstile_deallocate(ts);

	lck_ticket_destroy(&wq->wq_lock, &workq_lck_grp);
	zfree(workq_zone_workqueue, wq);
}
1990 
1991 static void
workq_deallocate(struct workqueue * wq)1992 workq_deallocate(struct workqueue *wq)
1993 {
1994 	if (os_ref_release_relaxed(&wq->wq_refcnt) == 0) {
1995 		workq_deallocate_queue_invoke(&wq->wq_destroy_link,
1996 		    &workq_deallocate_queue);
1997 	}
1998 }
1999 
2000 void
workq_deallocate_safe(struct workqueue * wq)2001 workq_deallocate_safe(struct workqueue *wq)
2002 {
2003 	if (__improbable(os_ref_release_relaxed(&wq->wq_refcnt) == 0)) {
2004 		mpsc_daemon_enqueue(&workq_deallocate_queue, &wq->wq_destroy_link,
2005 		    MPSC_QUEUE_DISABLE_PREEMPTION);
2006 	}
2007 }
2008 
/**
 * Setup per-process state for the workqueue.
 *
 * On first ever call (per boot) also computes the global constrained-pool
 * limits and per-QoS parallelism table.  Allocates and initializes the
 * process's struct workqueue if it does not have one yet; concurrent
 * callers serialize through proc_init_wqptr_or_wait().
 *
 * Returns 0 on success, EINVAL if the process never registered its
 * pthread callbacks (P_LREGISTER).
 */
int
workq_open(struct proc *p, __unused struct workq_open_args *uap,
    __unused int32_t *retval)
{
	struct workqueue *wq;
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	/* One-time global initialization, performed lazily on first open. */
	if (wq_init_constrained_limit) {
		uint32_t limit, num_cpus = ml_wait_max_cpus();

		/*
		 * set up the limit for the constrained pool
		 * this is a virtual pool in that we don't
		 * maintain it on a separate idle and run list
		 */
		limit = num_cpus * WORKQUEUE_CONSTRAINED_FACTOR;

		if (limit > wq_max_constrained_threads) {
			wq_max_constrained_threads = limit;
		}

		/* clamp so per-QoS counts fit in a wq_thactive_t bucket */
		if (wq_max_threads > WQ_THACTIVE_BUCKET_HALF) {
			wq_max_threads = WQ_THACTIVE_BUCKET_HALF;
		}
		/* leave headroom below the systemwide thread limit */
		if (wq_max_threads > CONFIG_THREAD_MAX - 20) {
			wq_max_threads = CONFIG_THREAD_MAX - 20;
		}

		wq_death_max_load = (uint16_t)fls(num_cpus) + 1;

		for (thread_qos_t qos = WORKQ_THREAD_QOS_MIN; qos <= WORKQ_THREAD_QOS_MAX; qos++) {
			wq_max_parallelism[_wq_bucket(qos)] =
			    qos_max_parallelism(qos, QOS_PARALLELISM_COUNT_LOGICAL);
		}

		wq_max_cooperative_threads = num_cpus;

		wq_init_constrained_limit = 0;
	}

	if (proc_get_wqptr(p) == NULL) {
		/* Returns FALSE when another thread won the race and is (or
		 * finished) initializing the wqptr; we waited for it. */
		if (proc_init_wqptr_or_wait(p) == FALSE) {
			assert(proc_get_wqptr(p) != NULL);
			goto out;
		}

		wq = zalloc_flags(workq_zone_workqueue, Z_WAITOK | Z_ZERO);

		os_ref_init_count(&wq->wq_refcnt, &workq_refgrp, 1);

		// Start the event manager at the priority hinted at by the policy engine
		thread_qos_t mgr_priority_hint = task_get_default_manager_qos(current_task());
		pthread_priority_t pp = _pthread_priority_make_from_thread_qos(mgr_priority_hint, 0, 0);
		wq->wq_event_manager_priority = (uint32_t)pp;
		wq->wq_timer_interval = (uint32_t)wq_stalled_window.abstime;
		wq->wq_proc = p;
		turnstile_prepare((uintptr_t)wq, &wq->wq_turnstile, turnstile_alloc(),
		    TURNSTILE_WORKQS);

		TAILQ_INIT(&wq->wq_thrunlist);
		TAILQ_INIT(&wq->wq_thnewlist);
		TAILQ_INIT(&wq->wq_thidlelist);
		priority_queue_init(&wq->wq_overcommit_queue);
		priority_queue_init(&wq->wq_constrained_queue);
		priority_queue_init(&wq->wq_special_queue);
		for (int bucket = 0; bucket < WORKQ_NUM_QOS_BUCKETS; bucket++) {
			STAILQ_INIT(&wq->wq_cooperative_queue[bucket]);
		}

		/* We are only using the delayed thread call for the constrained pool
		 * which can't have work at >= UI QoS and so we can be fine with a
		 * UI QoS thread call.
		 */
		wq->wq_delayed_call = thread_call_allocate_with_qos(
			workq_add_new_threads_call, p, THREAD_QOS_USER_INTERACTIVE,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_immediate_call = thread_call_allocate_with_options(
			workq_add_new_threads_call, p, THREAD_CALL_PRIORITY_KERNEL,
			THREAD_CALL_OPTIONS_ONCE);
		wq->wq_death_call = thread_call_allocate_with_options(
			workq_kill_old_threads_call, wq,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		lck_ticket_init(&wq->wq_lock, &workq_lck_grp);

		WQ_TRACE_WQ(TRACE_wq_create | DBG_FUNC_NONE, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);
		/* publishes wq and wakes any waiters in proc_init_wqptr_or_wait() */
		proc_set_wqptr(p, wq);
	}
out:

	return error;
}
2109 
/*
 * Routine:	workq_mark_exiting
 *
 * Function:	Mark the work queue such that new threads will not be added to the
 *		work queue after we return.
 *
 * Conditions:	Called against the current process.
 */
void
workq_mark_exiting(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t wq_flags;
	workq_threadreq_t mgr_req;

	if (!wq) {
		return;
	}

	WQ_TRACE_WQ(TRACE_wq_pthread_exit | DBG_FUNC_START, wq, 0, 0, 0);

	workq_lock_spin(wq);

	/* WQ_EXITING is never cleared once set */
	wq_flags = os_atomic_or_orig(&wq->wq_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_mark_exiting called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_IMMEDIATE_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_immediate_call);
	}
	if (wq_flags & WQ_DELAYED_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_delayed_call);
	}
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq->wq_death_call);
	}

	mgr_req = wq->wq_event_manager_threadreq;
	wq->wq_event_manager_threadreq = NULL;
	wq->wq_reqcount = 0; /* workq_schedule_creator must not look at queues */
	wq->wq_creator = NULL;
	workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);

	workq_unlock(wq);

	/* cancel outside the lock: kqueue_threadreq_cancel may take other locks */
	if (mgr_req) {
		kqueue_threadreq_cancel(p, mgr_req);
	}
	/*
	 * No one touches the priority queues once WQ_EXITING is set.
	 * It is hence safe to do the tear down without holding any lock.
	 */
	priority_queue_destroy(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});
	priority_queue_destroy(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry, ^(workq_threadreq_t e){
		workq_threadreq_destroy(p, e);
	});

	WQ_TRACE(TRACE_wq_pthread_exit | DBG_FUNC_END, 0, 0, 0, 0);
}
2182 
/*
 * Routine:	workq_exit
 *
 * Function:	clean up the work queue structure(s) now that there are no threads
 *		left running inside the work queue (except possibly current_thread).
 *
 * Conditions:	Called by the last thread in the process.
 *		Called against current process.
 */
void
workq_exit(struct proc *p)
{
	struct workqueue *wq;
	struct uthread *uth, *tmp;

	/* clear the wqptr first so late thread-call invocations bail out */
	wq = os_atomic_xchg(&p->p_wqptr, NULL, relaxed);
	if (wq != NULL) {
		thread_t th = current_thread();

		WQ_TRACE_WQ(TRACE_wq_workqueue_exit | DBG_FUNC_START, wq, 0, 0, 0);

		if (thread_get_tag(th) & THREAD_TAG_WORKQUEUE) {
			/*
			 * <rdar://problem/40111515> Make sure we will no longer call the
			 * sched call, if we ever block this thread, which the cancel_wait
			 * below can do.
			 */
			thread_sched_call(th, NULL);
		}

		/*
		 * Thread calls are always scheduled by the proc itself or under the
		 * workqueue spinlock if WQ_EXITING is not yet set.
		 *
		 * Either way, when this runs, the proc has no threads left beside
		 * the one running this very code, so we know no thread call can be
		 * dispatched anymore.
		 */
		thread_call_cancel_wait(wq->wq_delayed_call);
		thread_call_cancel_wait(wq->wq_immediate_call);
		thread_call_cancel_wait(wq->wq_death_call);
		thread_call_free(wq->wq_delayed_call);
		thread_call_free(wq->wq_immediate_call);
		thread_call_free(wq->wq_death_call);

		/*
		 * Clean up workqueue data structures for threads that exited and
		 * didn't get a chance to clean up after themselves.
		 *
		 * idle/new threads should have been interrupted and died on their own
		 */
		TAILQ_FOREACH_SAFE(uth, &wq->wq_thrunlist, uu_workq_entry, tmp) {
			thread_t mth = get_machthread(uth);
			thread_sched_call(mth, NULL);
			thread_deallocate(mth);
		}
		assert(TAILQ_EMPTY(&wq->wq_thnewlist));
		assert(TAILQ_EMPTY(&wq->wq_thidlelist));

		WQ_TRACE_WQ(TRACE_wq_destroy | DBG_FUNC_END, wq,
		    VM_KERNEL_ADDRHIDE(wq), 0, 0);

		/* drops the reference taken in workq_open() */
		workq_deallocate(wq);

		WQ_TRACE(TRACE_wq_workqueue_exit | DBG_FUNC_END, 0, 0, 0, 0);
	}
}
2250 
2251 
2252 #pragma mark bsd thread control
2253 
2254 bool
bsdthread_part_of_cooperative_workqueue(struct uthread * uth)2255 bsdthread_part_of_cooperative_workqueue(struct uthread *uth)
2256 {
2257 	return (workq_thread_is_cooperative(uth) || workq_thread_is_nonovercommit(uth)) &&
2258 	       (uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER);
2259 }
2260 
2261 static bool
_pthread_priority_to_policy(pthread_priority_t priority,thread_qos_policy_data_t * data)2262 _pthread_priority_to_policy(pthread_priority_t priority,
2263     thread_qos_policy_data_t *data)
2264 {
2265 	data->qos_tier = _pthread_priority_thread_qos(priority);
2266 	data->tier_importance = _pthread_priority_relpri(priority);
2267 	if (data->qos_tier == THREAD_QOS_UNSPECIFIED || data->tier_importance > 0 ||
2268 	    data->tier_importance < THREAD_QOS_MIN_TIER_IMPORTANCE) {
2269 		return false;
2270 	}
2271 	return true;
2272 }
2273 
/*
 * Backend for BSDTHREAD_CTL_SET_SELF: apply one or more self-directed
 * operations (kevent unbind, QoS change, voucher adoption, fixed-priority /
 * timeshare policy) to `th` in a single syscall.
 *
 * Each operation records its own error and falls through to the next via
 * the labeled sections; the precedence of the returned error is resolved
 * at `done` (unbind > qos > voucher > fixedpri, with EBADMSG when both the
 * QoS and voucher operations failed).
 */
static int
bsdthread_set_self(proc_t p, thread_t th, pthread_priority_t priority,
    mach_port_name_t voucher, enum workq_set_self_flags flags)
{
	struct uthread *uth = get_bsdthread_info(th);
	struct workqueue *wq = proc_get_wqptr(p);

	kern_return_t kr;
	int unbind_rv = 0, qos_rv = 0, voucher_rv = 0, fixedpri_rv = 0;
	bool is_wq_thread = (thread_get_tag(th) & THREAD_TAG_WORKQUEUE);

	if (flags & WORKQ_SET_SELF_WQ_KEVENT_UNBIND) {
		/* only plain (non-workloop) kqueue-bound workqueue threads can unbind */
		if (!is_wq_thread) {
			unbind_rv = EINVAL;
			goto qos;
		}

		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			unbind_rv = EINVAL;
			goto qos;
		}

		workq_threadreq_t kqr = uth->uu_kqr_bound;
		if (kqr == NULL) {
			unbind_rv = EALREADY;
			goto qos;
		}

		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			unbind_rv = EINVAL;
			goto qos;
		}

		kqueue_threadreq_unbind(p, kqr);
	}

qos:
	if (flags & WORKQ_SET_SELF_QOS_FLAG) {
		thread_qos_policy_data_t new_policy;

		if (!_pthread_priority_to_policy(priority, &new_policy)) {
			qos_rv = EINVAL;
			goto voucher;
		}

		if (!is_wq_thread) {
			/*
			 * Threads opted out of QoS can't change QoS
			 */
			if (!thread_has_qos_policy(th)) {
				qos_rv = EPERM;
				goto voucher;
			}
		} else if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER ||
		    uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_ABOVEUI) {
			/*
			 * Workqueue manager threads or threads above UI can't change QoS
			 */
			qos_rv = EINVAL;
			goto voucher;
		} else {
			/*
			 * For workqueue threads, possibly adjust buckets and redrive thread
			 * requests.
			 *
			 * Transitions allowed:
			 *
			 * overcommit --> non-overcommit
			 * overcommit --> overcommit
			 * non-overcommit --> non-overcommit
			 * non-overcommit --> overcommit (to be deprecated later)
			 * cooperative --> cooperative
			 *
			 * All other transitions aren't allowed so reject them.
			 */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_cooperative(uth) && !_pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_cooperative(priority)) {
				qos_rv = EINVAL;
				goto voucher;
			}

			struct uu_workq_policy old_pri, new_pri;
			bool force_run = false;

			workq_lock_spin(wq);

			old_pri = new_pri = uth->uu_workq_pri;
			new_pri.qos_req = (thread_qos_t)new_policy.qos_tier;

			/* Adjust schedule counts for various types of transitions */

			/* overcommit -> non-overcommit */
			if (workq_thread_is_overcommit(uth) && _pthread_priority_is_nonovercommit(priority)) {
				workq_thread_set_type(uth, 0);
				wq->wq_constrained_threads_scheduled++;

				/* non-overcommit -> overcommit */
			} else if (workq_thread_is_nonovercommit(uth) && _pthread_priority_is_overcommit(priority)) {
				workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
				force_run = (wq->wq_constrained_threads_scheduled-- == wq_max_constrained_threads);

				/* cooperative -> cooperative */
			} else if (workq_thread_is_cooperative(uth)) {
				_wq_cooperative_queue_scheduled_count_dec(wq, old_pri.qos_bucket);
				_wq_cooperative_queue_scheduled_count_inc(wq, workq_pri_bucket(new_pri));

				/* We're changing schedule counts within cooperative pool, we
				 * need to refresh best cooperative QoS logic again */
				force_run = _wq_cooperative_queue_refresh_best_req_qos(wq);
			}

			/* This will also call schedule_creator if needed */
			workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, force_run);
			workq_unlock(wq);

			if (workq_thread_is_overcommit(uth)) {
				thread_disarm_workqueue_quantum(th);
			} else {
				/* If the thread changed QoS buckets, the quantum duration
				 * may have changed too */
				thread_arm_workqueue_quantum(th);
			}
		}

		kr = thread_policy_set_internal(th, THREAD_QOS_POLICY,
		    (thread_policy_t)&new_policy, THREAD_QOS_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			qos_rv = EINVAL;
		}
	}

voucher:
	if (flags & WORKQ_SET_SELF_VOUCHER_FLAG) {
		kr = thread_set_voucher_name(voucher);
		if (kr != KERN_SUCCESS) {
			voucher_rv = ENOENT;
			goto fixedpri;
		}
	}

fixedpri:
	/* a failed QoS change suppresses the scheduling-policy change */
	if (qos_rv) {
		goto done;
	}
	if (flags & WORKQ_SET_SELF_FIXEDPRIORITY_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 0};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	} else if (flags & WORKQ_SET_SELF_TIMESHARE_FLAG) {
		thread_extended_policy_data_t extpol = {.timeshare = 1};

		if (is_wq_thread) {
			/* Not allowed on workqueue threads */
			fixedpri_rv = ENOTSUP;
			goto done;
		}

		kr = thread_policy_set_internal(th, THREAD_EXTENDED_POLICY,
		    (thread_policy_t)&extpol, THREAD_EXTENDED_POLICY_COUNT);
		if (kr != KERN_SUCCESS) {
			fixedpri_rv = EINVAL;
			goto done;
		}
	}

done:
	if (qos_rv && voucher_rv) {
		/* Both failed, give that a unique error. */
		return EBADMSG;
	}

	if (unbind_rv) {
		return unbind_rv;
	}

	if (qos_rv) {
		return qos_rv;
	}

	if (voucher_rv) {
		return voucher_rv;
	}

	if (fixedpri_rv) {
		return fixedpri_rv;
	}


	return 0;
}
2480 
2481 static int
bsdthread_add_explicit_override(proc_t p,mach_port_name_t kport,pthread_priority_t pp,user_addr_t resource)2482 bsdthread_add_explicit_override(proc_t p, mach_port_name_t kport,
2483     pthread_priority_t pp, user_addr_t resource)
2484 {
2485 	thread_qos_t qos = _pthread_priority_thread_qos(pp);
2486 	if (qos == THREAD_QOS_UNSPECIFIED) {
2487 		return EINVAL;
2488 	}
2489 
2490 	thread_t th = port_name_to_thread(kport,
2491 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2492 	if (th == THREAD_NULL) {
2493 		return ESRCH;
2494 	}
2495 
2496 	int rv = proc_thread_qos_add_override(p->task, th, 0, qos, TRUE,
2497 	    resource, THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2498 
2499 	thread_deallocate(th);
2500 	return rv;
2501 }
2502 
2503 static int
bsdthread_remove_explicit_override(proc_t p,mach_port_name_t kport,user_addr_t resource)2504 bsdthread_remove_explicit_override(proc_t p, mach_port_name_t kport,
2505     user_addr_t resource)
2506 {
2507 	thread_t th = port_name_to_thread(kport,
2508 	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
2509 	if (th == THREAD_NULL) {
2510 		return ESRCH;
2511 	}
2512 
2513 	int rv = proc_thread_qos_remove_override(p->task, th, 0, resource,
2514 	    THREAD_QOS_OVERRIDE_TYPE_PTHREAD_EXPLICIT_OVERRIDE);
2515 
2516 	thread_deallocate(th);
2517 	return rv;
2518 }
2519 
/*
 * Apply a dispatch QoS override to the workqueue thread named by `kport`.
 * When `ulock_addr` is provided, the override is only applied if that
 * ulock still reports `kport` as its owner (the override races with the
 * owner releasing the lock, and is skipped — returning 0 — if it lost).
 */
static int
workq_thread_add_dispatch_override(proc_t p, mach_port_name_t kport,
    pthread_priority_t pp, user_addr_t ulock_addr)
{
	struct uu_workq_policy old_pri, new_pri;
	struct workqueue *wq = proc_get_wqptr(p);

	thread_qos_t qos_override = _pthread_priority_thread_qos(pp);
	if (qos_override == THREAD_QOS_UNSPECIFIED) {
		return EINVAL;
	}

	thread_t thread = port_name_to_thread(kport,
	    PORT_INTRANS_THREAD_IN_CURRENT_TASK);
	if (thread == THREAD_NULL) {
		return ESRCH;
	}

	struct uthread *uth = get_bsdthread_info(thread);
	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
		thread_deallocate(thread);
		return EPERM;
	}

	WQ_TRACE_WQ(TRACE_wq_override_dispatch | DBG_FUNC_NONE,
	    wq, thread_tid(thread), 1, pp);

	/* serializes against the target thread mutating its own state */
	thread_mtx_lock(thread);

	if (ulock_addr) {
		uint32_t val;
		int rc;
		/*
		 * Workaround lack of explicit support for 'no-fault copyin'
		 * <rdar://problem/24999882>, as disabling preemption prevents paging in
		 */
		disable_preemption();
		rc = copyin_atomic32(ulock_addr, &val);
		enable_preemption();
		/* lost the race with the owner releasing the ulock: nothing to do */
		if (rc == 0 && ulock_owner_value_to_port_name(val) != kport) {
			goto out;
		}
	}

	workq_lock_spin(wq);

	old_pri = uth->uu_workq_pri;
	if (old_pri.qos_override >= qos_override) {
		/* Nothing to do */
	} else if (thread == current_thread()) {
		/* self-override: safe to rebalance buckets synchronously */
		new_pri = old_pri;
		new_pri.qos_override = qos_override;
		workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
	} else {
		uth->uu_workq_pri.qos_override = qos_override;
		if (qos_override > workq_pri_override(old_pri)) {
			thread_set_workq_override(thread, qos_override);
		}
	}

	workq_unlock(wq);

out:
	thread_mtx_unlock(thread);
	thread_deallocate(thread);
	return 0;
}
2587 
2588 static int
workq_thread_reset_dispatch_override(proc_t p,thread_t thread)2589 workq_thread_reset_dispatch_override(proc_t p, thread_t thread)
2590 {
2591 	struct uu_workq_policy old_pri, new_pri;
2592 	struct workqueue *wq = proc_get_wqptr(p);
2593 	struct uthread *uth = get_bsdthread_info(thread);
2594 
2595 	if ((thread_get_tag(thread) & THREAD_TAG_WORKQUEUE) == 0) {
2596 		return EPERM;
2597 	}
2598 
2599 	WQ_TRACE_WQ(TRACE_wq_override_reset | DBG_FUNC_NONE, wq, 0, 0, 0);
2600 
2601 	workq_lock_spin(wq);
2602 	old_pri = new_pri = uth->uu_workq_pri;
2603 	new_pri.qos_override = THREAD_QOS_UNSPECIFIED;
2604 	workq_thread_update_bucket(p, wq, uth, old_pri, new_pri, false);
2605 	workq_unlock(wq);
2606 	return 0;
2607 }
2608 
2609 static int
workq_thread_allow_kill(__unused proc_t p,thread_t thread,bool enable)2610 workq_thread_allow_kill(__unused proc_t p, thread_t thread, bool enable)
2611 {
2612 	if (!(thread_get_tag(thread) & THREAD_TAG_WORKQUEUE)) {
2613 		// If the thread isn't a workqueue thread, don't set the
2614 		// kill_allowed bit; however, we still need to return 0
2615 		// instead of an error code since this code is executed
2616 		// on the abort path which needs to not depend on the
2617 		// pthread_t (returning an error depends on pthread_t via
2618 		// cerror_nocancel)
2619 		return 0;
2620 	}
2621 	struct uthread *uth = get_bsdthread_info(thread);
2622 	uth->uu_workq_pthread_kill_allowed = enable;
2623 	return 0;
2624 }
2625 
2626 static int
bsdthread_get_max_parallelism(thread_qos_t qos,unsigned long flags,int * retval)2627 bsdthread_get_max_parallelism(thread_qos_t qos, unsigned long flags,
2628     int *retval)
2629 {
2630 	static_assert(QOS_PARALLELISM_COUNT_LOGICAL ==
2631 	    _PTHREAD_QOS_PARALLELISM_COUNT_LOGICAL, "logical");
2632 	static_assert(QOS_PARALLELISM_REALTIME ==
2633 	    _PTHREAD_QOS_PARALLELISM_REALTIME, "realtime");
2634 	static_assert(QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE ==
2635 	    _PTHREAD_QOS_PARALLELISM_CLUSTER_SHARED_RSRC, "cluster shared resource");
2636 
2637 	if (flags & ~(QOS_PARALLELISM_REALTIME | QOS_PARALLELISM_COUNT_LOGICAL | QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE)) {
2638 		return EINVAL;
2639 	}
2640 
2641 	/* No units are present */
2642 	if (flags & QOS_PARALLELISM_CLUSTER_SHARED_RESOURCE) {
2643 		return ENOTSUP;
2644 	}
2645 
2646 	if (flags & QOS_PARALLELISM_REALTIME) {
2647 		if (qos) {
2648 			return EINVAL;
2649 		}
2650 	} else if (qos == THREAD_QOS_UNSPECIFIED || qos >= THREAD_QOS_LAST) {
2651 		return EINVAL;
2652 	}
2653 
2654 	*retval = qos_max_parallelism(qos, flags);
2655 	return 0;
2656 }
2657 
2658 static int
bsdthread_dispatch_apply_attr(__unused struct proc * p,thread_t thread,unsigned long flags,uint64_t value1,__unused uint64_t value2)2659 bsdthread_dispatch_apply_attr(__unused struct proc *p, thread_t thread,
2660     unsigned long flags, uint64_t value1, __unused uint64_t value2)
2661 {
2662 	uint32_t apply_worker_index;
2663 	kern_return_t kr;
2664 
2665 	switch (flags) {
2666 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_SET:
2667 		apply_worker_index = (uint32_t)value1;
2668 		kr = thread_shared_rsrc_policy_set(thread, apply_worker_index, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2669 		/*
2670 		 * KERN_INVALID_POLICY indicates that the thread was trying to bind to a
2671 		 * cluster which it was not eligible to execute on.
2672 		 */
2673 		return (kr == KERN_SUCCESS) ? 0 : ((kr == KERN_INVALID_POLICY) ? ENOTSUP : EINVAL);
2674 	case _PTHREAD_DISPATCH_APPLY_ATTR_CLUSTER_SHARED_RSRC_CLEAR:
2675 		kr = thread_shared_rsrc_policy_clear(thread, CLUSTER_SHARED_RSRC_TYPE_RR, SHARED_RSRC_POLICY_AGENT_DISPATCH);
2676 		return (kr == KERN_SUCCESS) ? 0 : EINVAL;
2677 	default:
2678 		return EINVAL;
2679 	}
2680 }
2681 
/*
 * Fail with EINVAL when a syscall argument that must be unused is non-zero.
 *
 * NOTE: this is a GCC/clang statement expression; the `return` exits the
 * *enclosing function* (bsdthread_ctl), not just the macro.
 */
#define ENSURE_UNUSED(arg) \
	        ({ if ((arg) != 0) { return EINVAL; } })

/*
 * bsdthread_ctl: multiplexed control syscall for pthread QoS operations.
 *
 * uap->cmd selects the sub-operation; arg1..arg3 are interpreted per
 * command (see the casts in each case). Returns 0 or an errno; the
 * QOS_MAX_PARALLELISM command also writes through `retval`.
 */
int
bsdthread_ctl(struct proc *p, struct bsdthread_ctl_args *uap, int *retval)
{
	switch (uap->cmd) {
	case BSDTHREAD_CTL_QOS_OVERRIDE_START:
		return bsdthread_add_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_END:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_remove_explicit_override(p, (mach_port_name_t)uap->arg1,
		           (user_addr_t)uap->arg2);

	case BSDTHREAD_CTL_QOS_OVERRIDE_DISPATCH:
		return workq_thread_add_dispatch_override(p, (mach_port_name_t)uap->arg1,
		           (pthread_priority_t)uap->arg2, uap->arg3);
	case BSDTHREAD_CTL_QOS_OVERRIDE_RESET:
		/* Dispatch overrides are always reset on the calling thread. */
		return workq_thread_reset_dispatch_override(p, current_thread());

	case BSDTHREAD_CTL_SET_SELF:
		return bsdthread_set_self(p, current_thread(),
		           (pthread_priority_t)uap->arg1, (mach_port_name_t)uap->arg2,
		           (enum workq_set_self_flags)uap->arg3);

	case BSDTHREAD_CTL_QOS_MAX_PARALLELISM:
		ENSURE_UNUSED(uap->arg3);
		return bsdthread_get_max_parallelism((thread_qos_t)uap->arg1,
		           (unsigned long)uap->arg2, retval);
	case BSDTHREAD_CTL_WORKQ_ALLOW_KILL:
		ENSURE_UNUSED(uap->arg2);
		ENSURE_UNUSED(uap->arg3);
		return workq_thread_allow_kill(p, current_thread(), (bool)uap->arg1);
	case BSDTHREAD_CTL_DISPATCH_APPLY_ATTR:
		return bsdthread_dispatch_apply_attr(p, current_thread(),
		           (unsigned long)uap->arg1, (uint64_t)uap->arg2,
		           (uint64_t)uap->arg3);
	case BSDTHREAD_CTL_SET_QOS:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_ADD:
	case BSDTHREAD_CTL_QOS_DISPATCH_ASYNCHRONOUS_OVERRIDE_RESET:
		/* no longer supported */
		return ENOTSUP;

	default:
		return EINVAL;
	}
}
2730 
2731 #pragma mark workqueue thread manipulation
2732 
2733 static void __dead2
2734 workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2735     struct uthread *uth, uint32_t setup_flags);
2736 
2737 static void __dead2
2738 workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
2739     struct uthread *uth, uint32_t setup_flags);
2740 
2741 static void workq_setup_and_run(proc_t p, struct uthread *uth, int flags) __dead2;
2742 
2743 #if KDEBUG_LEVEL >= KDEBUG_LEVEL_STANDARD
2744 static inline uint64_t
workq_trace_req_id(workq_threadreq_t req)2745 workq_trace_req_id(workq_threadreq_t req)
2746 {
2747 	struct kqworkloop *kqwl;
2748 	if (req->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
2749 		kqwl = __container_of(req, struct kqworkloop, kqwl_request);
2750 		return kqwl->kqwl_dynamicid;
2751 	}
2752 
2753 	return VM_KERNEL_ADDRHIDE(req);
2754 }
2755 #endif
2756 
/**
 * Entry point for libdispatch to ask for threads
 * (WQOPS_QUEUE_REQTHREADS / WQOPS_QUEUE_REQTHREADS2).
 *
 * @param p           requesting process
 * @param reqcount    number of threads asked for; capped at UINT16_MAX
 *                    because the residual is stored in the 16-bit tr_count
 * @param pp          pthread priority encoding the QoS and overcommit flag
 * @param cooperative true for the cooperative pool (REQTHREADS2); such
 *                    requests may only ask for a single thread
 *
 * Returns 0, or EINVAL/ENOTSUP on invalid combinations.
 */
static int
workq_reqthreads(struct proc *p, uint32_t reqcount, pthread_priority_t pp, bool cooperative)
{
	thread_qos_t qos = _pthread_priority_thread_qos(pp);
	struct workqueue *wq = proc_get_wqptr(p);
	uint32_t unpaced, upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	int ret = 0;

	if (wq == NULL || reqcount <= 0 || reqcount > UINT16_MAX ||
	    qos == THREAD_QOS_UNSPECIFIED) {
		ret = EINVAL;
		goto exit;
	}

	WQ_TRACE_WQ(TRACE_wq_wqops_reqthreads | DBG_FUNC_NONE,
	    wq, reqcount, pp, cooperative);

	/* Allocated before taking the lock; freed on every error path below. */
	workq_threadreq_t req = zalloc(workq_zone_threadreq);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos   = qos;
	workq_tr_flags_t tr_flags = 0;

	if (pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
		tr_flags |= WORKQ_TR_FLAG_OVERCOMMIT;
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	}

	if (cooperative) {
		tr_flags |= WORKQ_TR_FLAG_COOPERATIVE;
		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;

		/* Parallelism is not supported for the cooperative pool. */
		if (reqcount > 1) {
			ret = ENOTSUP;
			goto free_and_exit;
		}
	}

	/* A thread request cannot be both overcommit and cooperative */
	if (workq_tr_is_cooperative(tr_flags) &&
	    workq_tr_is_overcommit(tr_flags)) {
		ret = EINVAL;
		goto free_and_exit;
	}
	req->tr_flags = tr_flags;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE,
	    wq, workq_trace_req_id(req), req->tr_qos, reqcount);

	workq_lock_spin(wq);
	do {
		if (_wq_exiting(wq)) {
			goto unlock_and_exit;
		}

		/*
		 * When userspace is asking for parallelism, wakeup up to (reqcount - 1)
		 * threads without pacing, to inform the scheduler of that workload.
		 *
		 * The last requests, or the ones that failed the admission checks are
		 * enqueued and go through the regular creator codepath.
		 *
		 * If there aren't enough threads, add one, but re-evaluate everything
		 * as conditions may now have changed.
		 */
		unpaced = reqcount - 1;

		if (reqcount > 1) {
			/* We don't handle asking for parallelism on the cooperative
			 * workqueue just yet */
			assert(!workq_threadreq_is_cooperative(req));

			if (workq_threadreq_is_nonovercommit(req)) {
				/* Constrained requests are limited by the admission check. */
				unpaced = workq_constrained_allowance(wq, qos, NULL, false);
				if (unpaced >= reqcount - 1) {
					unpaced = reqcount - 1;
				}
			}
		}

		/*
		 * This path does not currently handle custom workloop parameters
		 * when creating threads for parallelism.
		 */
		assert(!(req->tr_flags & WORKQ_TR_FLAG_WL_PARAMS));

		/*
		 * This is a trimmed down version of workq_threadreq_bind_and_unlock()
		 */
		while (unpaced > 0 && wq->wq_thidlecount) {
			struct uthread *uth;
			bool needs_wakeup;
			uint8_t uu_flags = UT_WORKQ_EARLY_BOUND;

			if (workq_tr_is_overcommit(req->tr_flags)) {
				uu_flags |= UT_WORKQ_OVERCOMMIT;
			}

			uth = workq_pop_idle_thread(wq, uu_flags, &needs_wakeup);

			/* Account the thread against its QoS bucket before it runs. */
			_wq_thactive_inc(wq, qos);
			wq->wq_thscheduled_count[_wq_bucket(qos)]++;
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
			wq->wq_fulfilled++;

			uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;
			uth->uu_save.uus_workq_park_data.thread_request = req;
			if (needs_wakeup) {
				workq_thread_wakeup(uth);
			}
			unpaced--;
			reqcount--;
		}
	} while (unpaced && wq->wq_nthreads < wq_max_threads &&
	    workq_add_new_idle_thread(p, wq));

	if (_wq_exiting(wq)) {
		goto unlock_and_exit;
	}

	/* Whatever wasn't fulfilled above goes through the creator codepath. */
	req->tr_count = (uint16_t)reqcount;
	if (workq_threadreq_enqueue(wq, req)) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}
	workq_unlock(wq);
	return 0;

unlock_and_exit:
	workq_unlock(wq);
free_and_exit:
	zfree(workq_zone_threadreq, req);
exit:
	return ret;
}
2895 
/*
 * Initiate a kernel-originated (kevent/workloop) thread request.
 *
 * Either rebinds the calling thread to the request (when called with
 * WORKQ_THREADREQ_ATTEMPT_REBIND from the unbind path and the request is
 * admissible), or enqueues the request and pokes the creator.
 *
 * Returns false only when the workqueue is exiting (the request is put
 * back to IDLE and not enqueued); true otherwise.
 */
bool
workq_kern_threadreq_initiate(struct proc *p, workq_threadreq_t req,
    struct turnstile *workloop_ts, thread_qos_t qos,
    workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	struct uthread *uth = NULL;

	assert(req->tr_flags & (WORKQ_TR_FLAG_WORKLOOP | WORKQ_TR_FLAG_KEVENT));

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Outside-of-QoS requests derive their bucket from the sched pri. */
		workq_threadreq_param_t trp = kqueue_threadreq_workloop_param(req);
		qos = thread_workq_qos_for_pri(trp.trp_pri);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			qos = WORKQ_THREAD_QOS_ABOVEUI;
		}
	}

	assert(req->tr_state == WORKQ_TR_STATE_IDLE);
	priority_queue_entry_init(&req->tr_entry);
	req->tr_count = 1;
	req->tr_state = WORKQ_TR_STATE_NEW;
	req->tr_qos   = qos;

	WQ_TRACE_WQ(TRACE_wq_thread_request_initiate | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 1);

	if (flags & WORKQ_THREADREQ_ATTEMPT_REBIND) {
		/*
		 * we're called back synchronously from the context of
		 * kqueue_threadreq_unbind from within workq_thread_return()
		 * we can try to match up this thread with this request !
		 */
		uth = current_uthread();
		assert(uth->uu_kqr_bound == NULL);
	}

	workq_lock_spin(wq);
	if (_wq_exiting(wq)) {
		req->tr_state = WORKQ_TR_STATE_IDLE;
		workq_unlock(wq);
		return false;
	}

	if (uth && workq_threadreq_admissible(wq, uth, req)) {
		/* This is the case of the rebind - we were about to park and unbind
		 * when more events came so keep the binding.
		 */
		assert(uth != wq->wq_creator);

		if (uth->uu_workq_pri.qos_bucket != req->tr_qos) {
			/* Move the active-count accounting to the request's bucket. */
			_wq_thactive_move(wq, uth->uu_workq_pri.qos_bucket, req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ false);
		}
		/*
		 * We're called from workq_kern_threadreq_initiate()
		 * due to an unbind, with the kq req held.
		 */
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    workq_trace_req_id(req), req->tr_flags, 0);
		wq->wq_fulfilled++;

		kqueue_threadreq_bind(p, req, get_machthread(uth), 0);
	} else {
		if (workloop_ts) {
			/* Redirect the workloop's turnstile boost at the workqueue. */
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(workloop_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(workloop_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}

		bool reevaluate_creator_thread_group = false;
#if CONFIG_PREADOPT_TG
		reevaluate_creator_thread_group = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
#endif
		/* We enqueued the highest priority item or we may need to reevaluate if
		 * the creator needs a thread group pre-adoption */
		if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_thread_group) {
			workq_schedule_creator(p, wq, flags);
		}
	}

	workq_unlock(wq);

	return true;
}
2984 
/*
 * Modify a queued kernel thread request's QoS (and possibly flip it to
 * overcommit with WORKQ_THREADREQ_MAKE_OVERCOMMIT).
 *
 * If the request is mid-bind, the bind is completed instead. Otherwise the
 * request is dequeued, mutated, and either re-enqueued behind a higher
 * priority request or treated as new again and re-driven through the
 * creator.
 */
void
workq_kern_threadreq_modify(struct proc *p, workq_threadreq_t req,
    thread_qos_t qos, workq_kern_threadreq_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	bool make_overcommit = false;

	if (req->tr_flags & WORKQ_TR_FLAG_WL_OUTSIDE_QOS) {
		/* Requests outside-of-QoS shouldn't accept modify operations */
		return;
	}

	workq_lock_spin(wq);

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP));

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		/* A thread was already picked; finish the bind rather than modify. */
		kqueue_threadreq_bind(p, req, req->tr_thread, 0);
		workq_unlock(wq);
		return;
	}

	if (flags & WORKQ_THREADREQ_MAKE_OVERCOMMIT) {
		/* TODO (rokhinip): We come into this code path for kqwl thread
		 * requests. kqwl requests cannot be cooperative.
		 */
		assert(!workq_threadreq_is_cooperative(req));

		make_overcommit = workq_threadreq_is_nonovercommit(req);
	}

	if (_wq_exiting(wq) || (req->tr_qos == qos && !make_overcommit)) {
		/* Nothing to change (or the workqueue is going away). */
		workq_unlock(wq);
		return;
	}

	assert(req->tr_count == 1);
	if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
		panic("Invalid thread request (%p) state %d", req, req->tr_state);
	}

	WQ_TRACE_WQ(TRACE_wq_thread_request_modify | DBG_FUNC_NONE, wq,
	    workq_trace_req_id(req), qos, 0);

	struct priority_queue_sched_max *pq = workq_priority_queue_for_req(wq, req);
	workq_threadreq_t req_max;

	/*
	 * Stage 1: Dequeue the request from its priority queue.
	 *
	 * If we dequeue the root item of the constrained priority queue,
	 * maintain the best constrained request qos invariant.
	 */
	if (priority_queue_remove(pq, &req->tr_entry)) {
		if (workq_threadreq_is_nonovercommit(req)) {
			_wq_thactive_refresh_best_constrained_req_qos(wq);
		}
	}

	/*
	 * Stage 2: Apply changes to the thread request
	 *
	 * If the item will not become the root of the priority queue it belongs to,
	 * then we need to wait in line, just enqueue and return quickly.
	 */
	if (__improbable(make_overcommit)) {
		req->tr_flags ^= WORKQ_TR_FLAG_OVERCOMMIT;
		/* Flipping overcommit changes which priority queue the req lives on. */
		pq = workq_priority_queue_for_req(wq, req);
	}
	req->tr_qos = qos;

	req_max = priority_queue_max(pq, struct workq_threadreq_s, tr_entry);
	if (req_max && req_max->tr_qos >= qos) {
		priority_queue_entry_set_sched_pri(pq, &req->tr_entry,
		    workq_priority_for_req(req), false);
		priority_queue_insert(pq, &req->tr_entry);
		workq_unlock(wq);
		return;
	}

	/*
	 * Stage 3: Reevaluate whether we should run the thread request.
	 *
	 * Pretend the thread request is new again:
	 * - adjust wq_reqcount to not count it anymore.
	 * - make its state WORKQ_TR_STATE_NEW (so that workq_threadreq_bind_and_unlock
	 *   properly attempts a synchronous bind)
	 */
	wq->wq_reqcount--;
	req->tr_state = WORKQ_TR_STATE_NEW;

	/* We enqueued the highest priority item or we may need to reevaluate if
	 * the creator needs a thread group pre-adoption if the request got a new TG */
	bool reevaluate_creator_tg = false;

#if CONFIG_PREADOPT_TG
	reevaluate_creator_tg = (flags & WORKQ_THREADREQ_REEVALUATE_PREADOPT_TG);
#endif

	if (workq_threadreq_enqueue(wq, req) || reevaluate_creator_tg) {
		workq_schedule_creator(p, wq, flags);
	}
	workq_unlock(wq);
}
3090 
/*
 * Take the process's workqueue lock on behalf of kqueue callers that need
 * several thread-request operations to be atomic.
 */
void
workq_kern_threadreq_lock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);
}
3096 
/*
 * Drop the process's workqueue lock taken by workq_kern_threadreq_lock().
 */
void
workq_kern_threadreq_unlock(struct proc *p)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_unlock(wq);
}
3102 
/*
 * Update the inheritor of a workloop's turnstile for a queued thread
 * request: the owner thread when there is one, otherwise the workqueue's
 * own turnstile; NULL when the workqueue is exiting.
 *
 * Must be called with the workqueue lock held (see workq_lock_held()).
 * If the request is mid-bind, completes the bind without touching the
 * inheritor.
 */
void
workq_kern_threadreq_update_inheritor(struct proc *p, workq_threadreq_t req,
    thread_t owner, struct turnstile *wl_ts,
    turnstile_update_flags_t flags)
{
	struct workqueue *wq = proc_get_wqptr_fast(p);
	turnstile_inheritor_t inheritor;

	assert(req->tr_qos != WORKQ_THREAD_QOS_MANAGER);
	assert(req->tr_flags & WORKQ_TR_FLAG_WORKLOOP);
	workq_lock_held(wq);

	if (req->tr_state == WORKQ_TR_STATE_BINDING) {
		kqueue_threadreq_bind(p, req, req->tr_thread,
		    KQUEUE_THREADERQ_BIND_NO_INHERITOR_UPDATE);
		return;
	}

	if (_wq_exiting(wq)) {
		inheritor = TURNSTILE_INHERITOR_NULL;
	} else {
		if (req->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d", req, req->tr_state);
		}

		if (owner) {
			inheritor = owner;
			flags |= TURNSTILE_INHERITOR_THREAD;
		} else {
			inheritor = wq->wq_turnstile;
			flags |= TURNSTILE_INHERITOR_TURNSTILE;
		}
	}

	workq_perform_turnstile_operation_locked(wq, ^{
		turnstile_update_inheritor(wl_ts, inheritor, flags);
	});
}
3141 
3142 void
workq_kern_threadreq_redrive(struct proc * p,workq_kern_threadreq_flags_t flags)3143 workq_kern_threadreq_redrive(struct proc *p, workq_kern_threadreq_flags_t flags)
3144 {
3145 	struct workqueue *wq = proc_get_wqptr_fast(p);
3146 
3147 	workq_lock_spin(wq);
3148 	workq_schedule_creator(p, wq, flags);
3149 	workq_unlock(wq);
3150 }
3151 
/*
 * Always called at AST by the thread on itself
 *
 * Upon quantum expiry, the workqueue subsystem evaluates its state and decides
 * on what the thread should do next. The TSD value is always set by the thread
 * on itself in the kernel and cleared either by userspace when it acks the TSD
 * value and takes action, or by the thread in the kernel when the quantum
 * expires again.
 */
void
workq_kern_quantum_expiry_reevaluate(proc_t proc, thread_t thread)
{
	struct uthread *uth = get_bsdthread_info(thread);

	/* A dying thread is on its way out; nothing to convey. */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		return;
	}

	if (!thread_supports_cooperative_workqueue(thread)) {
		panic("Quantum expired for thread that doesn't support cooperative workqueue");
	}

	thread_qos_t qos = uth->uu_workq_pri.qos_bucket;
	if (qos == THREAD_QOS_UNSPECIFIED) {
		panic("Thread should not have workq bucket of QoS UN");
	}

	assert(thread_has_expired_workqueue_quantum(thread, false));

	struct workqueue *wq = proc_get_wqptr(proc);
	assert(wq != NULL);

	/*
	 * For starters, we're just going to evaluate and see if we need to narrow
	 * the pool and tell this thread to park if needed. In the future, we'll
	 * evaluate and convey other workqueue state information like needing to
	 * pump kevents, etc.
	 */
	uint64_t flags = 0;

	workq_lock_spin(wq);

	/* Re-run the admission check for this thread's pool under the lock. */
	if (workq_thread_is_cooperative(uth)) {
		if (!workq_cooperative_allowance(wq, qos, uth, false)) {
			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
		} else {
			/* In the future, when we have kevent hookups for the cooperative
			 * pool, we need fancier logic for what userspace should do. But
			 * right now, only userspace thread requests exist - so we'll just
			 * tell userspace to shuffle work items */
			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_SHUFFLE;
		}
	} else if (workq_thread_is_nonovercommit(uth)) {
		if (!workq_constrained_allowance(wq, qos, uth, false)) {
			flags |= PTHREAD_WQ_QUANTUM_EXPIRY_NARROW;
		}
	}
	workq_unlock(wq);

	WQ_TRACE(TRACE_wq_quantum_expiry_reevaluate, flags, 0, 0, 0);

	kevent_set_workq_quantum_expiry_user_tsd(proc, thread, flags);

	/* We have conveyed to userspace about what it needs to do upon quantum
	 * expiry, now rearm the workqueue quantum again */
	thread_arm_workqueue_quantum(get_machthread(uth));
}
3219 
3220 void
workq_schedule_creator_turnstile_redrive(struct workqueue * wq,bool locked)3221 workq_schedule_creator_turnstile_redrive(struct workqueue *wq, bool locked)
3222 {
3223 	if (locked) {
3224 		workq_schedule_creator(NULL, wq, WORKQ_THREADREQ_NONE);
3225 	} else {
3226 		workq_schedule_immediate_thread_creation(wq);
3227 	}
3228 }
3229 
/*
 * Handle a workqueue thread returning to the kernel
 * (WQOPS_THREAD_RETURN / WQOPS_THREAD_KEVENT_RETURN /
 * WQOPS_THREAD_WORKLOOP_RETURN).
 *
 * Flushes any pending kevents through pthread, unbinds the thread from
 * its kqueue request, then hands the thread to
 * workq_select_threadreq_or_park_and_unlock() — this function does not
 * return on success (only error paths return, with an errno).
 */
static int
workq_thread_return(struct proc *p, struct workq_kernreturn_args *uap,
    struct workqueue *wq)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	workq_threadreq_t kqr = uth->uu_kqr_bound;
	workq_threadreq_param_t trp = { };
	int nevents = uap->affinity, error;
	user_addr_t eventlist = uap->item;

	if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
	    (uth->uu_workq_flags & UT_WORKQ_DYING)) {
		return EINVAL;
	}

	/* An event list only makes sense for a thread bound to a kq request. */
	if (eventlist && nevents && kqr == NULL) {
		return EINVAL;
	}

	/* reset signal mask on the workqueue thread to default state */
	if (uth->uu_sigmask != (sigset_t)(~workq_threadmask)) {
		proc_lock(p);
		uth->uu_sigmask = ~workq_threadmask;
		proc_unlock(p);
	}

	if (kqr && kqr->tr_flags & WORKQ_TR_FLAG_WL_PARAMS) {
		/*
		 * Ensure we store the threadreq param before unbinding
		 * the kqr from this thread.
		 */
		trp = kqueue_threadreq_workloop_param(kqr);
	}

	/*
	 * Freeze the base pri while we decide the fate of this thread.
	 *
	 * Either:
	 * - we return to user and kevent_cleanup will have unfrozen the base pri,
	 * - or we proceed to workq_select_threadreq_or_park_and_unlock() who will.
	 */
	thread_freeze_base_pri(th);

	if (kqr) {
		/* Rebuild the upcall flags that describe this thread to userspace. */
		uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI | WQ_FLAG_THREAD_REUSE;
		if (kqr->tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
			upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
		} else {
			upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		}
		if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
			upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
		} else {
			if (workq_thread_is_overcommit(uth)) {
				upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
			}
			if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
				upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
			} else {
				upcall_flags |= uth->uu_workq_pri.qos_req |
				    WQ_FLAG_THREAD_PRIO_QOS;
			}
		}
		error = pthread_functions->workq_handle_stack_events(p, th,
		    get_task_map(p->task), uth->uu_workq_stackaddr,
		    uth->uu_workq_thport, eventlist, nevents, upcall_flags);
		if (error) {
			assert(uth->uu_kqr_bound == kqr);
			return error;
		}

		// pthread is supposed to pass KEVENT_FLAG_PARKING here
		// which should cause the above call to either:
		// - not return
		// - return an error
		// - return 0 and have unbound properly
		assert(uth->uu_kqr_bound == NULL);
	}

	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_END, wq, uap->options, 0, 0);

	thread_sched_call(th, NULL);
	thread_will_park_or_terminate(th);
#if CONFIG_WORKLOOP_DEBUG
	UU_KEVENT_HISTORY_WRITE_ENTRY(uth, { .uu_error = -1, });
#endif

	workq_lock_spin(wq);
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);
	uth->uu_save.uus_workq_park_data.workloop_params = trp.trp_value;
	workq_select_threadreq_or_park_and_unlock(p, wq, uth,
	    WQ_SETUP_CLEAR_VOUCHER);
	__builtin_unreachable();
}
3325 
/**
 * Multiplexed call to interact with the workqueue mechanism
 *
 * uap->options selects the WQOPS_* sub-operation; uap->affinity and
 * uap->prio carry per-operation integer arguments (arg2/arg3 below) and
 * uap->item carries a user pointer where needed. The process must have
 * registered (P_LREGISTER) first.
 */
int
workq_kernreturn(struct proc *p, struct workq_kernreturn_args *uap, int32_t *retval)
{
	int options = uap->options;
	int arg2 = uap->affinity;
	int arg3 = uap->prio;
	struct workqueue *wq = proc_get_wqptr(p);
	int error = 0;

	if ((p->p_lflag & P_LREGISTER) == 0) {
		return EINVAL;
	}

	switch (options) {
	case WQOPS_QUEUE_NEWSPISUPP: {
		/*
		 * arg2 = offset of serialno into dispatch queue
		 * arg3 = kevent support
		 */
		int offset = arg2;
		if (arg3 & 0x01) {
			// If we get here, then userspace has indicated support for kevent delivery.
		}

		p->p_dispatchqueue_serialno_offset = (uint64_t)offset;
		break;
	}
	case WQOPS_QUEUE_REQTHREADS: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3, false);
		break;
	}
	/* For requesting threads for the cooperative pool */
	case WQOPS_QUEUE_REQTHREADS2: {
		/*
		 * arg2 = number of threads to start
		 * arg3 = priority
		 */
		error = workq_reqthreads(p, arg2, arg3, true);
		break;
	}
	case WQOPS_SET_EVENT_MANAGER_PRIORITY: {
		/*
		 * arg2 = priority for the manager thread
		 *
		 * if _PTHREAD_PRIORITY_SCHED_PRI_FLAG is set,
		 * the low bits of the value contains a scheduling priority
		 * instead of a QOS value
		 */
		pthread_priority_t pri = arg2;

		if (wq == NULL) {
			error = EINVAL;
			break;
		}

		/*
		 * Normalize the incoming priority so that it is ordered numerically.
		 */
		if (pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			pri &= (_PTHREAD_PRIORITY_SCHED_PRI_MASK |
			    _PTHREAD_PRIORITY_SCHED_PRI_FLAG);
		} else {
			thread_qos_t qos = _pthread_priority_thread_qos(pri);
			int relpri = _pthread_priority_relpri(pri);
			if (relpri > 0 || relpri < THREAD_QOS_MIN_TIER_IMPORTANCE ||
			    qos == THREAD_QOS_UNSPECIFIED) {
				error = EINVAL;
				break;
			}
			pri &= ~_PTHREAD_PRIORITY_FLAGS_MASK;
		}

		/*
		 * If userspace passes a scheduling priority, that wins over any QoS.
		 * Userspace should take care not to lower the priority this way.
		 */
		workq_lock_spin(wq);
		if (wq->wq_event_manager_priority < (uint32_t)pri) {
			wq->wq_event_manager_priority = (uint32_t)pri;
		}
		workq_unlock(wq);
		break;
	}
	case WQOPS_THREAD_KEVENT_RETURN:
	case WQOPS_THREAD_WORKLOOP_RETURN:
	case WQOPS_THREAD_RETURN: {
		/* Does not return on success (thread parks or picks up new work). */
		error = workq_thread_return(p, uap, wq);
		break;
	}

	case WQOPS_SHOULD_NARROW: {
		/*
		 * arg2 = priority to test
		 * arg3 = unused
		 */
		thread_t th = current_thread();
		struct uthread *uth = get_bsdthread_info(th);
		if (((thread_get_tag(th) & THREAD_TAG_WORKQUEUE) == 0) ||
		    (uth->uu_workq_flags & (UT_WORKQ_DYING | UT_WORKQ_OVERCOMMIT))) {
			error = EINVAL;
			break;
		}

		thread_qos_t qos = _pthread_priority_thread_qos(arg2);
		if (qos == THREAD_QOS_UNSPECIFIED) {
			error = EINVAL;
			break;
		}
		workq_lock_spin(wq);
		bool should_narrow = !workq_constrained_allowance(wq, qos, uth, false);
		workq_unlock(wq);

		*retval = should_narrow;
		break;
	}
	case WQOPS_SETUP_DISPATCH: {
		/*
		 * item = pointer to workq_dispatch_config structure
		 * arg2 = sizeof(item)
		 */
		struct workq_dispatch_config cfg;
		bzero(&cfg, sizeof(cfg));

		/* Copy in at most sizeof(cfg); shorter (older) structs are OK. */
		error = copyin(uap->item, &cfg, MIN(sizeof(cfg), (unsigned long) arg2));
		if (error) {
			break;
		}

		if (cfg.wdc_flags & ~WORKQ_DISPATCH_SUPPORTED_FLAGS ||
		    cfg.wdc_version < WORKQ_DISPATCH_MIN_SUPPORTED_VERSION) {
			error = ENOTSUP;
			break;
		}

		/* Load fields from version 1 */
		p->p_dispatchqueue_serialno_offset = cfg.wdc_queue_serialno_offs;

		/* Load fields from version 2 */
		if (cfg.wdc_version >= 2) {
			p->p_dispatchqueue_label_offset = cfg.wdc_queue_label_offs;
		}

		break;
	}
	default:
		error = EINVAL;
		break;
	}

	return error;
}
3484 
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return: the thread either gets
 * re-run (made runnable again, or woken for death) or blocks in
 * workq_unpark_continue.
 */
__attribute__((noreturn, noinline))
static void
workq_park_and_unlock(proc_t p, struct workqueue *wq, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);
	workq_push_idle_thread(p, wq, uth, setup_flags); // may not return

	workq_thread_reset_cpupercent(NULL, uth);

#if CONFIG_PREADOPT_TG
	/* Clear the preadoption thread group on the thread.
	 *
	 * Case 1:
	 *		Creator thread which never picked up a thread request. We set a
	 *		preadoption thread group on creator threads but if it never picked
	 *		up a thread request and didn't go to userspace, then the thread will
	 *		park with a preadoption thread group but no explicitly adopted
	 *		voucher or work interval.
	 *
	 *		We drop the preadoption thread group here before proceeding to park.
	 *		Note - we may get preempted when we drop the workq lock below.
	 *
	 * Case 2:
	 *		Thread picked up a thread request and bound to it and returned back
	 *		from userspace and is parking. At this point, preadoption thread
	 *		group should be NULL since the thread has unbound from the thread
	 *		request. So this operation should be a no-op.
	 */
	thread_set_preadopt_thread_group(get_machthread(uth), NULL);
#endif

	if ((uth->uu_workq_flags & UT_WORKQ_IDLE_CLEANUP) &&
	    !(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* Cleanup (stack free, voucher drop) is done with the lock dropped. */
		workq_unlock(wq);

		/*
		 * workq_push_idle_thread() will unset `has_stack`
		 * if it wants us to free the stack before parking.
		 */
		if (!uth->uu_save.uus_workq_park_data.has_stack) {
			pthread_functions->workq_markfree_threadstack(p,
			    get_machthread(uth), get_task_map(p->task),
			    uth->uu_workq_stackaddr);
		}

		/*
		 * When we remove the voucher from the thread, we may lose our importance
		 * causing us to get preempted, so we do this after putting the thread on
		 * the idle list.  Then, when we get our importance back we'll be able to
		 * use this thread from e.g. the kevent call out to deliver a boosting
		 * message.
		 *
		 * Note that setting the voucher to NULL will not clear the preadoption
		 * thread since this thread could have become the creator again and
		 * perhaps acquired a preadoption thread group.
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);

		workq_lock_spin(wq);
		uth->uu_workq_flags &= ~UT_WORKQ_IDLE_CLEANUP;
		setup_flags &= ~WQ_SETUP_CLEAR_VOUCHER;
	}

	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_END, wq, 0, 0, 0);

	if (uth->uu_workq_flags & UT_WORKQ_RUNNING) {
		/*
		 * While we'd dropped the lock to unset our voucher, someone came
		 * around and made us runnable.  But because we weren't waiting on the
		 * event their thread_wakeup() was ineffectual.  To correct for that,
		 * we just run the continuation ourselves.
		 */
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
		__builtin_unreachable();
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_unpark_for_death_and_unlock(p, wq, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	/* Disarm the workqueue quantum since the thread is now idle */
	thread_disarm_workqueue_quantum(get_machthread(uth));

	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	assert_wait(workq_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	workq_unlock(wq);
	thread_block(workq_unpark_continue);
	__builtin_unreachable();
}
3585 
3586 static inline bool
workq_may_start_event_mgr_thread(struct workqueue * wq,struct uthread * uth)3587 workq_may_start_event_mgr_thread(struct workqueue *wq, struct uthread *uth)
3588 {
3589 	/*
3590 	 * There's an event manager request and either:
3591 	 * - no event manager currently running
3592 	 * - we are re-using the event manager
3593 	 */
3594 	return wq->wq_thscheduled_count[_wq_bucket(WORKQ_THREAD_QOS_MANAGER)] == 0 ||
3595 	       (uth && uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER);
3596 }
3597 
3598 static uint32_t
workq_constrained_allowance(struct workqueue * wq,thread_qos_t at_qos,struct uthread * uth,bool may_start_timer)3599 workq_constrained_allowance(struct workqueue *wq, thread_qos_t at_qos,
3600     struct uthread *uth, bool may_start_timer)
3601 {
3602 	assert(at_qos != WORKQ_THREAD_QOS_MANAGER);
3603 	uint32_t count = 0;
3604 
3605 	uint32_t max_count = wq->wq_constrained_threads_scheduled;
3606 	if (uth && workq_thread_is_nonovercommit(uth)) {
3607 		/*
3608 		 * don't count the current thread as scheduled
3609 		 */
3610 		assert(max_count > 0);
3611 		max_count--;
3612 	}
3613 	if (max_count >= wq_max_constrained_threads) {
3614 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 1,
3615 		    wq->wq_constrained_threads_scheduled,
3616 		    wq_max_constrained_threads);
3617 		/*
3618 		 * we need 1 or more constrained threads to return to the kernel before
3619 		 * we can dispatch additional work
3620 		 */
3621 		return 0;
3622 	}
3623 	max_count -= wq_max_constrained_threads;
3624 
3625 	/*
3626 	 * Compute a metric for many how many threads are active.  We find the
3627 	 * highest priority request outstanding and then add up the number of active
3628 	 * threads in that and all higher-priority buckets.  We'll also add any
3629 	 * "busy" threads which are not currently active but blocked recently enough
3630 	 * that we can't be sure that they won't be unblocked soon and start
3631 	 * being active again.
3632 	 *
3633 	 * We'll then compare this metric to our max concurrency to decide whether
3634 	 * to add a new thread.
3635 	 */
3636 
3637 	uint32_t busycount, thactive_count;
3638 
3639 	thactive_count = _wq_thactive_aggregate_downto_qos(wq, _wq_thactive(wq),
3640 	    at_qos, &busycount, NULL);
3641 
3642 	if (uth && uth->uu_workq_pri.qos_bucket != WORKQ_THREAD_QOS_MANAGER &&
3643 	    at_qos <= uth->uu_workq_pri.qos_bucket) {
3644 		/*
3645 		 * Don't count this thread as currently active, but only if it's not
3646 		 * a manager thread, as _wq_thactive_aggregate_downto_qos ignores active
3647 		 * managers.
3648 		 */
3649 		assert(thactive_count > 0);
3650 		thactive_count--;
3651 	}
3652 
3653 	count = wq_max_parallelism[_wq_bucket(at_qos)];
3654 	if (count > thactive_count + busycount) {
3655 		count -= thactive_count + busycount;
3656 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 2,
3657 		    thactive_count, busycount);
3658 		return MIN(count, max_count);
3659 	} else {
3660 		WQ_TRACE_WQ(TRACE_wq_constrained_admission | DBG_FUNC_NONE, wq, 3,
3661 		    thactive_count, busycount);
3662 	}
3663 
3664 	if (may_start_timer) {
3665 		/*
3666 		 * If this is called from the add timer, we won't have another timer
3667 		 * fire when the thread exits the "busy" state, so rearm the timer.
3668 		 */
3669 		workq_schedule_delayed_thread_creation(wq, 0);
3670 	}
3671 
3672 	return 0;
3673 }
3674 
3675 static bool
workq_threadreq_admissible(struct workqueue * wq,struct uthread * uth,workq_threadreq_t req)3676 workq_threadreq_admissible(struct workqueue *wq, struct uthread *uth,
3677     workq_threadreq_t req)
3678 {
3679 	if (req->tr_qos == WORKQ_THREAD_QOS_MANAGER) {
3680 		return workq_may_start_event_mgr_thread(wq, uth);
3681 	}
3682 	if (workq_threadreq_is_cooperative(req)) {
3683 		return workq_cooperative_allowance(wq, req->tr_qos, uth, true);
3684 	}
3685 	if (workq_threadreq_is_nonovercommit(req)) {
3686 		return workq_constrained_allowance(wq, req->tr_qos, uth, true);
3687 	}
3688 
3689 	return true;
3690 }
3691 
/*
 * Called from the context of selecting thread requests for threads returning
 * from userspace or creator thread
 *
 * Returns the best thread request in the cooperative pool, or NULL when the
 * pool has no eligible request.  `uth` (may be NULL) is the thread doing the
 * selection; if it is itself cooperative it must be excluded from the pool's
 * scheduled counts while reevaluating.
 */
static workq_threadreq_t
workq_cooperative_queue_best_req(struct workqueue *wq, struct uthread *uth)
{
	workq_lock_held(wq);

	/*
	 * If the current thread is cooperative, we need to exclude it as part of
	 * cooperative schedule count since this thread is looking for a new
	 * request. Change in the schedule count for cooperative pool therefore
	 * requires us to reeevaluate the next best request for it.
	 */
	if (uth && workq_thread_is_cooperative(uth)) {
		_wq_cooperative_queue_scheduled_count_dec(wq, uth->uu_workq_pri.qos_bucket);

		(void) _wq_cooperative_queue_refresh_best_req_qos(wq);

		/* restore the count: the thread hasn't actually left the pool yet */
		_wq_cooperative_queue_scheduled_count_inc(wq, uth->uu_workq_pri.qos_bucket);
	} else {
		/*
		 * The old value that was already precomputed should be safe to use -
		 * add an assert that asserts that the best req QoS doesn't change in
		 * this case
		 */
		assert(_wq_cooperative_queue_refresh_best_req_qos(wq) == false);
	}

	thread_qos_t qos = wq->wq_cooperative_queue_best_req_qos;

	/* There are no eligible requests in the cooperative pool */
	if (qos == THREAD_QOS_UNSPECIFIED) {
		return NULL;
	}
	assert(qos != WORKQ_THREAD_QOS_ABOVEUI);
	assert(qos != WORKQ_THREAD_QOS_MANAGER);

	int bucket = _wq_bucket(qos);
	assert(!STAILQ_EMPTY(&wq->wq_cooperative_queue[bucket]));

	/* hand back the head of the best-QoS bucket's queue */
	return STAILQ_FIRST(&wq->wq_cooperative_queue[bucket]);
}
3736 
/*
 * Picks the thread request the creator thread should represent next,
 * or NULL if no request passes its pool's admission check.
 *
 * Selection order visible below: the event manager beats the whole QoS
 * world (but still competes with the special queue on sched pri); within
 * the QoS world, cooperative wins ties against overcommit, and constrained
 * requests must additionally pass workq_constrained_allowance().
 */
static workq_threadreq_t
workq_threadreq_select_for_creator(struct workqueue *wq)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	/*
	 * Compute the best priority request, and ignore the turnstile for now
	 */

	req_pri = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_pri) {
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_pri->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, NULL)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			/* manager priority is expressed directly as a sched pri */
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			/* manager priority is a QoS: convert to a sched pri to compare */
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 *
	 * Start by comparing the overcommit and the cooperative pool
	 */
	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, NULL);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative.  Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fairs against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, NULL, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	/*
	 * Compare the best of the QoS world with the priority
	 */
	if (pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	if (req_qos) {
		return req_qos;
	}

	/*
	 * If we had no eligible request but we have a turnstile push,
	 * it must be a non overcommit thread request that failed
	 * the admission check.
	 *
	 * Just fake a BG thread request so that if the push stops the creator
	 * priority just drops to 4.
	 */
	if (turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile, NULL)) {
		static struct workq_threadreq_s workq_sync_push_fake_req = {
			.tr_qos = THREAD_QOS_BACKGROUND,
		};

		return &workq_sync_push_fake_req;
	}

	return NULL;
}
3854 
/*
 * Returns true if this caused a change in the schedule counts of the
 * cooperative pool
 *
 * `uth` is the thread binding to a request with flags `tr_flags`;
 * `old_thread_qos` is the cooperative bucket the thread was previously
 * counted under (uth->uu_workq_pri has already been reset to reflect the
 * new request by the time this runs, hence the separate parameter).
 */
static bool
workq_adjust_cooperative_constrained_schedule_counts(struct workqueue *wq,
    struct uthread *uth, thread_qos_t old_thread_qos, workq_tr_flags_t tr_flags)
{
	workq_lock_held(wq);

	/*
	 * Row: thread type
	 * Column: Request type
	 *
	 *					overcommit		non-overcommit		cooperative
	 * overcommit			X				case 1				case 2
	 * cooperative		case 3				case 4				case 5
	 * non-overcommit	case 6					X				case 7
	 *
	 * Move the thread to the right bucket depending on what state it currently
	 * has and what state the thread req it picks, is going to have.
	 *
	 * Note that the creator thread is an overcommit thread.
	 */
	thread_qos_t new_thread_qos = uth->uu_workq_pri.qos_bucket;

	/*
	 * Anytime a cooperative bucket's schedule count changes, we need to
	 * potentially refresh the next best QoS for that pool when we determine
	 * the next request for the creator
	 */
	bool cooperative_pool_sched_count_changed = false;

	if (workq_thread_is_overcommit(uth)) {
		if (workq_tr_is_nonovercommit(tr_flags)) {
			// Case 1: thread is overcommit, req is non-overcommit
			wq->wq_constrained_threads_scheduled++;
		} else if (workq_tr_is_cooperative(tr_flags)) {
			// Case 2: thread is overcommit, req is cooperative
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
			cooperative_pool_sched_count_changed = true;
		}
	} else if (workq_thread_is_cooperative(uth)) {
		if (workq_tr_is_overcommit(tr_flags)) {
			// Case 3: thread is cooperative, req is overcommit
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
		} else if (workq_tr_is_nonovercommit(tr_flags)) {
			// Case 4: thread is cooperative, req is non-overcommit
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
			wq->wq_constrained_threads_scheduled++;
		} else {
			// Case 5: thread is cooperative, req is also cooperative
			assert(workq_tr_is_cooperative(tr_flags));
			_wq_cooperative_queue_scheduled_count_dec(wq, old_thread_qos);
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
		}
		// Any of cases 3-5 touches a cooperative bucket's scheduled count.
		cooperative_pool_sched_count_changed = true;
	} else {
		if (workq_tr_is_overcommit(tr_flags)) {
			// Case 6: Thread is non-overcommit, req is overcommit
			wq->wq_constrained_threads_scheduled--;
		} else if (workq_tr_is_cooperative(tr_flags)) {
			// Case 7: Thread is non-overcommit, req is cooperative
			wq->wq_constrained_threads_scheduled--;
			_wq_cooperative_queue_scheduled_count_inc(wq, new_thread_qos);
			cooperative_pool_sched_count_changed = true;
		}
	}

	return cooperative_pool_sched_count_changed;
}
3926 
/*
 * Selects the best thread request for `uth` to bind to, or NULL when
 * nothing is admissible.
 *
 * Differs from workq_threadreq_select_for_creator() in that it also
 * considers the proprietor of the workqueue turnstile's max push as a
 * sched-pri request, and that a creator `uth` is treated as NULL so it is
 * not discounted from any scheduled/active counts.
 */
static workq_threadreq_t
workq_threadreq_select(struct workqueue *wq, struct uthread *uth)
{
	workq_threadreq_t req_qos, req_pri, req_tmp, req_mgr;
	uintptr_t proprietor;
	thread_qos_t qos = THREAD_QOS_UNSPECIFIED;
	uint8_t pri = 0;

	if (uth == wq->wq_creator) {
		/* the creator is anonymous: select as if no thread were asking */
		uth = NULL;
	}

	/*
	 * Compute the best priority request (special or turnstile)
	 */

	pri = (uint8_t)turnstile_workq_proprietor_of_max_turnstile(wq->wq_turnstile,
	    &proprietor);
	if (pri) {
		struct kqworkloop *kqwl = (struct kqworkloop *)proprietor;
		req_pri = &kqwl->kqwl_request;
		if (req_pri->tr_state != WORKQ_TR_STATE_QUEUED) {
			panic("Invalid thread request (%p) state %d",
			    req_pri, req_pri->tr_state);
		}
	} else {
		req_pri = NULL;
	}

	req_tmp = priority_queue_max(&wq->wq_special_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_tmp && pri < priority_queue_entry_sched_pri(&wq->wq_special_queue,
	    &req_tmp->tr_entry)) {
		/* the special queue beats the turnstile push */
		req_pri = req_tmp;
		pri = (uint8_t)priority_queue_entry_sched_pri(&wq->wq_special_queue,
		    &req_tmp->tr_entry);
	}

	/*
	 * Handle the manager thread request. The special queue might yield
	 * a higher priority, but the manager always beats the QoS world.
	 */

	req_mgr = wq->wq_event_manager_threadreq;
	if (req_mgr && workq_may_start_event_mgr_thread(wq, uth)) {
		uint32_t mgr_pri = wq->wq_event_manager_priority;

		if (mgr_pri & _PTHREAD_PRIORITY_SCHED_PRI_FLAG) {
			/* manager priority is expressed directly as a sched pri */
			mgr_pri &= _PTHREAD_PRIORITY_SCHED_PRI_MASK;
		} else {
			/* manager priority is a QoS: convert to a sched pri to compare */
			mgr_pri = thread_workq_pri_for_qos(
				_pthread_priority_thread_qos(mgr_pri));
		}

		return mgr_pri >= pri ? req_mgr : req_pri;
	}

	/*
	 * Compute the best QoS Request, and check whether it beats the "pri" one
	 */

	req_qos = priority_queue_max(&wq->wq_overcommit_queue,
	    struct workq_threadreq_s, tr_entry);
	if (req_qos) {
		qos = req_qos->tr_qos;
	}

	req_tmp = workq_cooperative_queue_best_req(wq, uth);
	if (req_tmp && qos <= req_tmp->tr_qos) {
		/*
		 * Cooperative TR is better between overcommit and cooperative.  Note
		 * that if qos is same between overcommit and cooperative, we choose
		 * cooperative.
		 *
		 * Pick cooperative pool if it passes the admissions check
		 */
		if (workq_cooperative_allowance(wq, req_tmp->tr_qos, uth, true)) {
			req_qos = req_tmp;
			qos = req_qos->tr_qos;
		}
	}

	/*
	 * Compare the best QoS so far - either from overcommit or from cooperative
	 * pool - and compare it with the constrained pool
	 */
	req_tmp = priority_queue_max(&wq->wq_constrained_queue,
	    struct workq_threadreq_s, tr_entry);

	if (req_tmp && qos < req_tmp->tr_qos) {
		/*
		 * Constrained pool is best in QoS between overcommit, cooperative
		 * and constrained. Now check how it fairs against the priority case
		 */
		if (pri && pri >= thread_workq_pri_for_qos(req_tmp->tr_qos)) {
			return req_pri;
		}

		if (workq_constrained_allowance(wq, req_tmp->tr_qos, uth, true)) {
			/*
			 * If the constrained thread request is the best one and passes
			 * the admission check, pick it.
			 */
			return req_tmp;
		}
	}

	/* the sched-pri request wins when the QoS world can't beat its pri */
	if (req_pri && (!qos || pri >= thread_workq_pri_for_qos(qos))) {
		return req_pri;
	}

	return req_qos;
}
4040 
/*
 * The creator is an anonymous thread that is counted as scheduled,
 * but otherwise without its scheduler callback set or tracked as active
 * that is used to make other threads.
 *
 * When more requests are added or an existing one is hurried along,
 * a creator is elected and setup, or the existing one overridden accordingly.
 *
 * While this creator is in flight, because no request has been dequeued,
 * already running threads have a chance at stealing thread requests avoiding
 * useless context switches, and the creator once scheduled may not find any
 * work to do and will then just park again.
 *
 * The creator serves the dual purpose of informing the scheduler of work that
 * hasn't be materialized as threads yet, and also as a natural pacing mechanism
 * for thread creation.
 *
 * By being anonymous (and not bound to anything) it means that thread requests
 * can be stolen from this creator by threads already on core yielding more
 * efficient scheduling and reduced context switches.
 *
 * Called with the workqueue lock held.  `p` may be NULL only when `flags`
 * does not include WORKQ_THREADREQ_CAN_CREATE_THREADS (asserted below).
 * May drop and retake the workqueue lock on the thread-creation paths.
 */
static void
workq_schedule_creator(proc_t p, struct workqueue *wq,
    workq_kern_threadreq_flags_t flags)
{
	workq_threadreq_t req;
	struct uthread *uth;
	bool needs_wakeup;

	workq_lock_held(wq);
	assert(p || (flags & WORKQ_THREADREQ_CAN_CREATE_THREADS) == 0);

again:
	uth = wq->wq_creator;

	if (!wq->wq_reqcount) {
		/*
		 * There is no thread request left.
		 *
		 * If there is a creator, leave everything in place, so that it cleans
		 * up itself in workq_push_idle_thread().
		 *
		 * Else, make sure the turnstile state is reset to no inheritor.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
		return;
	}

	req = workq_threadreq_select_for_creator(wq);
	if (req == NULL) {
		/*
		 * There isn't a thread request that passes the admission check.
		 *
		 * If there is a creator, do not touch anything, the creator will sort
		 * it out when it runs.
		 *
		 * Else, set the inheritor to "WORKQ" so that the turnstile propagation
		 * code calls us if anything changes.
		 */
		if (uth == NULL) {
			workq_turnstile_update_inheritor(wq, wq, TURNSTILE_INHERITOR_WORKQ);
		}
		return;
	}


	if (uth) {
		/*
		 * We need to maybe override the creator we already have
		 */
		if (workq_thread_needs_priority_change(req, uth)) {
			WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
			    wq, 1, uthread_tid(uth), req->tr_qos);
			workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		}
		assert(wq->wq_inheritor == get_machthread(uth));
	} else if (wq->wq_thidlecount) {
		/*
		 * We need to unpark a creator thread
		 */
		wq->wq_creator = uth = workq_pop_idle_thread(wq, UT_WORKQ_OVERCOMMIT,
		    &needs_wakeup);
		/* Always reset the priorities on the newly chosen creator */
		workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);
		workq_turnstile_update_inheritor(wq, get_machthread(uth),
		    TURNSTILE_INHERITOR_THREAD);
		WQ_TRACE_WQ(TRACE_wq_creator_select | DBG_FUNC_NONE,
		    wq, 2, uthread_tid(uth), req->tr_qos);
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields = 0;
		if (needs_wakeup) {
			workq_thread_wakeup(uth);
		}
	} else {
		/*
		 * We need to allocate a thread...
		 */
		if (__improbable(wq->wq_nthreads >= wq_max_threads)) {
			/* out of threads, just go away */
			flags = WORKQ_THREADREQ_NONE;
		} else if (flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) {
			act_set_astkevent(current_thread(), AST_KEVENT_REDRIVE_THREADREQ);
		} else if (!(flags & WORKQ_THREADREQ_CAN_CREATE_THREADS)) {
			/* This can drop the workqueue lock, and take it again */
			workq_schedule_immediate_thread_creation(wq);
		} else if (workq_add_new_idle_thread(p, wq)) {
			/* a new idle thread was created: retry the election with it */
			goto again;
		} else {
			workq_schedule_delayed_thread_creation(wq, 0);
		}

		/*
		 * If the current thread is the inheritor:
		 *
		 * If we set the AST, then the thread will stay the inheritor until
		 * either the AST calls workq_kern_threadreq_redrive(), or it parks
		 * and calls workq_push_idle_thread().
		 *
		 * Else, the responsibility of the thread creation is with a thread-call
		 * and we need to clear the inheritor.
		 */
		if ((flags & WORKQ_THREADREQ_SET_AST_ON_FAILURE) == 0 &&
		    wq->wq_inheritor == current_thread()) {
			workq_turnstile_update_inheritor(wq, TURNSTILE_INHERITOR_NULL, 0);
		}
	}
}
4170 
/**
 * Same as workq_unpark_select_threadreq_or_park_and_unlock,
 * but do not allow early binds.
 *
 * Called with the base pri frozen, will unfreeze it.
 *
 * Never returns: either binds `uth` to a selected thread request and runs it
 * via workq_setup_and_run(), or parks the thread.  Drops the workqueue lock
 * on every path.
 */
__attribute__((noreturn, noinline))
static void
workq_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	workq_threadreq_t req = NULL;
	bool is_creator = (wq->wq_creator == uth);
	bool schedule_creator = false;

	if (__improbable(_wq_exiting(wq))) {
		/* the workqueue is being torn down: just park */
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 0, 0, 0);
		goto park;
	}

	if (wq->wq_reqcount == 0) {
		/* nothing to service */
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 1, 0, 0);
		goto park;
	}

	req = workq_threadreq_select(wq, uth);
	if (__improbable(req == NULL)) {
		/* no request passed its admission check */
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 2, 0, 0);
		goto park;
	}

	thread_qos_t old_thread_bucket = uth->uu_workq_pri.qos_bucket;
	/*
	 * NOTE(review): tr_flags is narrowed to uint8_t here — assumes all
	 * WORKQ_TR_FLAG_* bits tested below fit in the low byte; confirm.
	 */
	uint8_t tr_flags = req->tr_flags;
	struct turnstile *req_ts = kqueue_threadreq_get_turnstile(req);

	/*
	 * Attempt to setup ourselves as the new thing to run, moving all priority
	 * pushes to ourselves.
	 *
	 * If the current thread is the creator, then the fact that we are presently
	 * running is proof that we'll do something useful, so keep going.
	 *
	 * For other cases, peek at the AST to know whether the scheduler wants
	 * to preempt us, if yes, park instead, and move the thread request
	 * turnstile back to the workqueue.
	 */
	if (req_ts) {
		workq_perform_turnstile_operation_locked(wq, ^{
			turnstile_update_inheritor(req_ts, get_machthread(uth),
			TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_THREAD);
			turnstile_update_inheritor_complete(req_ts,
			TURNSTILE_INTERLOCK_HELD);
		});
	}

	/* accounting changes of aggregate thscheduled_count and thactive which has
	 * to be paired with the workq_thread_reset_pri below so that we have
	 * uth->uu_workq_pri match with thactive.
	 *
	 * This is undone when the thread parks */
	if (is_creator) {
		WQ_TRACE_WQ(TRACE_wq_creator_select, wq, 4, 0,
		    uth->uu_save.uus_workq_park_data.yields);
		wq->wq_creator = NULL;
		_wq_thactive_inc(wq, req->tr_qos);
		wq->wq_thscheduled_count[_wq_bucket(req->tr_qos)]++;
	} else if (old_thread_bucket != req->tr_qos) {
		_wq_thactive_move(wq, old_thread_bucket, req->tr_qos);
	}
	workq_thread_reset_pri(wq, uth, req, /*unpark*/ true);

	/*
	 * Make relevant accounting changes for pool specific counts.
	 *
	 * The schedule counts changing can affect what the next best request
	 * for cooperative thread pool is if this request is dequeued.
	 */
	bool cooperative_sched_count_changed =
	    workq_adjust_cooperative_constrained_schedule_counts(wq, uth,
	    old_thread_bucket, tr_flags);

	if (workq_tr_is_overcommit(tr_flags)) {
		workq_thread_set_type(uth, UT_WORKQ_OVERCOMMIT);
	} else if (workq_tr_is_cooperative(tr_flags)) {
		workq_thread_set_type(uth, UT_WORKQ_COOPERATIVE);
	} else {
		workq_thread_set_type(uth, 0);
	}

	/*
	 * Unfreezing the base pri returning true means the scheduler wants to
	 * preempt us; give the turnstile push back to the workqueue and park.
	 */
	if (__improbable(thread_unfreeze_base_pri(get_machthread(uth)) && !is_creator)) {
		if (req_ts) {
			workq_perform_turnstile_operation_locked(wq, ^{
				turnstile_update_inheritor(req_ts, wq->wq_turnstile,
				TURNSTILE_IMMEDIATE_UPDATE | TURNSTILE_INHERITOR_TURNSTILE);
				turnstile_update_inheritor_complete(req_ts,
				TURNSTILE_INTERLOCK_HELD);
			});
		}
		WQ_TRACE_WQ(TRACE_wq_select_threadreq | DBG_FUNC_NONE, wq, 3, 0, 0);
		goto park_thawed;
	}

	/*
	 * We passed all checks, dequeue the request, bind to it, and set it up
	 * to return to user.
	 */
	WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
	    workq_trace_req_id(req), tr_flags, 0);
	wq->wq_fulfilled++;
	schedule_creator = workq_threadreq_dequeue(wq, req,
	    cooperative_sched_count_changed);

	workq_thread_reset_cpupercent(req, uth);

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		/* kevent/workloop requests are owned elsewhere: don't free below */
		kqueue_threadreq_bind_prepost(p, req, uth);
		req = NULL;
	} else if (req->tr_count > 0) {
		/* request still has other threads to fulfill: keep it queued */
		req = NULL;
	}

	if (uth->uu_workq_flags & UT_WORKQ_NEW) {
		uth->uu_workq_flags ^= UT_WORKQ_NEW;
		setup_flags |= WQ_SETUP_FIRST_USE;
	}

	/* If one of the following is true, call workq_schedule_creator (which also
	 * adjusts priority of existing creator):
	 *
	 *	  - We are the creator currently so the wq may need a new creator
	 *	  - The request we're binding to is the highest priority one, existing
	 *	  creator's priority might need to be adjusted to reflect the next
	 *	  highest TR
	 */
	if (is_creator || schedule_creator) {
		/* This can drop the workqueue lock, and take it again */
		workq_schedule_creator(p, wq, WORKQ_THREADREQ_CAN_CREATE_THREADS);
	}

	workq_unlock(wq);

	if (req) {
		zfree(workq_zone_threadreq, req);
	}

	/*
	 * Run Thread, Run!
	 */
	uint32_t upcall_flags = WQ_FLAG_THREAD_NEWSPI;
	if (uth->uu_workq_pri.qos_bucket == WORKQ_THREAD_QOS_MANAGER) {
		upcall_flags |= WQ_FLAG_THREAD_EVENT_MANAGER;
	} else if (workq_tr_is_overcommit(tr_flags)) {
		upcall_flags |= WQ_FLAG_THREAD_OVERCOMMIT;
	} else if (workq_tr_is_cooperative(tr_flags)) {
		upcall_flags |= WQ_FLAG_THREAD_COOPERATIVE;
	}
	if (tr_flags & WORKQ_TR_FLAG_KEVENT) {
		upcall_flags |= WQ_FLAG_THREAD_KEVENT;
		assert((upcall_flags & WQ_FLAG_THREAD_COOPERATIVE) == 0);
	}

	if (tr_flags & WORKQ_TR_FLAG_WORKLOOP) {
		upcall_flags |= WQ_FLAG_THREAD_WORKLOOP | WQ_FLAG_THREAD_KEVENT;
	}
	uth->uu_save.uus_workq_park_data.upcall_flags = upcall_flags;

	if (tr_flags & (WORKQ_TR_FLAG_KEVENT | WORKQ_TR_FLAG_WORKLOOP)) {
		kqueue_threadreq_bind_commit(p, get_machthread(uth));
	} else {
#if CONFIG_PREADOPT_TG
		/*
		 * The thread may have a preadopt thread group on it already because it
		 * got tagged with it as a creator thread. So we need to make sure to
		 * clear that since we don't have preadoption for anonymous thread
		 * requests
		 */
		thread_set_preadopt_thread_group(get_machthread(uth), NULL);
#endif
	}

	workq_setup_and_run(p, uth, setup_flags);
	__builtin_unreachable();

park:
	thread_unfreeze_base_pri(get_machthread(uth));
park_thawed:
	workq_park_and_unlock(p, wq, uth, setup_flags);
}
4359 
/**
 * Runs a thread request on a thread
 *
 * - if thread is THREAD_NULL, will find a thread and run the request there.
 *   Otherwise, the thread must be the current thread.
 *
 * - if req is NULL, will find the highest priority request and run that.  If
 *   it is not NULL, it must be a threadreq object in state NEW.  If it can not
 *   be run immediately, it will be enqueued and moved to state QUEUED.
 *
 *   Either way, the thread request object serviced will be moved to state
 *   BINDING and attached to the uthread.
 *
 * Should be called with the workqueue lock held.  Will drop it.
 * Should be called with the base pri not frozen.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_select_threadreq_or_park_and_unlock(proc_t p, struct workqueue *wq,
    struct uthread *uth, uint32_t setup_flags)
{
	/* Fast path: an early-bound thread already has its request attached. */
	if (uth->uu_workq_flags & UT_WORKQ_EARLY_BOUND) {
		if (uth->uu_workq_flags & UT_WORKQ_NEW) {
			setup_flags |= WQ_SETUP_FIRST_USE;
		}
		uth->uu_workq_flags &= ~(UT_WORKQ_NEW | UT_WORKQ_EARLY_BOUND);
		/*
		 * This pointer is possibly freed and only used for tracing purposes.
		 */
		workq_threadreq_t req = uth->uu_save.uus_workq_park_data.thread_request;
		workq_unlock(wq);
		WQ_TRACE_WQ(TRACE_wq_thread_logical_run | DBG_FUNC_START, wq,
		    VM_KERNEL_ADDRHIDE(req), 0, 0);
		(void)req;

		workq_setup_and_run(p, uth, setup_flags);
		__builtin_unreachable();
	}

	/* Slow path: freeze base pri, then select a request (or park). */
	thread_freeze_base_pri(get_machthread(uth));
	workq_select_threadreq_or_park_and_unlock(p, wq, uth, setup_flags);
}
4402 
4403 static bool
workq_creator_should_yield(struct workqueue * wq,struct uthread * uth)4404 workq_creator_should_yield(struct workqueue *wq, struct uthread *uth)
4405 {
4406 	thread_qos_t qos = workq_pri_override(uth->uu_workq_pri);
4407 
4408 	if (qos >= THREAD_QOS_USER_INTERACTIVE) {
4409 		return false;
4410 	}
4411 
4412 	uint32_t snapshot = uth->uu_save.uus_workq_park_data.fulfilled_snapshot;
4413 	if (wq->wq_fulfilled == snapshot) {
4414 		return false;
4415 	}
4416 
4417 	uint32_t cnt = 0, conc = wq_max_parallelism[_wq_bucket(qos)];
4418 	if (wq->wq_fulfilled - snapshot > conc) {
4419 		/* we fulfilled more than NCPU requests since being dispatched */
4420 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 1,
4421 		    wq->wq_fulfilled, snapshot);
4422 		return true;
4423 	}
4424 
4425 	for (int i = _wq_bucket(qos); i < WORKQ_NUM_QOS_BUCKETS; i++) {
4426 		cnt += wq->wq_thscheduled_count[i];
4427 	}
4428 	if (conc <= cnt) {
4429 		/* We fulfilled requests and have more than NCPU scheduled threads */
4430 		WQ_TRACE_WQ(TRACE_wq_creator_yield, wq, 2,
4431 		    wq->wq_fulfilled, snapshot);
4432 		return true;
4433 	}
4434 
4435 	return false;
4436 }
4437 
/**
 * Continuation a parked workqueue thread wakes up on.
 *
 * Three outcomes, all noreturn:
 *  - the creator thread yields back to this continuation when the current
 *    thread pool keeps up with demand (workq_creator_should_yield());
 *  - a still-RUNNING thread goes to pick up a thread request (or re-park);
 *  - otherwise the thread proceeds to its death path.
 */
__attribute__((noreturn, noinline))
static void
workq_unpark_continue(void *parameter __unused, wait_result_t wr __unused)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	struct workqueue *wq = proc_get_wqptr_fast(p);

	workq_lock_spin(wq);

	if (wq->wq_creator == uth && workq_creator_should_yield(wq, uth)) {
		/*
		 * If the number of threads we have out are able to keep up with the
		 * demand, then we should avoid sending this creator thread to
		 * userspace.
		 */
		uth->uu_save.uus_workq_park_data.fulfilled_snapshot = wq->wq_fulfilled;
		uth->uu_save.uus_workq_park_data.yields++;
		workq_unlock(wq);
		/* Yield and come back through this same continuation. */
		thread_yield_with_continuation(workq_unpark_continue, NULL);
		__builtin_unreachable();
	}

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_unpark_select_threadreq_or_park_and_unlock(p, wq, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);
		wq->wq_thdying_count++;
		uth->uu_workq_flags |= UT_WORKQ_DYING;
	}

	/* Death path: unlocks the workqueue and never returns. */
	workq_unpark_for_death_and_unlock(p, wq, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);
	__builtin_unreachable();
}
4493 
/*
 * Final kernel-side setup before a workqueue thread returns to userspace:
 * optionally reset the voucher, compute the upcall flags, materialize the
 * thread's pinned port on first use, arm/disarm the workqueue quantum, set
 * the sched callback, then hand off to pthread's workq_setup_thread().
 *
 * Called without the workqueue lock held (callers drop it first).
 * Never returns.
 */
__attribute__((noreturn, noinline))
static void
workq_setup_and_run(proc_t p, struct uthread *uth, int setup_flags)
{
	thread_t th = get_machthread(uth);
	vm_map_t vmap = get_task_map(p->task);

	if (setup_flags & WQ_SETUP_CLEAR_VOUCHER) {
		/*
		 * For preemption reasons, we want to reset the voucher as late as
		 * possible, so we do it in two places:
		 *   - Just before parking (i.e. in workq_park_and_unlock())
		 *   - Prior to doing the setup for the next workitem (i.e. here)
		 *
		 * Those two places are sufficient to ensure we always reset it before
		 * it goes back out to user space, but be careful to not break that
		 * guarantee.
		 *
		 * Note that setting the voucher to NULL will not clear the preadoption
		 * thread group on this thread
		 */
		__assert_only kern_return_t kr;
		kr = thread_set_voucher_name(MACH_PORT_NULL);
		assert(kr == KERN_SUCCESS);
	}

	uint32_t upcall_flags = uth->uu_save.uus_workq_park_data.upcall_flags;
	if (!(setup_flags & WQ_SETUP_FIRST_USE)) {
		/* Tell userspace this thread is being reused, not newly created. */
		upcall_flags |= WQ_FLAG_THREAD_REUSE;
	}

	if (uth->uu_workq_flags & UT_WORKQ_OUTSIDE_QOS) {
		/*
		 * For threads that have an outside-of-QoS thread priority, indicate
		 * to userspace that setting QoS should only affect the TSD and not
		 * change QOS in the kernel.
		 */
		upcall_flags |= WQ_FLAG_THREAD_OUTSIDEQOS;
	} else {
		/*
		 * Put the QoS class value into the lower bits of the reuse_thread
		 * register, this is where the thread priority used to be stored
		 * anyway.
		 */
		upcall_flags |= uth->uu_save.uus_workq_park_data.qos |
		    WQ_FLAG_THREAD_PRIO_QOS;
	}

	if (uth->uu_workq_thport == MACH_PORT_NULL) {
		/* convert_thread_to_port_pinned() consumes a reference */
		thread_reference(th);
		/* Convert to immovable/pinned thread port, but port is not pinned yet */
		ipc_port_t port = convert_thread_to_port_pinned(th);
		/* Atomically, pin and copy out the port */
		uth->uu_workq_thport = ipc_port_copyout_send_pinned(port, get_task_ipcspace(p->task));
	}

	/* Thread has been set up to run, arm its next workqueue quantum or disarm
	 * if it is no longer supporting that */
	if (thread_supports_cooperative_workqueue(th)) {
		thread_arm_workqueue_quantum(th);
	} else {
		thread_disarm_workqueue_quantum(th);
	}

	/*
	 * Call out to pthread, this sets up the thread, pulls in kevent structs
	 * onto the stack, sets up the thread state and then returns to userspace.
	 */
	WQ_TRACE_WQ(TRACE_wq_runthread | DBG_FUNC_START,
	    proc_get_wqptr_fast(p), 0, 0, 0);

	/* Cooperative threads do not get the workq sched callback. */
	if (workq_thread_is_cooperative(uth)) {
		thread_sched_call(th, NULL);
	} else {
		thread_sched_call(th, workq_sched_callback);
	}

	pthread_functions->workq_setup_thread(p, th, vmap, uth->uu_workq_stackaddr,
	    uth->uu_workq_thport, 0, setup_flags, upcall_flags);

	__builtin_unreachable();
}
4577 
4578 #pragma mark misc
4579 
4580 int
fill_procworkqueue(proc_t p,struct proc_workqueueinfo * pwqinfo)4581 fill_procworkqueue(proc_t p, struct proc_workqueueinfo * pwqinfo)
4582 {
4583 	struct workqueue *wq = proc_get_wqptr(p);
4584 	int error = 0;
4585 	int     activecount;
4586 
4587 	if (wq == NULL) {
4588 		return EINVAL;
4589 	}
4590 
4591 	/*
4592 	 * This is sometimes called from interrupt context by the kperf sampler.
4593 	 * In that case, it's not safe to spin trying to take the lock since we
4594 	 * might already hold it.  So, we just try-lock it and error out if it's
4595 	 * already held.  Since this is just a debugging aid, and all our callers
4596 	 * are able to handle an error, that's fine.
4597 	 */
4598 	bool locked = workq_lock_try(wq);
4599 	if (!locked) {
4600 		return EBUSY;
4601 	}
4602 
4603 	wq_thactive_t act = _wq_thactive(wq);
4604 	activecount = _wq_thactive_aggregate_downto_qos(wq, act,
4605 	    WORKQ_THREAD_QOS_MIN, NULL, NULL);
4606 	if (act & _wq_thactive_offset_for_qos(WORKQ_THREAD_QOS_MANAGER)) {
4607 		activecount++;
4608 	}
4609 	pwqinfo->pwq_nthreads = wq->wq_nthreads;
4610 	pwqinfo->pwq_runthreads = activecount;
4611 	pwqinfo->pwq_blockedthreads = wq->wq_threads_scheduled - activecount;
4612 	pwqinfo->pwq_state = 0;
4613 
4614 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4615 		pwqinfo->pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4616 	}
4617 
4618 	if (wq->wq_nthreads >= wq_max_threads) {
4619 		pwqinfo->pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4620 	}
4621 
4622 	workq_unlock(wq);
4623 	return error;
4624 }
4625 
4626 boolean_t
workqueue_get_pwq_exceeded(void * v,boolean_t * exceeded_total,boolean_t * exceeded_constrained)4627 workqueue_get_pwq_exceeded(void *v, boolean_t *exceeded_total,
4628     boolean_t *exceeded_constrained)
4629 {
4630 	proc_t p = v;
4631 	struct proc_workqueueinfo pwqinfo;
4632 	int err;
4633 
4634 	assert(p != NULL);
4635 	assert(exceeded_total != NULL);
4636 	assert(exceeded_constrained != NULL);
4637 
4638 	err = fill_procworkqueue(p, &pwqinfo);
4639 	if (err) {
4640 		return FALSE;
4641 	}
4642 	if (!(pwqinfo.pwq_state & WQ_FLAGS_AVAILABLE)) {
4643 		return FALSE;
4644 	}
4645 
4646 	*exceeded_total = (pwqinfo.pwq_state & WQ_EXCEEDED_TOTAL_THREAD_LIMIT);
4647 	*exceeded_constrained = (pwqinfo.pwq_state & WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT);
4648 
4649 	return TRUE;
4650 }
4651 
4652 uint32_t
workqueue_get_pwq_state_kdp(void * v)4653 workqueue_get_pwq_state_kdp(void * v)
4654 {
4655 	static_assert((WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT << 17) ==
4656 	    kTaskWqExceededConstrainedThreadLimit);
4657 	static_assert((WQ_EXCEEDED_TOTAL_THREAD_LIMIT << 17) ==
4658 	    kTaskWqExceededTotalThreadLimit);
4659 	static_assert((WQ_FLAGS_AVAILABLE << 17) == kTaskWqFlagsAvailable);
4660 	static_assert((WQ_FLAGS_AVAILABLE | WQ_EXCEEDED_TOTAL_THREAD_LIMIT |
4661 	    WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT) == 0x7);
4662 
4663 	if (v == NULL) {
4664 		return 0;
4665 	}
4666 
4667 	proc_t p = v;
4668 	struct workqueue *wq = proc_get_wqptr(p);
4669 
4670 	if (wq == NULL || workq_lock_is_acquired_kdp(wq)) {
4671 		return 0;
4672 	}
4673 
4674 	uint32_t pwq_state = WQ_FLAGS_AVAILABLE;
4675 
4676 	if (wq->wq_constrained_threads_scheduled >= wq_max_constrained_threads) {
4677 		pwq_state |= WQ_EXCEEDED_CONSTRAINED_THREAD_LIMIT;
4678 	}
4679 
4680 	if (wq->wq_nthreads >= wq_max_threads) {
4681 		pwq_state |= WQ_EXCEEDED_TOTAL_THREAD_LIMIT;
4682 	}
4683 
4684 	return pwq_state;
4685 }
4686 
4687 void
workq_init(void)4688 workq_init(void)
4689 {
4690 	clock_interval_to_absolutetime_interval(wq_stalled_window.usecs,
4691 	    NSEC_PER_USEC, &wq_stalled_window.abstime);
4692 	clock_interval_to_absolutetime_interval(wq_reduce_pool_window.usecs,
4693 	    NSEC_PER_USEC, &wq_reduce_pool_window.abstime);
4694 	clock_interval_to_absolutetime_interval(wq_max_timer_interval.usecs,
4695 	    NSEC_PER_USEC, &wq_max_timer_interval.abstime);
4696 
4697 	thread_deallocate_daemon_register_queue(&workq_deallocate_queue,
4698 	    workq_deallocate_queue_invoke);
4699 }
4700