/*
 * Copyright (c) 2024 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include "kpc.h"

#include <kern/assert.h>
#include <kern/kalloc.h>
#include <os/atomic_private.h>
#include <string.h>

/*
 * This ringbuffer has the following constraints:
 *
 * - Multiple-producer: More than one thread may need to write into the buffer
 *   at once.
 * - Single-consumer: Only the single reader under the global lock will
 *   consume and send samples to user space.
 * - Bounded: Writers will drop their data if there's no space left to write.
 * - Known parallelism: A fixed number of writers.
 *
 * The ringbuffer that stores the kernel samples has a region of allocated
 * memory and offsets that are maintained by the reader and writers.  The
 * offsets are 32 bits wide but typically updated atomically as a single
 * 64-bit value.  "Head" refers to the offset used for writing and "tail" is
 * the offset used for reading.
 *
 * Writers follow a reserve-commit scheme to ensure that no other writer can
 * interfere with their view into the region and that the reader only sees
 * fully-written data.  To get a view that can store their data, the writers
 * do a relaxed load of the offsets and determine how to update the next
 * writer offset.  The following operations happen in a loop:
 *
 * - Add the size of the data to be written to a local copy of the `head`
 *   offset.
 * - Reserve their interest in the write offset by updating a per-CPU "holds"
 *   list with the current `head` value.
 * - Do a compare-exchange on the offsets to attempt to publish the updated
 *   `head` offset.
 * - If this fails, continue the loop with updated values of the offsets.
 * - Otherwise, exit the loop.
 *
 * The reader will do an atomic load of the offsets with an acquire barrier
 * and remember the writer's offset.  Then it will loop through the per-CPU
 * holds and look for the one with the earliest offset.  The earlier of that
 * offset and the writer's offset is the furthest it can safely read samples.
 *
 * Here's a typical ringbuffer in use:
 *
 *                                               hold by 2
 *                                               ●       hold by 0
 *                                               │       ●
 *                                               │       │
 * ┌─────────────────────────────────────────────▼───────▼─────────────┐
 * │        █████████████████████████████████████░░░░░░░░░░░░          │
 * └────────▲────────────────────────────────────────────────▲─────────┘
 * 0        │                                                │  capacity
 *          ●                                                ●
 *          tail                                             head
 *
 * The filled region after `tail` has been written and is ready to be read.
 * The unfilled region has already been read and is available for writing.
 * There are two concurrent writers (with IDs 2 and 0) with holds
 * outstanding, marked by the shaded region.  Here's a different
 * configuration, after the `head` has wrapped and with the reader "caught
 * up" to the writers:
 *
 *                                                           hold by 3
 *                                                           ●
 *                                                           │
 * ┌─────────────────────────────────────────────────────────▼─────────┐
 * │░░░░                                                     ░░░░░░░░░░│
 * └────▲────────────────────────────────────────────────────▲─────────┘
 * 0    │                                                    │  capacity
 *      ●                                                    ●
 *      head                                                 tail
 *
 * There's one writer active (ID 3), so `tail` can't advance past it.  And
 * finally, here's a configuration where there's no more buffer space
 * available for writing:
 *
 * ┌───────────────────────────────────────────────────────────────────┐
 * │█████████████████████████████████████████████████████****██████████│
 * └─────────────────────────────────────────────────────▲───▲─────────┘
 * 0                                                     │   │  capacity
 *                                                       ●   ●
 *                                                       head tail
 *
 * Almost the entire buffer is waiting to be read.  The `*` between `head`
 * and `tail` is "wasted" space: it's free, but a write larger than this
 * region would run into `tail`, so it sits unused until the reader frees up
 * more space.
 */
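
/*
 * A minimal usage sketch of the interfaces defined below.  The sample
 * struct, the ring's capacity, and the writer-ID assignment are illustrative
 * assumptions, not part of this file, so the sketch is kept out of the
 * build.
 */
#if 0
struct example_sample {
	uint64_t es_timestamp;
	uint64_t es_count;
};

static struct mpsc_ring example_ring;

static void
example_setup(void)
{
	/*
	 * 2^12 = 4096 bytes of buffer, with up to 8 concurrent writers.
	 */
	mpsc_ring_init(&example_ring, 12, 8);
}

static void
example_write(uint8_t writer_id, uint64_t timestamp, uint64_t count)
{
	struct example_sample s = {
		.es_timestamp = timestamp,
		.es_count = count,
	};
	/*
	 * `writer_id` must be less than the `writers_max` passed to
	 * `mpsc_ring_init`.  The sample is dropped if the buffer is full.
	 */
	(void)mpsc_ring_write(&example_ring, writer_id, &s, sizeof(s));
}

static void
example_read(void)
{
	mpsc_ring_cursor_t cursor = mpsc_ring_read_start(&example_ring);
	struct example_sample s = { 0 };
	/*
	 * Consume every fully-written sample, committing after each one.
	 */
	while (mpsc_ring_cursor_advance(&example_ring, &cursor, &s, sizeof(s))) {
		mpsc_ring_cursor_commit(&example_ring, &cursor);
	}
	mpsc_ring_read_finish(&example_ring, cursor);
}
#endif
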
#define HOLD_EMPTY (~0U)

void
mpsc_ring_init(
	struct mpsc_ring *buf,
	uint8_t capacity_pow_2,
	uint8_t writers_max)
{
	/*
	 * Check that this ringbuffer hasn't already been initialized.
	 */
	assert3p(buf->mr_buffer, ==, NULL);
	assert3u(buf->mr_capacity, ==, 0);
	/*
	 * Check for reasonable capacity values.
	 */
	assert3u(capacity_pow_2, <, 30);
	assert3u(capacity_pow_2, >, 0);
	/*
	 * Must be at least one potential writer.
	 */
	assert3u(writers_max, >, 0);

	*buf = (struct mpsc_ring){ 0 };

	/*
	 * Allocate the data buffer to the specified capacity.
	 */
	uint32_t capacity = 1U << capacity_pow_2;
	buf->mr_buffer = kalloc_data_tag(capacity, Z_WAITOK | Z_ZERO,
	    VM_KERN_MEMORY_DIAG);
	if (!buf->mr_buffer) {
		panic("mpsc_ring_init: failed to allocate %u bytes for buffer",
		    capacity);
	}
	buf->mr_capacity = capacity;

	/*
	 * Allocate the per-writer holds array.
	 */
	size_t holds_size = writers_max * sizeof(buf->mr_writer_holds[0]);
	buf->mr_writer_holds = kalloc_data_tag(holds_size, Z_WAITOK | Z_ZERO,
	    VM_KERN_MEMORY_DIAG);
	if (!buf->mr_writer_holds) {
		panic("mpsc_ring_init: failed to allocate %zu bytes for holds",
		    holds_size);
	}
	buf->mr_writer_count = writers_max;

	/*
	 * Initialize the holds to be empty.
	 */
	for (uint8_t i = 0; i < writers_max; i++) {
		buf->mr_writer_holds[i] = HOLD_EMPTY;
	}

	buf->mr_head_tail = (union mpsc_ring_head_tail){ 0 };

	/*
	 * Publish these updates.
	 */
	os_atomic_thread_fence(release);
}

/**
 * Copy to or from the ringbuffer, taking wrap-around at the end into account.
 *
 * @discussion
 * This function does not enforce any bounds checking on the head or tail
 * offsets and is a helper for higher-level interfaces.
 *
 * @param buf
 * The ringbuffer to copy into or out of.
 *
 * @param offset
 * The offset to start the copy operation at.
 *
 * @param data
 * The input or output buffer.
 *
 * @param size
 * The number of bytes to copy.
 *
 * @param in
 * The direction of the copy.  True to treat @link data @/link as a source
 * and copy into the ringbuffer, and false to treat @link data @/link as a
 * destination and copy out of the ringbuffer.
 */
OS_ALWAYS_INLINE
static void
_mpsc_ring_copy(
	const struct mpsc_ring *buf,
	uint32_t offset,
	void *data,
	uint32_t size,
	bool in)
{
	/*
	 * Find the offset into the ringbuffer's memory.
	 */
	uint32_t const offset_trunc = offset % buf->mr_capacity;
	/*
	 * Determine how much contiguous space is left in the ringbuffer for a
	 * single memcpy.
	 */
	uint32_t const left_contig = buf->mr_capacity - offset_trunc;
	uint32_t const size_contig = MIN(left_contig, size);
	memcpy(in ? &buf->mr_buffer[offset_trunc] : data,
	    in ? data : &buf->mr_buffer[offset_trunc],
	    size_contig);
	if (size_contig != size) {
		/*
		 * If there's any leftover data uncopied, copy it at the start of
		 * the ringbuffer.
		 */
		uint32_t const size_left = size - size_contig;
		void * const data_left = (char *)data + size_contig;
		memcpy(in ? buf->mr_buffer : data_left,
		    in ? data_left : buf->mr_buffer,
		    size_left);
	}
}
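
/*
 * A worked example of the split-copy arithmetic in `_mpsc_ring_copy`, using
 * made-up numbers: in a 64-byte ring, a 24-byte copy starting at free-running
 * offset 116 truncates to offset 52, leaving only 12 contiguous bytes before
 * the end of the buffer, so 12 bytes land at the end and the remaining 12
 * wrap around to the start.  Kept out of the build; purely illustrative.
 */
#if 0
static void
example_copy_split(void)
{
	uint32_t const capacity = 64;
	uint32_t const offset = 116;
	uint32_t const size = 24;

	uint32_t const offset_trunc = offset % capacity;        /* 52 */
	uint32_t const left_contig = capacity - offset_trunc;   /* 12 */
	uint32_t const size_contig = MIN(left_contig, size);    /* 12 */
	uint32_t const size_left = size - size_contig;          /* 12, wraps */

	assert3u(size_contig + size_left, ==, size);
}
#endif
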
uint32_t
mpsc_ring_write(
	struct mpsc_ring *buf,
	uint8_t writer_id,
	const void *data,
	uint32_t size)
{
	/*
	 * Get an initial guess at where to write.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(&buf->mr_head_tail,
	    relaxed);
	union mpsc_ring_head_tail new_head_tail = { 0 };
	os_atomic_rmw_loop(&buf->mr_head_tail.mrht_head_tail,
	    head_tail.mrht_head_tail /* old */,
	    new_head_tail.mrht_head_tail /* new */,
	    release, {
		/*
		 * Check that enough free space remains in the buffer: `in_use`
		 * is the number of bytes that would be occupied after this
		 * write.
		 */
		uint32_t const in_use = head_tail.mrht_head + size -
		    head_tail.mrht_tail;
		if (in_use >= buf->mr_capacity) {
			/*
			 * Not enough space available for all the data, so give
			 * up.
			 */
			os_atomic_rmw_loop_give_up(goto out);
		}
		/*
		 * Compute a new head offset based on the size being written.
		 */
		new_head_tail = head_tail;
		new_head_tail.mrht_head += size;
		/*
		 * Reserve the start of the space with a hold.
		 */
		os_atomic_store(&buf->mr_writer_holds[writer_id],
		    head_tail.mrht_head, relaxed);
	});

	/*
	 * The const qualifier is safely cast away: this copies into the
	 * ringbuffer and only reads from `data`.
	 */
	_mpsc_ring_copy(buf, head_tail.mrht_head, (void *)(uintptr_t)data, size,
	    true);

out:
	/*
	 * Release the hold value so it can synchronize with acquires on the
	 * read side.
	 */
	os_atomic_store(&buf->mr_writer_holds[writer_id], HOLD_EMPTY, release);
	/*
	 * Report the free space observed when the offsets were last loaded.
	 */
	return buf->mr_capacity - (head_tail.mrht_head - head_tail.mrht_tail);
}

mpsc_ring_cursor_t
mpsc_ring_read_start(struct mpsc_ring *buf)
{
	/*
	 * Acquire to ensure that any holds updated are visible.
	 */
	union mpsc_ring_head_tail head_tail = os_atomic_load(&buf->mr_head_tail,
	    acquire);
	for (uint8_t i = 0; i < buf->mr_writer_count; i++) {
		/*
		 * Check for any earlier holds to avoid reading past
		 * writes-in-progress.
		 */
		uint32_t hold = os_atomic_load(&buf->mr_writer_holds[i], relaxed);
		if (hold != HOLD_EMPTY) {
			head_tail.mrht_head = MIN(head_tail.mrht_head, hold);
		}
	}
	return (mpsc_ring_cursor_t){
		.mrc_commit_pos = head_tail.mrht_tail,
		.mrc_pos = head_tail.mrht_tail,
		.mrc_limit = head_tail.mrht_head,
	};
}

bool
mpsc_ring_cursor_advance(
	const struct mpsc_ring *buf,
	mpsc_ring_cursor_t *cursor,
	void *target,
	uint32_t size)
{
	if (size > cursor->mrc_limit - cursor->mrc_pos) {
		return false;
	}
	_mpsc_ring_copy(buf, cursor->mrc_pos, target, size, false);
	cursor->mrc_pos += size;
	return true;
}

void
mpsc_ring_cursor_commit(
	const struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t *cursor)
{
	cursor->mrc_commit_pos = cursor->mrc_pos;
}

void
mpsc_ring_read_finish(
	struct mpsc_ring *buf,
	mpsc_ring_cursor_t cursor)
{
	/*
	 * Relaxed, as there's no need to synchronize with any other readers:
	 * this ringbuffer is single-consumer.
	 */
	os_atomic_store(&buf->mr_head_tail.mrht_tail, cursor.mrc_commit_pos,
	    relaxed);
}

void
mpsc_ring_read_cancel(
	struct mpsc_ring * __unused buf,
	mpsc_ring_cursor_t __unused cursor)
{
	/*
	 * Nothing to do; just "consume" the cursor.
	 */
}
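
/*
 * A sketch of draining variable-length records with the cursor interfaces
 * above.  The record format (a 4-byte length followed by payload) and the
 * payload bound are hypothetical; the point is that only committed positions
 * are published by `mpsc_ring_read_finish`, so a torn record at the end
 * leaves `tail` on the last complete one.  Kept out of the build.
 */
#if 0
static void
example_read_records(struct mpsc_ring *ring)
{
	mpsc_ring_cursor_t cursor = mpsc_ring_read_start(ring);
	for (;;) {
		uint32_t length = 0;
		if (!mpsc_ring_cursor_advance(ring, &cursor, &length,
		    sizeof(length))) {
			break;
		}
		char payload[64] = { 0 };
		if (length > sizeof(payload) ||
		    !mpsc_ring_cursor_advance(ring, &cursor, payload, length)) {
			/*
			 * An incomplete or oversized record: stop without
			 * committing the partial advance.
			 */
			break;
		}
		/* ... consume the payload ... */
		mpsc_ring_cursor_commit(ring, &cursor);
	}
	/*
	 * Alternatively, `mpsc_ring_read_cancel(ring, cursor)` leaves `tail`
	 * untouched if nothing should be consumed.
	 */
	mpsc_ring_read_finish(ring, cursor);
}
#endif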