/*
 * Copyright (c) 2024 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include "kpc.h"
#include <kern/mpsc_ring.h>
#include <kern/assert.h>
#include <kern/kalloc.h>
#include <os/atomic_private.h>

/*
 * This ringbuffer has the following constraints:
 *
 * - Multiple-producer: More than one thread will need to write into the buffer
 *   at once.
 * - Single-consumer: Only the single reader under the global lock will consume
 *   and send samples to user space.
 * - Bounded: Writers will drop their data if there's no space left to write.
 * - Known-parallelism: A fixed number of writers.
 *
 * The ringbuffer that stores the kernel samples has a region of allocated
 * memory and offsets that are maintained by the reader and writers. The
 * offsets are 32 bits each, but are typically updated atomically as a single
 * 64-bit value. "Head" refers to the offset used for writing and "tail" is
 * the offset used for reading.
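 *
 * As an illustrative sketch only (the authoritative definition lives in the
 * header), the offsets are assumed to be packed along these lines, so that
 * one 64-bit atomic covers both:
 *
 *	union mpsc_ring_head_tail {
 *		struct {
 *			uint32_t mrht_head;
 *			uint32_t mrht_tail;
 *		};
 *		uint64_t mrht_head_tail;
 *	};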
 *
 * Writers follow a reserve-commit scheme to ensure that no other writer can
 * interfere with their view into the region and that the reader only sees
 * fully-written data. To get a view that can store their data, the writers do
 * a relaxed load of the offsets and determine the next writer offset. The
 * following operations then happen in a loop (a worked example follows the
 * list):
 *
 * - Add the size of the data to be written to a local copy of the `head`
 *   offset.
 * - Reserve their interest in the write offset by updating a per-CPU "holds"
 *   list with the current `head` value.
 * - Do a compare-exchange on the offsets to attempt to publish the updated
 *   `head` offset.
 * - If this fails, continue the loop with the updated values of the offsets.
 * - Otherwise, exit the loop.
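 *
 * As a worked example of the space check (with hypothetical values): the
 * offsets increase monotonically and are only reduced modulo the capacity
 * when indexing into the buffer. With a capacity of 64 bytes, `head` at 70,
 * and `tail` at 10, the buffer holds 70 - 10 = 60 bytes, leaving 4 free. A
 * 10-byte write computes 70 + 10 - 10 = 70, which is not less than 64, so
 * the writer gives up and drops the data.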
 *
 * The reader will do an atomic load of the offsets with an acquire barrier and
 * remember the writer's offset. Then it will loop through the per-CPU holds
 * and look for the one with the earliest offset. That value, combined with
 * the writer's offset, determines the furthest point up to which it can
 * safely read samples.
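 *
 * For example (hypothetical values): if the writer's offset is 100 and the
 * per-CPU holds are { empty, 80, empty }, the reader must stop at offset 80,
 * as the writer holding 80 may still be copying data into [80, 100).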
 *
 * Here's a typical ringbuffer in use:
 *
 *                                              hold by 2
 *                                                  ●   hold by 0
 *                                                  │       ●
 *                                                  │       │
 *   ┌─────────────────────────────────────────────▼───────▼─────────────┐
 *   │         ████████████████████████████████████░░░░░░░░░░░░░         │
 *   └────────▲────────────────────────────────────────────────▲─────────┘
 *   0        │                                                │  capacity
 *            ●                                                ●
 *          tail                                             head
 *
 * The filled region after `tail` has been written and is ready to be read. The
 * unfilled region has already been read and is available for writing. There
 * are two concurrent writers (with IDs 2 and 0) with holds outstanding, marked
 * by the shaded region. Here's a different configuration, after the `head`
 * has wrapped and with the reader "caught up" to the writers:
 *
 *                                                          hold by 3
 *                                                              ●
 *                                                              │
 *   ┌─────────────────────────────────────────────────────────▼─────────┐
 *   │░░░░                                                     ░░░░░░░░░░│
 *   └────▲────────────────────────────────────────────────────▲─────────┘
 *   0    │                                                    │  capacity
 *        ●                                                    ●
 *      head                                                 tail
 *
 * There's one writer active (ID 3), so `tail` can't advance past it. And
 * finally, here's a configuration where there's no more buffer space available
 * for writing:
 *
 *   ┌───────────────────────────────────────────────────────────────────┐
 *   │█████████████████████████████████████████████████████****██████████│
 *   └─────────────────────────────────────────────────────▲───▲─────────┘
 *   0                                                     │   │  capacity
 *                                                         ●   ●
 *                                                       head tail
 *
 * Almost the entire buffer is waiting to be read. The `*` region between
 * `head` and `tail` is "wasted" space, because writers need a contiguous
 * region of memory to write into; in this case, there's not enough of it
 * before running into `tail`.
 */
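
/*
 * A sketch of initialization and the producer side. The names `ring`,
 * `ncpus`, `cpu_id`, and `sample` are illustrative, not part of this
 * interface; each writer ID must only ever be used by one thread at a time,
 * since it indexes the per-writer holds list:
 *
 *	struct mpsc_ring ring = { 0 };
 *	mpsc_ring_init(&ring, 14, ncpus);	// 2^14 = 16 KiB of buffer
 *
 *	// On the producer side, using this CPU's writer ID:
 *	uint32_t free_space = mpsc_ring_write(&ring, cpu_id, &sample,
 *	    sizeof(sample));
 */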

#define HOLD_EMPTY (~0)

void
mpsc_ring_init(
	struct mpsc_ring *buf,
	uint8_t capacity_pow_2,
	uint8_t writers_max)
{
	/*
	 * Check that this ringbuffer hasn't already been initialized.
	 */
	assert3p(buf->mr_buffer, ==, NULL);
	assert3u(buf->mr_capacity, ==, 0);

	/*
	 * Check for reasonable capacity values.
	 */
	assert3u(capacity_pow_2, <, 30);
	assert3u(capacity_pow_2, >, 0);

	/*
	 * There must be at least one potential writer.
	 */
	assert3u(writers_max, >, 0);

	*buf = (struct mpsc_ring){ 0 };

	/*
	 * Allocate the data buffer to the specified capacity.
	 */
	uint32_t capacity = 1U << capacity_pow_2;
	buf->mr_buffer = kalloc_data_tag(
		capacity,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_buffer) {
		panic(
			"mpsc_ring_init: failed to allocate %u bytes for buffer",
			capacity);
	}
	buf->mr_capacity = capacity;

	/*
	 * Allocate the per-writer holds array.
	 */
	size_t holds_size = writers_max * sizeof(buf->mr_writer_holds[0]);
	buf->mr_writer_holds = kalloc_data_tag(
		holds_size,
		Z_WAITOK | Z_ZERO,
		VM_KERN_MEMORY_DIAG);
	if (!buf->mr_writer_holds) {
		panic(
			"mpsc_ring_init: failed to allocate %zu bytes for holds",
			holds_size);
	}
	buf->mr_writer_count = writers_max;

	/*
	 * Initialize the holds to be empty.
	 */
	for (uint8_t i = 0; i < writers_max; i++) {
		buf->mr_writer_holds[i] = HOLD_EMPTY;
	}
	buf->mr_head_tail = (union mpsc_ring_head_tail){ 0 };
	/*
	 * Publish these updates.
	 */
	os_atomic_thread_fence(release);
}

/**
 * Copy to or from the ringbuffer, taking wrap-around at the end into account.
 *
 * @discussion
 * This function does not enforce any bounds checking on the head or tail
 * offsets and is a helper for higher-level interfaces.
 *
 * @param buf
 * The ringbuffer to copy into or out of.
 *
 * @param offset
 * The offset to start the copy operation at.
 *
 * @param data
 * The input or output buffer.
 *
 * @param size
 * The number of bytes to copy.
 *
 * @param in
 * The direction of the copy. True to treat @link data @/link as a source and
 * copy into the ringbuffer; false to treat @link data @/link as a
 * destination and copy out of the ringbuffer.
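 *
 * As a worked example (with hypothetical values): with a capacity of 64, a
 * 10-byte copy starting at offset 60 is split into a 4-byte memcpy at the
 * end of the buffer and a 6-byte memcpy at the start.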
 */
OS_ALWAYS_INLINE
static void
_mpsc_ring_copy(
	const struct mpsc_ring *buf,
	uint32_t offset,
	void *data,
	uint32_t size,
	bool in)
{
	/*
	 * Find the offset into the ringbuffer's memory.
	 */
	uint32_t const offset_trunc = offset % buf->mr_capacity;

	/*
	 * Determine how much contiguous space is left in the ringbuffer for a
	 * single memcpy.
	 */
	uint32_t const left_contig = buf->mr_capacity - offset_trunc;
	uint32_t const size_contig = MIN(left_contig, size);
	memcpy(in ? &buf->mr_buffer[offset_trunc] : data,
	    in ? data : &buf->mr_buffer[offset_trunc],
	    size_contig);
	if (size_contig != size) {
		/*
		 * If there's any leftover data uncopied, copy it at the start of the
		 * ringbuffer.
		 */
		uint32_t const size_left = size - size_contig;
		void * const data_left = (char *)data + size_contig;
		memcpy(in ? buf->mr_buffer : data_left,
		    in ? data_left : buf->mr_buffer,
		    size_left);
	}
}

uint32_t
mpsc_ring_write(
	struct mpsc_ring *buf,
	uint8_t writer_id,
	const void *data,
	uint32_t size)
{
	/*
	 * Get an initial guess at where to write.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(
		&buf->mr_head_tail,
		relaxed);
	union mpsc_ring_head_tail new_head_tail = { 0 };

	os_atomic_rmw_loop(
		&buf->mr_head_tail.mrht_head_tail,
		head_tail.mrht_head_tail /* old */,
		new_head_tail.mrht_head_tail /* new */,
		release,
		{
		/*
		 * Check for enough empty space in the buffer.
		 */
		uint32_t const leftover = head_tail.mrht_head + size -
		    head_tail.mrht_tail;
		if (leftover >= buf->mr_capacity) {
			/*
			 * Not enough space available for all the data, so give up.
			 */
			os_atomic_rmw_loop_give_up(goto out);
		}

		/*
		 * Compute a new head offset based on the size being written.
		 */
		new_head_tail = head_tail;
		new_head_tail.mrht_head += size;

		/*
		 * Reserve the start of the space with a hold.
		 */
		os_atomic_store(
			&buf->mr_writer_holds[writer_id],
			head_tail.mrht_head,
			relaxed);
	});

	_mpsc_ring_copy(buf, head_tail.mrht_head, (void *)(uintptr_t)data, size,
	    true);

out:
	/*
	 * Release the hold value so it can synchronize with acquires on the read
	 * side.
	 */
	os_atomic_store(&buf->mr_writer_holds[writer_id], HOLD_EMPTY, release);
	/*
	 * Report the remaining free space, based on the offsets this writer
	 * observed.
	 */
	return buf->mr_capacity - (head_tail.mrht_head - head_tail.mrht_tail);
}

mpsc_ring_cursor_t
mpsc_ring_read_start(struct mpsc_ring *buf)
{
	/*
	 * Acquire to ensure that any updated holds are visible.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(
		&buf->mr_head_tail,
		acquire);
	for (uint8_t i = 0; i < buf->mr_writer_count; i++) {
		/*
		 * Check for any earlier holds to avoid reading past
		 * writes-in-progress.
		 */
		uint32_t hold = os_atomic_load(&buf->mr_writer_holds[i], relaxed);
		if (hold != HOLD_EMPTY) {
			head_tail.mrht_head = MIN(head_tail.mrht_head, hold);
		}
	}

	return (mpsc_ring_cursor_t){
		.mrc_commit_pos = head_tail.mrht_tail,
		.mrc_pos = head_tail.mrht_tail,
		.mrc_limit = head_tail.mrht_head,
	};
}

bool
mpsc_ring_cursor_advance(
	const struct mpsc_ring *buf,
	mpsc_ring_cursor_t *cursor,
	void *target,
	uint32_t size)
{
	/*
	 * Don't advance past the safe reading limit captured at read-start.
	 */
	if (size > cursor->mrc_limit - cursor->mrc_pos) {
		return false;
	}
	_mpsc_ring_copy(buf, cursor->mrc_pos, target, size, false);
	cursor->mrc_pos += size;
	return true;
}

void
mpsc_ring_cursor_commit(
	const struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t *cursor)
{
	cursor->mrc_commit_pos = cursor->mrc_pos;
}

void
mpsc_ring_read_finish(
	struct mpsc_ring *buf,
	mpsc_ring_cursor_t cursor)
{
	/*
	 * Relaxed, as there's no need to synchronize with any other readers: this
	 * ringbuffer is single-consumer.
	 */
	os_atomic_store(&buf->mr_head_tail.mrht_tail, cursor.mrc_commit_pos,
	    relaxed);
}

void
mpsc_ring_read_cancel(
	struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t __unused cursor)
{
	/*
	 * Nothing to do; just "consume" the cursor.
	 */
}

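/*
 * A sketch of the consumer side, run by the single reader under the global
 * lock. The names `ring`, `sample`, and `send_sample` are illustrative, not
 * part of this interface:
 *
 *	mpsc_ring_cursor_t cursor = mpsc_ring_read_start(&ring);
 *	struct sample sample;
 *	while (mpsc_ring_cursor_advance(&ring, &cursor, &sample,
 *	    sizeof(sample))) {
 *		if (!send_sample(&sample)) {
 *			break;
 *		}
 *		// Only committed positions are released back to the writers.
 *		mpsc_ring_cursor_commit(&ring, &cursor);
 *	}
 *	mpsc_ring_read_finish(&ring, cursor);
 *
 * Calling mpsc_ring_read_cancel() instead of mpsc_ring_read_finish() leaves
 * `tail` untouched, so nothing is consumed.
 */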