/*
 * Copyright (c) 2024 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include "kpc.h"
#include <kern/mpsc_ring.h>
#include <kern/assert.h>
#include <kern/kalloc.h>
#include <os/atomic_private.h>

/*
 * This ringbuffer has the following constraints:
 *
 * - Multiple-producer: More than one thread will need to write into the buffer
 *   at once.
 * - Single-consumer: Only the single reader under the global lock will consume
 *   and send samples to user space.
 * - Bounded: Writers will drop their data if there's no space left to write.
 * - Known-parallelism: A fixed number of writers.
 * The ringbuffer that stores the kernel samples has a region of allocated
 * memory and offsets that are maintained by the reader and writers.  The
 * offsets are 32 bits each but typically updated atomically as a single
 * 64-bit value.  "Head" refers to the offset used for writing and "tail" is
 * the offset for reading.
 *
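 * The two offsets and their combined 64-bit view come from a union declared
 * in mpsc_ring.h.  As a sketch (the field order here is an assumption; only
 * the names below are actually used in this file):
 *
 *     union mpsc_ring_head_tail {
 *             struct {
 *                     uint32_t mrht_head;
 *                     uint32_t mrht_tail;
 *             };
 *             uint64_t mrht_head_tail;
 *     };
 *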
 * Writers follow a reserve-commit scheme to ensure that no other writer can
 * interfere with their view into the region and the reader only sees
 * fully-written data.  To get a view that can store their data, the writers do
 * a relaxed load of the offsets and determine how to update the writer
 * offset.  The following operations then happen in a loop (a worked example
 * follows the list):
 *
 * - Add the size of the data to be written to a local copy of the `head`
 *   offset.
 * - Reserve their interest in the write offset by updating a per-writer
 *   "holds" list with the current `head` value.
 * - Attempt a compare-exchange on the offsets with the updated `head` offset.
 * - If this fails, continue the loop with updated values of the offsets.
 * - Otherwise, exit the loop.
 *
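 * As a worked example (illustrative numbers): with a capacity of 1024,
 * `head` = 1020, and `tail` = 200, a writer reserving 16 bytes checks that
 * 1020 + 16 - 200 = 836 is less than 1024, records a hold at 1020, and then
 * attempts to compare-exchange `head` from 1020 to 1036.  The offsets are
 * free-running, so the copy itself wraps: the bytes land at buffer offsets
 * 1020-1023 and 0-11.
 *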
 * The reader will do an atomic load of the offsets with an acquire barrier and
 * remember the writer's offset.  Then it will loop through the per-writer
 * holds and look for the one with the earliest offset.  The earlier of that
 * value and the writer's offset is the furthest it can safely read samples.
 *
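 * For example (illustrative numbers): if the offsets load as `head` = 500 and
 * `tail` = 200, and the holds are { 480, HOLD_EMPTY, 492 }, the reader may
 * only consume offsets 200 through 479, since the write starting at 480 is
 * still in flight.
 *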
 * Here's a typical ringbuffer in use:
 *
 *                                                hold by 2
 *                                                ●       hold by 0
 *                                                │       ●
 *                                                │       │
 *  ┌─────────────────────────────────────────────▼───────▼─────────────┐
 *  │        █████████████████████████████████████░░░░░░░░░░░░          │
 *  └────────▲────────────────────────────────────────────────▲─────────┘
 *  0        │                                                │         capacity
 *           ●                                                ●
 *           tail                                             head
 *
 * The filled region after `tail` has been written and is ready to be read.  The
 * unfilled region has already been read and is available for writing.  There
 * are two concurrent writers (with IDs 2 and 0) with holds outstanding, marked
 * by the shaded region.  Here's a different configuration, after the `head`
 * has wrapped and with the reader "caught up" to the writers:
 *
 *                                                            hold by 3
 *                                                            ●
 *                                                            │
 *  ┌─────────────────────────────────────────────────────────▼─────────┐
 *  │░░░░                                                     ░░░░░░░░░░│
 *  └────▲────────────────────────────────────────────────────▲─────────┘
 *  0    │                                                    │         capacity
 *       ●                                                    ●
 *       head                                                 tail
 *
 * There's one writer active (ID 3), so `tail` can't advance past it. And
 * finally, here's a configuration where there's no more buffer space available
 * for writing:
 *
 *  ┌───────────────────────────────────────────────────────────────────┐
 *  │█████████████████████████████████████████████████████****██████████│
 *  └─────────────────────────────────────────────────────▲───▲─────────┘
 *  0                                                     │   │         capacity
 *                                                        ●   ●
 *                                                     head   tail
 *
 * Almost the entire buffer is waiting to be read.  The `*` between `head` and
 * `tail` is "wasted" space because writers need a contiguous region of memory
 * to write into.  In this case, there's not enough of it before running into
 * `tail`.
 */
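
/*
 * A minimal usage sketch.  The writer ID, the sample struct, and the use of
 * fixed-size records are illustrative, not part of this interface:
 *
 *     struct mpsc_ring ring = { 0 };
 *     mpsc_ring_init(&ring, 16, 4);        // 64KiB buffer, up to 4 writers
 *
 *     // A writer, identified by an ID in [0, 4):
 *     struct sample s = { ... };
 *     uint32_t space_left = mpsc_ring_write(&ring, writer_id, &s, sizeof(s));
 *
 *     // The single reader, under the lock that serializes readers:
 *     mpsc_ring_cursor_t cursor = mpsc_ring_read_start(&ring);
 *     struct sample out;
 *     while (mpsc_ring_cursor_advance(&ring, &cursor, &out, sizeof(out))) {
 *             // ... send `out` to user space ...
 *             mpsc_ring_cursor_commit(&ring, &cursor);
 *     }
 *     mpsc_ring_read_finish(&ring, cursor);
 */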

/*
 * A hold value of all ones marks a writer with no write in flight.
 */
#define HOLD_EMPTY (~0)

void
mpsc_ring_init(
	struct mpsc_ring *buf,
	uint8_t capacity_pow_2,
	uint8_t writers_max)
{
	/*
	 * Check that this ringbuffer hasn't already been initialized.
	 */
	assert3p(buf->mr_buffer, ==, NULL);
	assert3u(buf->mr_capacity, ==, 0);

	/*
	 * Check for reasonable capacity values.
	 */
	assert3u(capacity_pow_2, <, 30);
	assert3u(capacity_pow_2, >, 0);

	/*
	 * Must be at least one potential writer.
	 */
	assert3u(writers_max, >, 0);

	*buf = (struct mpsc_ring){ 0 };

	/*
	 * Allocate the data buffer to the specified capacity.
	 */
	uint32_t capacity = 1U << capacity_pow_2;
	buf->mr_buffer = kalloc_data_tag(
		capacity,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_buffer) {
		panic(
			"mpsc_ring_init: failed to allocate %u bytes for buffer",
			capacity);
	}
	buf->mr_capacity = capacity;

	/*
	 * Allocate the per-writer holds array.
	 */
	size_t holds_size = writers_max * sizeof(buf->mr_writer_holds[0]);
	buf->mr_writer_holds = kalloc_data_tag(
		holds_size,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_writer_holds) {
		panic(
			"mpsc_ring_init: failed to allocate %zu bytes for holds",
			holds_size);
	}
	buf->mr_writer_count = writers_max;

	/*
	 * Initialize the holds to be empty.
	 */
	for (uint8_t i = 0; i < writers_max; i++) {
		buf->mr_writer_holds[i] = HOLD_EMPTY;
	}
	buf->mr_head_tail = (union mpsc_ring_head_tail){ 0 };
	/*
	 * Publish these updates.
	 */
	os_atomic_thread_fence(release);
}

/**
 * Copy to or from the ringbuffer, taking wrap-around at the end into account.
 *
 * @discussion
 * This function does not enforce any bounds checking on the head or tail
 * offsets and is a helper for higher-level interfaces.
 *
 * @param buf
 * The ringbuffer to copy into or out of.
 *
 * @param offset
 * The offset to start the copy operation at.
 *
 * @param data
 * The input or output buffer.
 *
 * @param size
 * The number of bytes to copy.
 *
 * @param in
 * The direction of the copy.  True to treat @link data @/link as a source and
 * copy into the ringbuffer, false to treat @link data @/link as a
 * destination and copy out of the ringbuffer.
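 *
 * For example (illustrative numbers): with a capacity of 16 bytes, copying 6
 * bytes into the ring at offset 12 places the first 4 bytes at buffer offsets
 * 12-15 and wraps the remaining 2 bytes to buffer offsets 0-1.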
 */
OS_ALWAYS_INLINE
static void
_mpsc_ring_copy(
	const struct mpsc_ring *buf,
	uint32_t offset,
	void *data,
	uint32_t size,
	bool in)
{
	/*
	 * Find the offset into the ringbuffer's memory.  The capacity is a power
	 * of two, so this mapping stays consistent even when the free-running
	 * 32-bit offsets wrap around zero.
	 */
	uint32_t const offset_trunc = offset % buf->mr_capacity;

	/*
	 * Determine how much contiguous space is left in the ringbuffer for a
	 * single memcpy.
	 */
	uint32_t const left_contig = buf->mr_capacity - offset_trunc;
	uint32_t const size_contig = MIN(left_contig, size);
	memcpy(in ? &buf->mr_buffer[offset_trunc] : data,
	    in ? data : &buf->mr_buffer[offset_trunc],
	    size_contig);
	if (size_contig != size) {
		/*
		 * If there's any leftover data uncopied, copy it to the start of the
		 * ringbuffer.
		 */
		uint32_t const size_left = size - size_contig;
		void * const data_left = (char *)data + size_contig;
		memcpy(in ? buf->mr_buffer : data_left,
		    in ? data_left : buf->mr_buffer,
		    size_left);
	}
}

uint32_t
mpsc_ring_write(
	struct mpsc_ring *buf,
	uint8_t writer_id,
	const void *data,
	uint32_t size)
{
	/*
	 * Get an initial guess at where to write.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(
		&buf->mr_head_tail,
		relaxed);
	union mpsc_ring_head_tail new_head_tail = { 0 };

	os_atomic_rmw_loop(
		&buf->mr_head_tail.mrht_head_tail,
		head_tail.mrht_head_tail /* old */,
		new_head_tail.mrht_head_tail /* new */,
		release,
	{
		/*
		 * Check for enough free space in the buffer.  The offsets are
		 * free-running, so `head - tail` is the number of bytes in use, even
		 * across 32-bit wrap-around.
		 */
		uint32_t const used = head_tail.mrht_head + size - head_tail.mrht_tail;
		if (used >= buf->mr_capacity) {
		        /*
		         * Not enough space available for all the data, so give up.
		         */
		        os_atomic_rmw_loop_give_up(goto out);
		}

		/*
		 * Compute a new head offset based on the size being written.
		 */
		new_head_tail = head_tail;
		new_head_tail.mrht_head += size;

		/*
		 * Reserve the start of the space with a hold.
		 */
		os_atomic_store(
			&buf->mr_writer_holds[writer_id],
			head_tail.mrht_head,
			relaxed);
	});

	_mpsc_ring_copy(buf, head_tail.mrht_head, (void *)(uintptr_t)data, size, true);

out:
	/*
	 * Release the hold value so it can synchronize with acquires on the read
	 * side.
	 */
	os_atomic_store(&buf->mr_writer_holds[writer_id], HOLD_EMPTY, release);
	return buf->mr_capacity - (head_tail.mrht_head - head_tail.mrht_tail);
}

mpsc_ring_cursor_t
mpsc_ring_read_start(struct mpsc_ring *buf)
{
	/*
	 * Acquire to ensure that any holds updated are visible.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(&buf->mr_head_tail, acquire);
	for (uint8_t i = 0; i < buf->mr_writer_count; i++) {
		/*
		 * Check for any earlier holds to avoid reading past writes-in-progress.
		 */
		uint32_t hold = os_atomic_load(&buf->mr_writer_holds[i], relaxed);
		if (hold != HOLD_EMPTY) {
			head_tail.mrht_head = MIN(head_tail.mrht_head, hold);
		}
	}

	return (mpsc_ring_cursor_t){
		       .mrc_commit_pos = head_tail.mrht_tail,
		       .mrc_pos = head_tail.mrht_tail,
		       .mrc_limit = head_tail.mrht_head,
	};
}

bool
mpsc_ring_cursor_advance(
	const struct mpsc_ring *buf,
	mpsc_ring_cursor_t *cursor,
	void *target,
	uint32_t size)
{
	if (size > cursor->mrc_limit - cursor->mrc_pos) {
		/*
		 * Not enough readable data remains before the limit.
		 */
		return false;
	}
	_mpsc_ring_copy(buf, cursor->mrc_pos, target, size, false);
	cursor->mrc_pos += size;
	return true;
}

void
mpsc_ring_cursor_commit(
	const struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t *cursor)
{
	cursor->mrc_commit_pos = cursor->mrc_pos;
}

void
mpsc_ring_read_finish(
	struct mpsc_ring *buf,
	mpsc_ring_cursor_t cursor)
{
	/*
	 * Relaxed, as there's no need to synchronize with any other readers: this
	 * ringbuffer is single-consumer.
	 */
	os_atomic_store(&buf->mr_head_tail.mrht_tail, cursor.mrc_commit_pos, relaxed);
}

void
mpsc_ring_read_cancel(
	struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t __unused cursor)
{
	/*
	 * Nothing to do; just "consume" the cursor.
	 */
}