xref: /xnu-8020.121.3/osfmk/kern/mpsc_queue.c (revision fdd8201d7b966f0c3ea610489d29bd841d358941)
1 /*
2  * Copyright (c) 2018 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 
29 #include <machine/machine_cpu.h>
30 #include <kern/locks.h>
31 #include <kern/mpsc_queue.h>
32 #include <kern/thread.h>
33 
34 #pragma mark Single Consumer calls
35 
36 __attribute__((noinline))
37 static mpsc_queue_chain_t
_mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain * _Atomic * ptr)38 _mpsc_queue_wait_for_enqueuer(struct mpsc_queue_chain *_Atomic *ptr)
39 {
40 	return hw_wait_while_equals_long(ptr, NULL);
41 }
42 
43 void
mpsc_queue_restore_batch(mpsc_queue_head_t q,mpsc_queue_chain_t first,mpsc_queue_chain_t last)44 mpsc_queue_restore_batch(mpsc_queue_head_t q, mpsc_queue_chain_t first,
45     mpsc_queue_chain_t last)
46 {
47 	mpsc_queue_chain_t head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
48 
49 	os_atomic_store(&last->mpqc_next, head, relaxed);
50 
51 	if (head == NULL &&
52 	    !os_atomic_cmpxchg(&q->mpqh_tail, &q->mpqh_head, last, release)) {
53 		head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
54 		if (__improbable(head == NULL)) {
55 			head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
56 		}
57 		os_atomic_store(&last->mpqc_next, head, relaxed);
58 	}
59 
60 	os_atomic_store(&q->mpqh_head.mpqc_next, first, relaxed);
61 }
62 
63 mpsc_queue_chain_t
mpsc_queue_dequeue_batch(mpsc_queue_head_t q,mpsc_queue_chain_t * tail_out,os_atomic_dependency_t dependency)64 mpsc_queue_dequeue_batch(mpsc_queue_head_t q, mpsc_queue_chain_t *tail_out,
65     os_atomic_dependency_t dependency)
66 {
67 	mpsc_queue_chain_t head, tail;
68 
69 	q = os_atomic_inject_dependency(q, dependency);
70 
71 	tail = os_atomic_load(&q->mpqh_tail, relaxed);
72 	if (__improbable(tail == &q->mpqh_head)) {
73 		*tail_out = NULL;
74 		return NULL;
75 	}
76 
77 	head = os_atomic_load(&q->mpqh_head.mpqc_next, relaxed);
78 	if (__improbable(head == NULL)) {
79 		head = _mpsc_queue_wait_for_enqueuer(&q->mpqh_head.mpqc_next);
80 	}
81 	os_atomic_store(&q->mpqh_head.mpqc_next, NULL, relaxed);
82 	/*
83 	 * 22708742: set tail to &q->mpqh_head with release, so that NULL write
84 	 * to head above doesn't clobber the head set by concurrent enqueuer
85 	 *
86 	 * The other half of the seq_cst is required to pair with any enqueuer that
87 	 * contributed to an element in this list (pairs with the release fence in
88 	 * __mpsc_queue_append_update_tail().
89 	 *
90 	 * Making this seq_cst instead of acq_rel makes mpsc_queue_append*()
91 	 * visibility transitive (when items hop from one queue to the next)
92 	 * which is expected by clients implicitly.
93 	 *
94 	 * Note that this is the same number of fences that a traditional lock
95 	 * would have, but as a once-per-batch cost.
96 	 */
97 	*tail_out = os_atomic_xchg(&q->mpqh_tail, &q->mpqh_head, seq_cst);
98 
99 	return head;
100 }
101 
102 mpsc_queue_chain_t
mpsc_queue_batch_next(mpsc_queue_chain_t cur,mpsc_queue_chain_t tail)103 mpsc_queue_batch_next(mpsc_queue_chain_t cur, mpsc_queue_chain_t tail)
104 {
105 	mpsc_queue_chain_t elm = NULL;
106 	if (cur == tail || cur == NULL) {
107 		return elm;
108 	}
109 
110 	elm = os_atomic_load(&cur->mpqc_next, relaxed);
111 	if (__improbable(elm == NULL)) {
112 		elm = _mpsc_queue_wait_for_enqueuer(&cur->mpqc_next);
113 	}
114 	return elm;
115 }
116 
117 #pragma mark "GCD"-like facilities
118 
static void _mpsc_daemon_queue_drain(mpsc_daemon_queue_t, thread_t);
static void _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t, mpsc_queue_chain_t);
121 
122 /* thread based queues */
123 
124 static void
_mpsc_daemon_queue_init(mpsc_daemon_queue_t dq,mpsc_daemon_init_options_t flags)125 _mpsc_daemon_queue_init(mpsc_daemon_queue_t dq, mpsc_daemon_init_options_t flags)
126 {
127 	if (flags & MPSC_DAEMON_INIT_INACTIVE) {
128 		os_atomic_init(&dq->mpd_state, MPSC_QUEUE_STATE_INACTIVE);
129 	}
130 }
131 
132 static void
_mpsc_queue_thread_continue(void * param,wait_result_t wr __unused)133 _mpsc_queue_thread_continue(void *param, wait_result_t wr __unused)
134 {
135 	mpsc_daemon_queue_t dq = param;
136 	mpsc_daemon_queue_kind_t kind = dq->mpd_kind;
137 	thread_t self = dq->mpd_thread;
138 
139 	__builtin_assume(self != THREAD_NULL);
140 
141 	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
142 		self->options |= TH_OPT_SYSTEM_CRITICAL;
143 	}
144 
145 	assert(dq->mpd_thread == current_thread());
146 	_mpsc_daemon_queue_drain(dq, self);
147 
148 	if (kind == MPSC_QUEUE_KIND_THREAD_CRITICAL) {
149 		self->options &= ~TH_OPT_SYSTEM_CRITICAL;
150 	}
151 
152 	thread_block_parameter(_mpsc_queue_thread_continue, dq);
153 }
154 
155 static void
_mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)156 _mpsc_queue_thread_wakeup(mpsc_daemon_queue_t dq)
157 {
158 	thread_wakeup_thread((event_t)dq, dq->mpd_thread);
159 }
160 
161 static kern_return_t
_mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,mpsc_daemon_invoke_fn_t invoke,int pri,const char * name,mpsc_daemon_queue_kind_t kind,mpsc_daemon_init_options_t flags)162 _mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
163     mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
164     mpsc_daemon_queue_kind_t kind, mpsc_daemon_init_options_t flags)
165 {
166 	kern_return_t kr;
167 
168 	*dq = (struct mpsc_daemon_queue){
169 		.mpd_kind   = kind,
170 		.mpd_invoke = invoke,
171 		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
172 		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
173 	};
174 	_mpsc_daemon_queue_init(dq, flags);
175 
176 	kr = kernel_thread_create(_mpsc_queue_thread_continue, dq, pri,
177 	    &dq->mpd_thread);
178 	if (kr == KERN_SUCCESS) {
179 		thread_set_thread_name(dq->mpd_thread, name);
180 		thread_start_in_assert_wait(dq->mpd_thread, (event_t)dq, THREAD_UNINT);
181 		thread_deallocate(dq->mpd_thread);
182 	}
183 	return kr;
184 }
185 
186 kern_return_t
mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,mpsc_daemon_invoke_fn_t invoke,int pri,const char * name,mpsc_daemon_init_options_t flags)187 mpsc_daemon_queue_init_with_thread(mpsc_daemon_queue_t dq,
188     mpsc_daemon_invoke_fn_t invoke, int pri, const char *name,
189     mpsc_daemon_init_options_t flags)
190 {
191 	return _mpsc_daemon_queue_init_with_thread(dq, invoke, pri, name,
192 	           MPSC_QUEUE_KIND_THREAD, flags);
193 }
194 
195 /* thread-call based queues */
196 
197 static void
_mpsc_queue_thread_call_drain(thread_call_param_t arg0,thread_call_param_t arg1 __unused)198 _mpsc_queue_thread_call_drain(thread_call_param_t arg0,
199     thread_call_param_t arg1 __unused)
200 {
201 	_mpsc_daemon_queue_drain((mpsc_daemon_queue_t)arg0, NULL);
202 }
203 
204 static void
_mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)205 _mpsc_queue_thread_call_wakeup(mpsc_daemon_queue_t dq)
206 {
207 	thread_call_enter(dq->mpd_call);
208 }
209 
210 void
mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,mpsc_daemon_invoke_fn_t invoke,thread_call_priority_t pri,mpsc_daemon_init_options_t flags)211 mpsc_daemon_queue_init_with_thread_call(mpsc_daemon_queue_t dq,
212     mpsc_daemon_invoke_fn_t invoke, thread_call_priority_t pri,
213     mpsc_daemon_init_options_t flags)
214 {
215 	*dq = (struct mpsc_daemon_queue){
216 		.mpd_kind   = MPSC_QUEUE_KIND_THREAD_CALL,
217 		.mpd_invoke = invoke,
218 		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
219 		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
220 	};
221 	_mpsc_daemon_queue_init(dq, flags);
222 	dq->mpd_call = thread_call_allocate_with_options(
223 		_mpsc_queue_thread_call_drain, dq, pri, THREAD_CALL_OPTIONS_ONCE);
224 }
225 
226 /* nested queues */
227 
228 void
mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,__unused mpsc_daemon_queue_t tq)229 mpsc_daemon_queue_nested_invoke(mpsc_queue_chain_t elm,
230     __unused mpsc_daemon_queue_t tq)
231 {
232 	mpsc_daemon_queue_t dq;
233 	dq = mpsc_queue_element(elm, struct mpsc_daemon_queue, mpd_chain);
234 	_mpsc_daemon_queue_drain(dq, NULL);
235 }
236 
237 static void
_mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)238 _mpsc_daemon_queue_nested_wakeup(mpsc_daemon_queue_t dq)
239 {
240 	_mpsc_daemon_queue_enqueue(dq->mpd_target, &dq->mpd_chain);
241 }
242 
243 void
mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,mpsc_daemon_invoke_fn_t invoke,mpsc_daemon_queue_t target,mpsc_daemon_init_options_t flags)244 mpsc_daemon_queue_init_with_target(mpsc_daemon_queue_t dq,
245     mpsc_daemon_invoke_fn_t invoke, mpsc_daemon_queue_t target,
246     mpsc_daemon_init_options_t flags)
247 {
248 	*dq = (struct mpsc_daemon_queue){
249 		.mpd_kind   = MPSC_QUEUE_KIND_NESTED,
250 		.mpd_invoke = invoke,
251 		.mpd_target = target,
252 		.mpd_queue  = MPSC_QUEUE_INITIALIZER(dq->mpd_queue),
253 		.mpd_chain  = { MPSC_QUEUE_NOTQUEUED_MARKER },
254 	};
255 	_mpsc_daemon_queue_init(dq, flags);
256 }
257 
258 /* enqueue, drain & cancelation */
259 
260 static void
_mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq,thread_t self)261 _mpsc_daemon_queue_drain(mpsc_daemon_queue_t dq, thread_t self)
262 {
263 	mpsc_daemon_invoke_fn_t invoke = dq->mpd_invoke;
264 	mpsc_queue_chain_t head, cur, tail;
265 	mpsc_daemon_queue_state_t st;
266 
267 again:
268 	/*
269 	 * Most of the time we're woken up because we're dirty,
270 	 * This atomic xor sets DRAINING and clears WAKEUP in a single atomic
271 	 * in that case.
272 	 *
273 	 * However, if we're woken up for cancelation, the state may be reduced to
274 	 * the CANCELED bit set only, and then the xor will actually set WAKEUP.
275 	 * We need to correct this and clear it back to avoid looping below.
276 	 * This is safe to do as no one is allowed to enqueue more work after
277 	 * cancelation has happened.
278 	 *
279 	 * We use `st` as a dependency token to pair with the release fence in
280 	 * _mpsc_daemon_queue_enqueue() which gives us the guarantee that the update
281 	 * to the tail of the MPSC queue that made it non empty is visible to us.
282 	 */
283 	st = os_atomic_xor(&dq->mpd_state,
284 	    MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP, dependency);
285 	assert(st & MPSC_QUEUE_STATE_DRAINING);
286 	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
287 		assert(st & MPSC_QUEUE_STATE_CANCELED);
288 		os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, relaxed);
289 	}
290 
291 	os_atomic_dependency_t dep = os_atomic_make_dependency((uintptr_t)st);
292 	if ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep))) {
293 		do {
294 			mpsc_queue_batch_foreach_safe(cur, head, tail) {
295 				os_atomic_store(&cur->mpqc_next,
296 				    MPSC_QUEUE_NOTQUEUED_MARKER, relaxed);
297 				invoke(cur, dq);
298 			}
299 		} while ((head = mpsc_queue_dequeue_batch(&dq->mpd_queue, &tail, dep)));
300 
301 		if (dq->mpd_options & MPSC_QUEUE_OPTION_BATCH) {
302 			invoke(MPSC_QUEUE_BATCH_END, dq);
303 		}
304 	}
305 
306 	if (self) {
307 		assert_wait((event_t)dq, THREAD_UNINT);
308 	}
309 
310 	/*
311 	 * Unlike GCD no fence is necessary here: there is no concept similar
312 	 * to "dispatch_sync()" that would require changes this thread made to be
313 	 * visible to other threads as part of the mpsc_daemon_queue machinery.
314 	 *
315 	 * Making updates that happened on the daemon queue visible to other threads
316 	 * is the responsibility of the client.
317 	 */
318 	st = os_atomic_andnot(&dq->mpd_state, MPSC_QUEUE_STATE_DRAINING, relaxed);
319 
320 	/*
321 	 * A wakeup has happened while we were draining,
322 	 * which means that the queue did an [ empty -> non empty ]
323 	 * transition during our drain.
324 	 *
325 	 * Chances are we already observed and drained everything,
326 	 * but we need to be absolutely sure, so start a drain again
327 	 * as the enqueuer observed the DRAINING bit and has skipped calling
328 	 * _mpsc_daemon_queue_wakeup().
329 	 */
330 	if (__improbable(st & MPSC_QUEUE_STATE_WAKEUP)) {
331 		if (self) {
332 			clear_wait(self, THREAD_AWAKENED);
333 		}
334 		goto again;
335 	}
336 
337 	/* dereferencing `dq` past this point is unsafe */
338 
339 	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
340 		thread_wakeup(&dq->mpd_state);
341 		if (self) {
342 			clear_wait(self, THREAD_AWAKENED);
343 			thread_terminate_self();
344 			__builtin_unreachable();
345 		}
346 	}
347 }
348 
349 static void
_mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)350 _mpsc_daemon_queue_wakeup(mpsc_daemon_queue_t dq)
351 {
352 	switch (dq->mpd_kind) {
353 	case MPSC_QUEUE_KIND_NESTED:
354 		_mpsc_daemon_queue_nested_wakeup(dq);
355 		break;
356 	case MPSC_QUEUE_KIND_THREAD:
357 	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
358 		_mpsc_queue_thread_wakeup(dq);
359 		break;
360 	case MPSC_QUEUE_KIND_THREAD_CALL:
361 		_mpsc_queue_thread_call_wakeup(dq);
362 		break;
363 	default:
364 		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
365 	}
366 }
367 
368 static void
_mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq,mpsc_queue_chain_t elm)369 _mpsc_daemon_queue_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm)
370 {
371 	mpsc_daemon_queue_state_t st;
372 
373 	if (mpsc_queue_append(&dq->mpd_queue, elm)) {
374 		/*
375 		 * Pairs with the acquire fence in _mpsc_daemon_queue_drain().
376 		 */
377 		st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_WAKEUP, release);
378 		if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
379 			panic("mpsc_queue[%p]: use after cancelation", dq);
380 		}
381 
382 		if ((st & (MPSC_QUEUE_STATE_DRAINING | MPSC_QUEUE_STATE_WAKEUP |
383 		    MPSC_QUEUE_STATE_INACTIVE)) == 0) {
384 			_mpsc_daemon_queue_wakeup(dq);
385 		}
386 	}
387 }
388 
389 void
mpsc_daemon_enqueue(mpsc_daemon_queue_t dq,mpsc_queue_chain_t elm,mpsc_queue_options_t options)390 mpsc_daemon_enqueue(mpsc_daemon_queue_t dq, mpsc_queue_chain_t elm,
391     mpsc_queue_options_t options)
392 {
393 	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
394 		disable_preemption();
395 	}
396 
397 	_mpsc_daemon_queue_enqueue(dq, elm);
398 
399 	if (options & MPSC_QUEUE_DISABLE_PREEMPTION) {
400 		enable_preemption();
401 	}
402 }
403 
404 void
mpsc_daemon_queue_activate(mpsc_daemon_queue_t dq)405 mpsc_daemon_queue_activate(mpsc_daemon_queue_t dq)
406 {
407 	mpsc_daemon_queue_state_t st;
408 
409 	st = os_atomic_andnot_orig(&dq->mpd_state,
410 	    MPSC_QUEUE_STATE_INACTIVE, relaxed);
411 	if ((st & MPSC_QUEUE_STATE_WAKEUP) && (st & MPSC_QUEUE_STATE_INACTIVE)) {
412 		_mpsc_daemon_queue_wakeup(dq);
413 	}
414 }
415 
416 void
mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)417 mpsc_daemon_queue_cancel_and_wait(mpsc_daemon_queue_t dq)
418 {
419 	mpsc_daemon_queue_state_t st;
420 
421 	assert_wait((event_t)&dq->mpd_state, THREAD_UNINT);
422 
423 	st = os_atomic_or_orig(&dq->mpd_state, MPSC_QUEUE_STATE_CANCELED, relaxed);
424 	if (__improbable(st & MPSC_QUEUE_STATE_CANCELED)) {
425 		panic("mpsc_queue[%p]: cancelled twice (%x)", dq, st);
426 	}
427 	if (__improbable(st & MPSC_QUEUE_STATE_INACTIVE)) {
428 		panic("mpsc_queue[%p]: queue is inactive (%x)", dq, st);
429 	}
430 
431 	if (dq->mpd_kind == MPSC_QUEUE_KIND_NESTED && st == 0) {
432 		clear_wait(current_thread(), THREAD_AWAKENED);
433 	} else {
434 		disable_preemption();
435 		_mpsc_daemon_queue_wakeup(dq);
436 		enable_preemption();
437 		thread_block(THREAD_CONTINUE_NULL);
438 	}
439 
440 	switch (dq->mpd_kind) {
441 	case MPSC_QUEUE_KIND_NESTED:
442 		dq->mpd_target = NULL;
443 		break;
444 	case MPSC_QUEUE_KIND_THREAD:
445 	case MPSC_QUEUE_KIND_THREAD_CRITICAL:
446 		dq->mpd_thread = NULL;
447 		break;
448 	case MPSC_QUEUE_KIND_THREAD_CALL:
449 		thread_call_cancel_wait(dq->mpd_call);
450 		thread_call_free(dq->mpd_call);
451 		dq->mpd_call = NULL;
452 		break;
453 	default:
454 		panic("mpsc_queue[%p]: invalid kind (%d)", dq, dq->mpd_kind);
455 	}
456 	dq->mpd_kind = MPSC_QUEUE_KIND_UNKNOWN;
457 }
458 
459 #pragma mark deferred deallocation daemon
460 
461 static struct mpsc_daemon_queue thread_deferred_deallocation_queue;
462 
463 void
thread_deallocate_daemon_init(void)464 thread_deallocate_daemon_init(void)
465 {
466 	kern_return_t kr;
467 
468 	kr = _mpsc_daemon_queue_init_with_thread(&thread_deferred_deallocation_queue,
469 	    mpsc_daemon_queue_nested_invoke, MINPRI_KERNEL,
470 	    "daemon.deferred-deallocation", MPSC_QUEUE_KIND_THREAD_CRITICAL,
471 	    MPSC_DAEMON_INIT_NONE);
472 	if (kr != KERN_SUCCESS) {
473 		panic("thread_deallocate_daemon_init: creating daemon failed (%d)", kr);
474 	}
475 }
476 
477 void
thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,mpsc_daemon_invoke_fn_t invoke)478 thread_deallocate_daemon_register_queue(mpsc_daemon_queue_t dq,
479     mpsc_daemon_invoke_fn_t invoke)
480 {
481 	mpsc_daemon_queue_init_with_target(dq, invoke,
482 	    &thread_deferred_deallocation_queue, MPSC_DAEMON_INIT_NONE);
483 }
484