/* * Copyright (c) 2015-2021 Apple Inc. All rights reserved. * * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * * This file contains Original Code and/or Modifications of Original Code * as defined in and that are subject to the Apple Public Source License * Version 2.0 (the 'License'). You may not use this file except in * compliance with the License. The rights granted to you under the License * may not be used to create, or enable the creation or redistribution of, * unlawful or unlicensed copies of an Apple operating system, or to * circumvent, violate, or enable the circumvention or violation of, any * terms of an Apple operating system software license agreement. * * Please obtain a copy of the License at * http://www.opensource.apple.com/apsl/ and read it before using this file. * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. * Please see the License for the specific language governing rights and * limitations under the License. * * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ */ /* * @OSF_FREE_COPYRIGHT@ */ /* * Mach Operating System * Copyright (c) 1991,1990,1989,1988,1987 Carnegie Mellon University * All Rights Reserved. * * Permission to use, copy, modify and distribute this software and its * documentation is hereby granted, provided that both the copyright * notice and this permission notice appear in all copies of the * software, derivative works or modified versions, and any portions * thereof, and that both notices appear in supporting documentation. * * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS" * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE. * * Carnegie Mellon requests users of this software to return to * * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU * School of Computer Science * Carnegie Mellon University * Pittsburgh PA 15213-3890 * * any improvements or extensions that they make and grant Carnegie Mellon * the rights to redistribute these changes. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /*! * @const waitq_set_unlink_batch * * @brief * How many links are unhooked under a single set lock hold. * * @discussion * Holding a waitq set lock for too long can cause * extreme contention (when a set is being torn down concurrently * to messages being sent to ports who used to belong to that set). * * In order to fight this, large wait queue sets will drop * and reacquire their lock for each unlinking batch. */ static TUNABLE(uint32_t, waitq_set_unlink_batch, "waitq_set_unlink_batch", 64); /*! * @const WQL_PREPOST_MARKER * * @brief * Marker set in the @c wql_wqs field of wait queue linkages to denote that * this linkage has preposted to its wait queue set already. * * @discussion * This bit is manipulated under both the wait queue and the wait queue set * locks, and is used for two purposes: * * - for port set queues, it denotes in which circle queue the linkage * is queued on (@c waitq_set::wqset_links or @c waitq_set::wqset_preposts) * * - as an optimization during pre-post to not walk sets this link already * preposted to. 
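 *
 * A minimal sketch of the tagging scheme (mirroring the wql_wqs(),
 * wql_wqs_preposted() and wql_wqs_mark_preposted() accessors defined
 * below; the `link` variable is illustrative):
 *
 *   wqs       = (struct waitq_set *)(link->wql_wqs & ~WQL_PREPOST_MARKER);
 *   preposted = (link->wql_wqs & WQL_PREPOST_MARKER) != 0;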
*/ #define WQL_PREPOST_MARKER 1ul #if __LP64__ /*! * @struct waitq_link_hdr * * @brief * Common "header" between all linkages, in order to find the waitq_set * of this linkage. * * @discussion * Due to unfortunate alignment constraints on @c queue_chain_t, * this is wildly different for LP64 and ILP32. * * Do note that `wql_wqs` sits at the same offset in every linkage type * (enforced with the static asserts below), so that @c wql_wqs() works * on any of them. */ struct waitq_link_hdr { uintptr_t wql_wqs; }; /*! * @struct waitq_sellink * * @brief * Linkages used for select waitq queues to select wait queue sets. * * @discussion * Select linkages are one way (queue to set) for two reasons: * * 1. select doesn't use the wait queue subsystem to discover which file * descriptor woke up the set (it will instead scan all fds again), * * 2. all linkages are unhooked on each syscall return, so we minimize * work to be done to be as quick as possible, using a fast invalidation * scheme based on unique identifiers and sequestering * (see @c select_set_nextid()). */ struct waitq_sellink { uintptr_t wql_wqs; struct waitq_link_list_entry wql_next; uint64_t wql_setid; }; /*! * @struct waitq_link * * @brief * Linkages used for port wait queues and port-set wait queue sets. * * @discussion * Those linkages go both ways so that receiving messages through a port-set * can quickly find ports that preposted to the set. * * It also means that unhooking linkages cannot be lazy. */ struct waitq_link { uintptr_t wql_wqs; /**< wait queue set for this link */ queue_chain_t wql_qlink; /**< linkage through the waitq list */ queue_chain_t wql_slink; /**< linkage through the wqset list */ struct waitq *wql_wq; /**< wait queue for this link */ }; #else struct waitq_link_hdr { uint64_t __wql_padding; uintptr_t wql_wqs; }; struct waitq_sellink { struct waitq_link_list_entry wql_next; uintptr_t __wql_padding; uintptr_t wql_wqs; uint64_t wql_setid; }; struct waitq_link { queue_chain_t wql_qlink; uintptr_t wql_wqs; struct waitq *wql_wq; queue_chain_t wql_slink; }; #endif static_assert(offsetof(struct waitq_link_hdr, wql_wqs) == offsetof(struct waitq_sellink, wql_wqs)); static_assert(offsetof(struct waitq_link_hdr, wql_wqs) == offsetof(struct waitq_link, wql_wqs)); static_assert(sizeof(struct waitq) <= WQ_OPAQUE_SIZE, "waitq structure size mismatch"); static_assert(__alignof(struct waitq) == WQ_OPAQUE_ALIGN, "waitq structure alignment mismatch"); static KALLOC_TYPE_DEFINE(waitq_sellink_zone, struct waitq_sellink, KT_PRIV_ACCT); static KALLOC_TYPE_DEFINE(waitq_link_zone, struct waitq_link, KT_PRIV_ACCT); ZONE_DEFINE_ID(ZONE_ID_SELECT_SET, "select_set", struct select_set, ZC_SEQUESTER | ZC_NOPGZ | ZC_ZFREE_CLEARMEM); static LCK_GRP_DECLARE(waitq_lck_grp, "waitq"); static uint64_t PERCPU_DATA(select_setid); struct waitq select_conflict_queue; #pragma mark waitq links static inline bool waitq_is_sellink(waitq_type_t type) { return type == WQT_SELECT || type == WQT_SELECT_SET; } static inline bool wql_sellink_valid(struct select_set *selset, struct waitq_sellink *link) { return waitq_valid(selset) && selset->selset_id == link->wql_setid; } static waitq_t wql_wqs(waitq_link_t link) { return (waitq_t){ (void *)(link.wqlh->wql_wqs & ~WQL_PREPOST_MARKER) }; } static bool wql_wqs_preposted(waitq_link_t link) { return link.wqlh->wql_wqs & WQL_PREPOST_MARKER; } static void wql_wqs_mark_preposted(waitq_link_t link) { assert(!wql_wqs_preposted(link)); link.wqlh->wql_wqs |= WQL_PREPOST_MARKER; } static void wql_wqs_clear_preposted(waitq_link_t link) { assert(wql_wqs_preposted(link)); link.wqlh->wql_wqs &= ~WQL_PREPOST_MARKER; } static circle_queue_t wql_wqs_queue(struct
waitq_set *wqs, struct waitq_link *link) { return wql_wqs_preposted(link) ? &wqs->wqset_preposts : &wqs->wqset_links; } static void wql_list_push(waitq_link_list_t *list, waitq_link_t link) { link.wqls->wql_next.next = list->next; list->next = &link.wqls->wql_next; } static inline struct waitq_sellink * wql_list_elem(struct waitq_link_list_entry *e) { return e ? __container_of(e, struct waitq_sellink, wql_next) : NULL; } /*! * @function wql_list_next() * * @brief * Helper function to implement wait queue link list enumeration. * * @param e in: pointer to the current element, * out: pointer to the next element or NULL * @param end which element to stop enumeration at (NULL for lists, * or the first element enumerated for circle queues). * @returns true (makes writing for(;;) based enumerators easier). */ static inline bool wql_list_next(struct waitq_link_list_entry **e, struct waitq_link_list_entry *end) { if (*e == NULL || (*e)->next == end) { *e = NULL; } else { *e = (*e)->next; } return true; } #define __wql_list_foreach(it, head, end) \ for (struct waitq_link_list_entry *__it = (head)->next, *__end = end; \ ((it) = wql_list_elem(__it)); wql_list_next(&__it, __end)) #define wql_list_foreach(it, head) \ __wql_list_foreach(it, head, NULL) #define wql_list_foreach_safe(it, head) \ for (struct waitq_link_list_entry *__it = (head)->next; \ ((it) = wql_list_elem(__it)) && wql_list_next(&__it, NULL); ) /* * Gross hack: passing `__it` to `__wql_list_foreach` makes it stop whether * we circle back to the first element or NULL (whichever comes first). * * This allows to have a single enumeration function oblivious to whether * we enumerate a circle queue or a sellink list. */ #define waitq_link_foreach(link, waitq) \ __wql_list_foreach((link).wqls, &(waitq).wq_q->waitq_sellinks, __it) static_assert(offsetof(struct waitq, waitq_sellinks) == offsetof(struct waitq, waitq_links)); static_assert(offsetof(struct waitq_sellink, wql_next) == offsetof(struct waitq_link, wql_qlink.next)); static struct waitq_link * wql_find(struct waitq *waitq, waitq_t wqset) { struct waitq_link *link; cqe_foreach_element(link, &waitq->waitq_links, wql_qlink) { if (waitq_same(wql_wqs(link), wqset)) { return link; } } return NULL; } waitq_link_t waitq_link_alloc(waitq_type_t type) { waitq_link_t link; if (waitq_is_sellink(type)) { link.wqls = zalloc_flags(waitq_sellink_zone, Z_WAITOK | Z_ZERO); } else { link.wqll = zalloc_flags(waitq_link_zone, Z_WAITOK | Z_ZERO); } return link; } void waitq_link_free(waitq_type_t type, waitq_link_t link) { if (waitq_is_sellink(type)) { return zfree(waitq_sellink_zone, link.wqls); } else { return zfree(waitq_link_zone, link.wqll); } } void waitq_link_free_list(waitq_type_t type, waitq_link_list_t *free_l) { waitq_link_t link; wql_list_foreach_safe(link.wqls, free_l) { waitq_link_free(type, link); } free_l->next = NULL; } #pragma mark global wait queues static __startup_data struct waitq g_boot_waitq; static SECURITY_READ_ONLY_LATE(struct waitq *) global_waitqs = &g_boot_waitq; static SECURITY_READ_ONLY_LATE(uint32_t) g_num_waitqs = 1; /* * Zero out the used MSBs of the event. 
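 *
 * The resulting mask is only a hint: distinct events may collide within
 * the low _EVENT_MASK_BITS bits, which can cause a needless scan of a
 * global safeq but never a missed wakeup.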
*/ #define _CAST_TO_EVENT_MASK(event) \ ((waitq_flags_t)(uintptr_t)(event) & ((1ul << _EVENT_MASK_BITS) - 1ul)) static inline uint32_t waitq_hash(char *key, size_t length) { return os_hash_jenkins(key, length) & (g_num_waitqs - 1); } /* return a global waitq pointer corresponding to the given event */ struct waitq * _global_eventq(char *event, size_t event_length) { return &global_waitqs[waitq_hash(event, event_length)]; } bool waitq_is_valid(waitq_t waitq) { return waitq_valid(waitq); } static inline bool waitq_is_global(waitq_t waitq) { if (waitq_type(waitq) != WQT_QUEUE) { return false; } return waitq.wq_q >= global_waitqs && waitq.wq_q < global_waitqs + g_num_waitqs; } static inline bool waitq_empty(waitq_t wq) { struct turnstile *ts; switch (waitq_type(wq)) { case WQT_TURNSTILE: return priority_queue_empty(&wq.wq_q->waitq_prio_queue); case WQT_PORT: ts = wq.wq_q->waitq_ts; return ts == TURNSTILE_NULL || priority_queue_empty(&ts->ts_waitq.waitq_prio_queue); case WQT_QUEUE: case WQT_SELECT: case WQT_PORT_SET: case WQT_SELECT_SET: return circle_queue_empty(&wq.wq_q->waitq_queue); default: return true; } } #if CONFIG_WAITQ_STATS #define NWAITQ_BTFRAMES 5 struct wq_stats { uint64_t waits; uint64_t wakeups; uint64_t clears; uint64_t failed_wakeups; uintptr_t last_wait[NWAITQ_BTFRAMES]; uintptr_t last_wakeup[NWAITQ_BTFRAMES]; uintptr_t last_failed_wakeup[NWAITQ_BTFRAMES]; }; /* this global is for lldb */ const uint32_t g_nwaitq_btframes = NWAITQ_BTFRAMES; struct wq_stats g_boot_stats; struct wq_stats *g_waitq_stats = &g_boot_stats; static __inline__ void waitq_grab_backtrace(uintptr_t bt[NWAITQ_BTFRAMES], unsigned skip) { uintptr_t buf[NWAITQ_BTFRAMES + skip]; memset(buf, 0, (NWAITQ_BTFRAMES + skip) * sizeof(uintptr_t)); backtrace(buf, g_nwaitq_btframes + skip, NULL, NULL); memcpy(&bt[0], &buf[skip], NWAITQ_BTFRAMES * sizeof(uintptr_t)); } static __inline__ struct wq_stats * waitq_global_stats(waitq_t waitq) { struct wq_stats *wqs; uint32_t idx; if (!waitq_is_global(waitq)) { return NULL; } idx = (uint32_t)(waitq.wq_q - global_waitqs); assert(idx < g_num_waitqs); wqs = &g_waitq_stats[idx]; return wqs; } static __inline__ void waitq_stats_count_wait(waitq_t waitq) { struct wq_stats *wqs = waitq_global_stats(waitq); if (wqs != NULL) { wqs->waits++; waitq_grab_backtrace(wqs->last_wait, 2); } } static __inline__ void waitq_stats_count_wakeup(waitq_t waitq, int n) { struct wq_stats *wqs = waitq_global_stats(waitq); if (wqs != NULL) { if (n > 0) { wqs->wakeups += n; waitq_grab_backtrace(wqs->last_wakeup, 2); } else { wqs->failed_wakeups++; waitq_grab_backtrace(wqs->last_failed_wakeup, 2); } } } static __inline__ void waitq_stats_count_clear_wakeup(waitq_t waitq) { struct wq_stats *wqs = waitq_global_stats(waitq); if (wqs != NULL) { wqs->wakeups++; wqs->clears++; waitq_grab_backtrace(wqs->last_wakeup, 2); } } #else /* !CONFIG_WAITQ_STATS */ #define waitq_stats_count_wait(q) do { } while (0) #define waitq_stats_count_wakeup(q, n) do { } while (0) #define waitq_stats_count_clear_wakeup(q) do { } while (0) #endif static struct waitq * waitq_get_safeq(waitq_t waitq) { if (waitq_type(waitq) == WQT_PORT) { struct turnstile *ts = waitq.wq_q->waitq_ts; return ts ? &ts->ts_waitq : NULL; } uint32_t hash = os_hash_kernel_pointer(waitq.wq_q); return &global_waitqs[hash & (g_num_waitqs - 1)]; } /* * Since the priority ordered waitq uses basepri as the * ordering key assert that this value fits in a uint8_t. 
*/ static_assert(MAXPRI <= UINT8_MAX); static inline void waitq_thread_insert(struct waitq *safeq, thread_t thread, waitq_t wq, event64_t event) { if (waitq_type(safeq) == WQT_TURNSTILE) { turnstile_stats_update(0, TSU_TURNSTILE_BLOCK_COUNT, NULL); turnstile_waitq_add_thread_priority_queue(safeq, thread); } else { turnstile_stats_update(0, TSU_REGULAR_WAITQ_BLOCK_COUNT, NULL); /* * This is the extent to which we currently take scheduling * attributes into account: * * - If the thread is vm privileged, we stick it at the front * of the queue, later, these queues will honor the policy * value set at waitq_init time. * * - Realtime threads get priority for wait queue placements. * This allows wait_queue_wakeup_one to prefer a waiting * realtime thread, similar in principle to performing * a wait_queue_wakeup_all and allowing scheduler * prioritization to run the realtime thread, but without * causing the lock contention of that scenario. */ if (thread->sched_pri >= BASEPRI_REALTIME || !safeq->waitq_fifo || (thread->options & TH_OPT_VMPRIV)) { circle_enqueue_head(&safeq->waitq_queue, &thread->wait_links); } else { circle_enqueue_tail(&safeq->waitq_queue, &thread->wait_links); } } /* mark the event and real waitq, even if enqueued on a global safeq */ thread->wait_event = event; thread->waitq = wq; } /** * clear the thread-related waitq state, moving the thread from * TH_WAIT to TH_WAIT | TH_WAKING, where it is no longer on a waitq and * can expect to be go'ed in the near future. * * Clearing the waitq prevents further propagation of a turnstile boost * on the thread and stops a clear_wait from succeeding. * * Conditions: * 'thread' is locked, thread is waiting */ static inline void thread_clear_waitq_state(thread_t thread) { assert(thread->state & TH_WAIT); thread->waitq.wq_q = NULL; thread->wait_event = NO_EVENT64; thread->at_safe_point = FALSE; thread->block_hint = kThreadWaitNone; thread->state |= TH_WAKING; } static inline void waitq_thread_remove(waitq_t wq, thread_t thread) { if (waitq_type(wq) == WQT_TURNSTILE) { KERNEL_DEBUG_CONSTANT_IST(KDEBUG_TRACE, (TURNSTILE_CODE(TURNSTILE_HEAP_OPERATIONS, (THREAD_REMOVED_FROM_TURNSTILE_WAITQ))) | DBG_FUNC_NONE, VM_KERNEL_UNSLIDE_OR_PERM(waitq_to_turnstile(wq.wq_q)), thread_tid(thread), 0, 0, 0); priority_queue_remove(&wq.wq_q->waitq_prio_queue, &thread->wait_prioq_links); } else { circle_dequeue(&wq.wq_q->waitq_queue, &thread->wait_links); if (waitq_is_global(wq) && waitq_empty(wq)) { wq.wq_q->waitq_eventmask = 0; } } thread_clear_waitq_state(thread); } bool waitq_wait_possible(thread_t thread) { return waitq_is_null(thread->waitq) && ((thread->state & TH_WAKING) == 0); } __startup_func static void waitq_bootstrap(void) { const uint32_t qsz = sizeof(struct waitq); vm_offset_t whsize; int cpu = 0; /* * Determine the amount of memory we're willing to reserve for * the waitqueue hash table */ if (!PE_parse_boot_argn("wqsize", &whsize, sizeof(whsize))) { whsize = round_page(thread_max * qsz / 5); } /* * Determine the number of waitqueues we can fit. * The hash algorithm requires that this be a power of 2. 
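 *
 * The expression below rounds whsize / qsz down to the nearest power
 * of 2; e.g. a budget of 5000 waitqs yields g_num_waitqs == 4096.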
*/ g_num_waitqs = 0x80000000u >> __builtin_clzl(whsize / qsz); assert(g_num_waitqs > 0); whsize = round_page(g_num_waitqs * qsz); kmem_alloc(kernel_map, (vm_offset_t *)&global_waitqs, whsize, KMA_NOFAIL | KMA_KOBJECT | KMA_NOPAGEWAIT | KMA_PERMANENT, VM_KERN_MEMORY_WAITQ); #if CONFIG_WAITQ_STATS whsize = round_page(g_num_waitqs * sizeof(struct wq_stats)); kmem_alloc(kernel_map, (vm_offset_t *)&g_waitq_stats, whsize, KMA_NOFAIL | KMA_KOBJECT | KMA_NOPAGEWAIT | KMA_ZERO | KMA_PERMANENT, VM_KERN_MEMORY_WAITQ); #endif for (uint32_t i = 0; i < g_num_waitqs; i++) { waitq_init(&global_waitqs[i], WQT_QUEUE, SYNC_POLICY_FIFO); } waitq_init(&select_conflict_queue, WQT_SELECT, SYNC_POLICY_FIFO); percpu_foreach(setid, select_setid) { /* is not cpu_number() but CPUs haven't been numbered yet */ *setid = cpu++; } } STARTUP(MACH_IPC, STARTUP_RANK_FIRST, waitq_bootstrap); #pragma mark locking static hw_spin_timeout_status_t waitq_timeout_handler(void *_lock, hw_spin_timeout_t to, hw_spin_state_t st) { lck_spinlock_to_info_t lsti; hw_lck_ticket_t tmp; struct waitq *wq = _lock; if (machine_timeout_suspended()) { return HW_LOCK_TIMEOUT_CONTINUE; } lsti = lck_spinlock_timeout_hit(&wq->waitq_interlock, 0); tmp.tcurnext = os_atomic_load(&wq->waitq_interlock.tcurnext, relaxed); panic("waitq(%p) lock " HW_SPIN_TIMEOUT_FMT "; cpu=%d, " "cticket: 0x%x, nticket: 0x%x, waiting for 0x%x, " HW_SPIN_TIMEOUT_DETAILS_FMT, wq, HW_SPIN_TIMEOUT_ARG(to, st), cpu_number(), tmp.cticket, tmp.nticket, lsti->extra, HW_SPIN_TIMEOUT_DETAILS_ARG(to, st)); } static const struct hw_spin_policy waitq_spin_policy = { .hwsp_name = "waitq", #if defined(__i386__) || defined(__x86_64__) .hwsp_timeout = &LockTimeOutTSC, #else .hwsp_timeout_atomic = &LockTimeOut, #endif /* * Double the standard lock timeout, because wait queues tend * to iterate over a number of threads - locking each. If there is * a problem with a thread lock, it normally times out at the wait * queue level first, hiding the real problem. 
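 * (The timeout shift of 1 below encodes that doubling.)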
*/ .hwsp_timeout_shift = 1, .hwsp_lock_offset = offsetof(struct waitq, waitq_interlock), .hwsp_op_timeout = waitq_timeout_handler, }; void waitq_invalidate(waitq_t waitq) { hw_lck_ticket_invalidate(&waitq.wq_q->waitq_interlock); } bool waitq_held(waitq_t wq) { return hw_lck_ticket_held(&wq.wq_q->waitq_interlock); } void waitq_lock(waitq_t wq) { (void)hw_lck_ticket_lock_to(&wq.wq_q->waitq_interlock, &waitq_spin_policy, &waitq_lck_grp); #if defined(__x86_64__) pltrace(FALSE); #endif } bool waitq_lock_try(waitq_t wq) { bool rc = hw_lck_ticket_lock_try(&wq.wq_q->waitq_interlock, &waitq_lck_grp); #if defined(__x86_64__) if (rc) { pltrace(FALSE); } #endif return rc; } bool waitq_lock_reserve(waitq_t wq, uint32_t *ticket) { return hw_lck_ticket_reserve(&wq.wq_q->waitq_interlock, ticket, &waitq_lck_grp); } void waitq_lock_wait(waitq_t wq, uint32_t ticket) { (void)hw_lck_ticket_wait(&wq.wq_q->waitq_interlock, ticket, &waitq_spin_policy, &waitq_lck_grp); #if defined(__x86_64__) pltrace(FALSE); #endif } bool waitq_lock_allow_invalid(waitq_t wq) { hw_lock_status_t rc; rc = hw_lck_ticket_lock_allow_invalid(&wq.wq_q->waitq_interlock, &waitq_spin_policy, &waitq_lck_grp); #if defined(__x86_64__) if (rc == HW_LOCK_ACQUIRED) { pltrace(FALSE); } #endif return rc == HW_LOCK_ACQUIRED; } void waitq_unlock(waitq_t wq) { assert(waitq_held(wq)); #if defined(__x86_64__) pltrace(TRUE); #endif hw_lck_ticket_unlock(&wq.wq_q->waitq_interlock); } #pragma mark assert_wait / wakeup struct waitq_select_args { /* input parameters */ event64_t event; wait_result_t result; waitq_wakeup_flags_t flags; uint32_t max_threads; bool is_identified; /* output parameters */ /* counts all woken threads, may have more threads than on threadq */ uint32_t nthreads; /* preemption is disabled while threadq is non-empty */ circle_queue_head_t threadq; }; static inline void maybe_adjust_thread_pri( thread_t thread, waitq_wakeup_flags_t flags, __kdebug_only waitq_t waitq) { /* * If the caller is requesting the waitq subsystem to promote the * priority of the awoken thread, then boost the thread's priority to * the default WAITQ_BOOST_PRIORITY (if it's not already equal or * higher priority). This boost must be removed via a call to * waitq_clear_promotion_locked before the thread waits again. */ if (flags & WAITQ_PROMOTE_PRIORITY) { uintptr_t trace_waitq = 0; if (__improbable(kdebug_enable)) { trace_waitq = VM_KERNEL_UNSLIDE_OR_PERM(waitq.wq_q); } sched_thread_promote_reason(thread, TH_SFLAG_WAITQ_PROMOTED, trace_waitq); } } static void waitq_select_queue_add(waitq_t waitq, thread_t thread, struct waitq_select_args *args) { spl_t s = splsched(); thread_lock(thread); thread_clear_waitq_state(thread); if (!args->is_identified && thread->state & TH_RUN) { /* * A thread that is currently on core may try to clear its own * wait with clear wait or by waking its own event instead of * calling thread_block as is normally expected. After doing * this, it expects to be able to immediately wait again. * * If we are currently on a different CPU and waking that * thread, as soon as we unlock the waitq and thread, that * operation could complete, but we would still be holding the * thread on our flush queue, leaving it in the waking state * where it can't yet assert another wait. * * Since we know that we won't actually need to enqueue the * thread on the runq due to it being on core, we can just * immediately unblock it here so that the thread will be in a * waitable state after we release its thread lock from this * lock hold. 
* * Wakeups using *_identify can't be allowed to pass * thread block until they're resumed, so they can't use * this path. That means they are not allowed to skip calling * thread_block. */ maybe_adjust_thread_pri(thread, args->flags, waitq); thread_go(thread, args->result, false); } else { if (circle_queue_empty(&args->threadq)) { /* * preemption is disabled while threads are * on threadq - balanced in: * waitq_resume_identified_thread * waitq_select_queue_flush */ disable_preemption(); } circle_enqueue_tail(&args->threadq, &thread->wait_links); } thread_unlock(thread); splx(s); } #if SCHED_HYGIENE_DEBUG TUNABLE_DEV_WRITEABLE(uint32_t, waitq_flush_excess_threads, "waitq_flush_excess_threads", 20); TUNABLE_DEV_WRITEABLE(uint32_t, waitq_flush_excess_time_mt, "waitq_flush_excess_time_mt", 7200); /* 300us */ #endif /* SCHED_HYGIENE_DEBUG */ static void waitq_select_queue_flush(waitq_t waitq, struct waitq_select_args *args) { thread_t thread = THREAD_NULL; assert(!circle_queue_empty(&args->threadq)); int flushed_threads = 0; #if SCHED_HYGIENE_DEBUG uint64_t start_time = ml_get_sched_hygiene_timebase(); disable_preemption(); #endif /* SCHED_HYGIENE_DEBUG */ cqe_foreach_element_safe(thread, &args->threadq, wait_links) { circle_dequeue(&args->threadq, &thread->wait_links); assert_thread_magic(thread); spl_t s = splsched(); thread_lock(thread); maybe_adjust_thread_pri(thread, args->flags, waitq); thread_go(thread, args->result, args->flags & WAITQ_HANDOFF); thread_unlock(thread); splx(s); flushed_threads++; } #if SCHED_HYGIENE_DEBUG uint64_t end_time = ml_get_sched_hygiene_timebase(); /* * Check for a combination of excess threads and long time, * so that a single thread wakeup that gets stuck is still caught */ if (waitq_flush_excess_threads && waitq_flush_excess_time_mt && flushed_threads > waitq_flush_excess_threads && (end_time - start_time) > waitq_flush_excess_time_mt) { /* * Hack alert: * * If a wakeup-all is done with interrupts disabled, or if * there are enough threads / lock contention to pass the * preemption disable threshold, it can take Too Long to get * through waking up all the threads, leading to * the watchdog going off. * * While we are working on a change to break up this * giant glob of work into smaller chunks, remove this * time region from the watchdog's memory to avoid * unit tests that wake up hundreds of threads on * one semaphore from causing this to blow up. * * We only trigger this when seeing a combination of * excess threads and long time, so that a single * thread wakeup that gets stuck is still caught. * * This was improved with * rdar://90325140 * to enable interrupts during most wakeup-all's * and will be removed with * rdar://101110793 */ if (ml_get_interrupts_enabled() == false) { ml_spin_debug_reset(current_thread()); ml_irq_debug_abandon(); } abandon_preemption_disable_measurement(); KDBG(MACHDBG_CODE(DBG_MACH_SCHED, MACH_INT_MASKED_RESET), flushed_threads, end_time - start_time); } enable_preemption(); #endif /* SCHED_HYGIENE_DEBUG */ /* * match the disable when making threadq nonempty from * waitq_select_queue_add */ enable_preemption(); } /** * Routine to iterate over the waitq for non-priority ordered waitqs * * Conditions: * args->waitq (and the posted waitq) is locked * * Notes: * If one or more threads are selected, this may disable preemption, * which is balanced when the threadq is flushed in * waitq_resume_identified_thread or waitq_select_queue_flush. 
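 *
 * Returns the eventmask built from the wait events of the threads that
 * were not selected; callers use it to refresh a global safeq's
 * waitq_eventmask.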
*/ static waitq_flags_t waitq_queue_iterate_locked(struct waitq *safeq, struct waitq *waitq, struct waitq_select_args *args) { thread_t thread = THREAD_NULL; waitq_flags_t eventmask = 0; cqe_foreach_element_safe(thread, &safeq->waitq_queue, wait_links) { assert_thread_magic(thread); /* * For non-priority ordered waitqs, we allow multiple events to be * mux'ed into the same waitq. Also safeqs may contain threads from * multiple waitqs. Only pick threads that match the * requested wait event. */ if (waitq_same(thread->waitq, waitq) && thread->wait_event == args->event) { /* We found a matching thread! Pull it from the queue. */ circle_dequeue(&safeq->waitq_queue, &thread->wait_links); waitq_select_queue_add(waitq, thread, args); if (++args->nthreads >= args->max_threads) { break; } } else { /* thread wasn't selected so track its event */ eventmask |= waitq_same(thread->waitq, safeq) ? _CAST_TO_EVENT_MASK(thread->wait_event) : _CAST_TO_EVENT_MASK(thread->waitq.wq_q); } } return eventmask; } /** * Routine to iterate and remove threads from priority ordered waitqs * * Conditions: * args->waitq (and the posted waitq) is locked * * Notes: * The priority ordered waitqs only support maximum priority element removal. * * Also, the implementation makes sure that all threads in a priority ordered * waitq are waiting on the same wait event. This is not necessarily true for * non-priority ordered waitqs. If one or more threads are selected, this may * disable preemption. */ static void waitq_prioq_iterate_locked( struct waitq *ts_wq, struct waitq *waitq, struct waitq_select_args *args) { struct turnstile *ts = waitq_to_turnstile(ts_wq); bool update_inheritor = (args->flags & WAITQ_UPDATE_INHERITOR); if (update_inheritor && args->max_threads == UINT32_MAX) { /* * If we are going to wake up all threads, * go ahead and set the inheritor to NULL. */ turnstile_kernel_update_inheritor_on_wake_locked(ts, TURNSTILE_INHERITOR_NULL, TURNSTILE_INHERITOR_THREAD); update_inheritor = false; } while (!priority_queue_empty(&ts_wq->waitq_prio_queue)) { thread_t thread; thread = priority_queue_remove_max(&ts_wq->waitq_prio_queue, struct thread, wait_prioq_links); assert_thread_magic(thread); /* * Ensure the wait event matches since priority ordered waitqs do not * support multiple events in the same waitq. */ assert(waitq_same(thread->waitq, waitq) && (thread->wait_event == args->event)); if (update_inheritor) { turnstile_inheritor_t inheritor = thread; if (priority_queue_empty(&ts_wq->waitq_prio_queue)) { inheritor = TURNSTILE_INHERITOR_NULL; } turnstile_kernel_update_inheritor_on_wake_locked(ts, inheritor, TURNSTILE_INHERITOR_THREAD); update_inheritor = false; } waitq_select_queue_add(waitq, thread, args); if (++args->nthreads >= args->max_threads) { break; } } } /** * @function do_waitq_select_n_locked_queue * * @brief * Selects threads waiting on a wait queue. * * @discussion * @c waitq is locked. * If @c waitq is a set, then the wait queue posting to it is locked too. * * If one or more threads are selected, this may disable preemption. 
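 *
 * For a global safeq, the scan is skipped entirely when the safeq's
 * waitq_eventmask shows no waiter for @c args->event.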
*/ static void do_waitq_select_n_locked_queue(waitq_t waitq, struct waitq_select_args *args) { spl_t s = 0; struct waitq *safeq; waitq_flags_t eventmask, remaining_eventmask; if (waitq_irq_safe(waitq)) { eventmask = _CAST_TO_EVENT_MASK(args->event); safeq = waitq.wq_q; } else { /* JMM - add flag to waitq to avoid global lookup if no waiters */ eventmask = _CAST_TO_EVENT_MASK(waitq.wq_q); safeq = waitq_get_safeq(waitq); if (safeq == NULL) { return; } s = splsched(); waitq_lock(safeq); } /* * If the safeq doesn't have an eventmask (not global) or the event * we're looking for IS set in its eventmask, then scan the threads * in that queue for ones that match the original waitq/event pair. */ if (waitq_type(safeq) == WQT_TURNSTILE) { waitq_prioq_iterate_locked(safeq, waitq.wq_q, args); } else if (!waitq_is_global(safeq)) { waitq_queue_iterate_locked(safeq, waitq.wq_q, args); } else if ((safeq->waitq_eventmask & eventmask) == eventmask) { remaining_eventmask = waitq_queue_iterate_locked(safeq, waitq.wq_q, args); /* * Update the eventmask of global queues we just scanned: * - If we selected all the threads in the queue, * we can clear its eventmask. * * - If we didn't find enough threads to fill our needs, * then we can assume we looked at every thread in the queue * and the mask we computed is complete - so reset it. */ if (waitq_empty(safeq)) { safeq->waitq_eventmask = 0; } else if (args->nthreads < args->max_threads) { safeq->waitq_eventmask = remaining_eventmask; } } /* unlock the safe queue if we locked one above */ if (!waitq_same(waitq, safeq)) { waitq_unlock(safeq); splx(s); } } /** * @function do_waitq_select_n_locked_sets() * * @brief * Selects threads waiting on any set a wait queue belongs to, * or preposts the wait queue onto them. * * @discussion * @c waitq is locked. */ __attribute__((noinline)) static void do_waitq_select_n_locked_sets(waitq_t waitq, struct waitq_select_args *args) { waitq_type_t wq_type = waitq_type(waitq); waitq_link_t link; assert(args->event == NO_EVENT64); assert(waitq_preposts(waitq)); waitq_link_foreach(link, waitq) { waitq_t wqset = wql_wqs(link); if (wql_wqs_preposted(link)) { /* * The wql_wqs_preposted() bit is cleared * under both the wq and wqset locks. * * If the wqset is still preposted, * we really won't find threads there. * * Just mark the waitq as preposted and move on. */ if (wq_type == WQT_PORT) { waitq.wq_q->waitq_preposted = true; } continue; } if (wq_type == WQT_SELECT) { /* * If PGZ picked this select set, * translate it to the real address. * * If it is still a select set * (the slot could have been reused), * then keep using it for the rest of the logic. * * Even in the extremely unlikely case where * the slot was reused for another select_set, * the `wql_sellink_valid` check below will * take care of debouncing it. But we must * forget the original pointer we read * so that we unlock the proper object. */ wqset.wqs_sel = pgz_decode_allow_invalid(wqset.wqs_sel, ZONE_ID_SELECT_SET); if (!wqset.wqs_sel) { continue; } if (!waitq_lock_allow_invalid(wqset)) { continue; } if (!wql_sellink_valid(wqset.wqs_sel, link.wqls)) { goto out_unlock; } } else { waitq_lock(wqset); if (!waitq_valid(wqset)) { goto out_unlock; } } /* * Find any threads waiting on this wait queue set as a queue.
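 * (Threads can block on the set itself: e.g. a receiver blocked on a
 * port-set, or a thread blocked in select(); those are woken directly
 * here, and a prepost is only recorded when no thread was found.)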
*/ do_waitq_select_n_locked_queue(wqset, args); if (args->nthreads == 0) { /* No thread selected: prepost 'waitq' to 'wqset' */ wql_wqs_mark_preposted(link); if (wq_type == WQT_SELECT) { wqset.wqs_sel->selset_preposted = true; } else { waitq.wq_q->waitq_preposted = true; circle_dequeue(&wqset.wqs_set->wqset_links, &link.wqll->wql_slink); circle_enqueue_tail(&wqset.wqs_set->wqset_preposts, &link.wqll->wql_slink); ipc_pset_prepost(wqset.wqs_set, waitq.wq_q); } } out_unlock: waitq_unlock(wqset); if (args->nthreads >= args->max_threads) { break; } } } /** * @function do_waitq_select_n_locked * * @brief * Selects threads waiting on a wait queue, or preposts it. * * @discussion * @c waitq is locked. * * Recurses into all sets this wait queue belongs to. */ static void do_waitq_select_n_locked(waitq_t waitq, struct waitq_select_args *args) { do_waitq_select_n_locked_queue(waitq, args); if (args->nthreads >= args->max_threads) { /* already enough threads found */ return; } if (args->event != NO_EVENT64 || !waitq_preposts(waitq)) { /* this wakeup should not recurse into sets */ return; } do_waitq_select_n_locked_sets(waitq, args); } static inline bool waitq_is_preposted_set(waitq_t waitq) { switch (waitq_type(waitq)) { case WQT_PORT_SET: return waitq_set_first_prepost(waitq.wqs_set, WQS_PREPOST_PEEK) != NULL; case WQT_SELECT_SET: return waitq.wqs_sel->selset_preposted; default: return false; } } wait_result_t waitq_assert_wait64_locked(waitq_t waitq, event64_t wait_event, wait_interrupt_t interruptible, wait_timeout_urgency_t urgency, uint64_t deadline, uint64_t leeway, thread_t thread) { wait_result_t wait_result; struct waitq *safeq; uintptr_t eventmask; spl_t s; switch (waitq_type(waitq)) { case WQT_PORT: case WQT_SELECT: case WQT_PORT_SET: case WQT_SELECT_SET: assert(wait_event == NO_EVENT64); break; default: assert(wait_event != NO_EVENT64); break; } /* * Warning: Do _not_ place debugging print statements here. * The waitq is locked! */ assert(!thread->started || thread == current_thread()); if (!waitq_wait_possible(thread)) { panic("thread already waiting on %p", thread->waitq.wq_q); } s = splsched(); /* * early-out if the thread is waiting on a wait queue set * that has already been pre-posted. * * Note: waitq_is_preposted_set() may unlock the waitq-set */ if (waitq_is_preposted_set(waitq)) { thread_lock(thread); thread->wait_result = THREAD_AWAKENED; thread_unlock(thread); splx(s); return THREAD_AWAKENED; } /* * If already dealing with an irq safe wait queue, we are all set. * Otherwise, determine a global queue to use and lock it. 
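 * (See waitq_get_safeq(): a port waitq uses its donated turnstile's
 * wait queue, everything else hashes the waitq pointer into the
 * global_waitqs table.)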
*/ if (waitq_irq_safe(waitq)) { safeq = waitq.wq_q; eventmask = _CAST_TO_EVENT_MASK(wait_event); } else { safeq = waitq_get_safeq(waitq); if (__improbable(safeq == NULL)) { panic("Trying to assert_wait on a turnstile proxy " "that hasn't been donated one (waitq: %p)", waitq.wq_q); } eventmask = _CAST_TO_EVENT_MASK(waitq.wq_q); waitq_lock(safeq); } /* lock the thread now that we have the irq-safe waitq locked */ thread_lock(thread); wait_result = thread_mark_wait_locked(thread, interruptible); /* thread->wait_result has been set */ if (wait_result == THREAD_WAITING) { waitq_thread_insert(safeq, thread, waitq, wait_event); if (deadline != 0) { bool was_active; was_active = timer_call_enter_with_leeway(thread->wait_timer, NULL, deadline, leeway, urgency, FALSE); if (!was_active) { thread->wait_timer_active++; } thread->wait_timer_armed = true; } if (waitq_is_global(safeq)) { safeq->waitq_eventmask |= (waitq_flags_t)eventmask; } waitq_stats_count_wait(waitq); } /* unlock the thread */ thread_unlock(thread); /* update the inheritor's thread priority if the waitq is embedded in turnstile */ if (waitq_type(safeq) == WQT_TURNSTILE && wait_result == THREAD_WAITING) { turnstile_recompute_priority_locked(waitq_to_turnstile(safeq)); turnstile_update_inheritor_locked(waitq_to_turnstile(safeq)); } /* unlock the safeq if we locked it here */ if (!waitq_same(waitq, safeq)) { waitq_unlock(safeq); } splx(s); return wait_result; } bool waitq_pull_thread_locked(waitq_t waitq, thread_t thread) { struct waitq *safeq; uint32_t ticket; assert_thread_magic(thread); /* Find the interrupts disabled queue thread is waiting on */ if (waitq_irq_safe(waitq)) { safeq = waitq.wq_q; } else { safeq = waitq_get_safeq(waitq); if (__improbable(safeq == NULL)) { panic("Trying to clear_wait on a turnstile proxy " "that hasn't been donated one (waitq: %p)", waitq.wq_q); } } /* * thread is already locked so have to try for the waitq lock. * * We can't wait for the waitq lock under the thread lock, * however we can reserve our slot in the lock queue, * and if that reservation requires waiting, we are guaranteed * that this waitq can't die until we got our turn! */ if (!waitq_lock_reserve(safeq, &ticket)) { thread_unlock(thread); waitq_lock_wait(safeq, ticket); thread_lock(thread); if (!waitq_same(waitq, thread->waitq)) { /* * While we were waiting for our reservation the thread * stopped waiting on this waitq, bail out. 
*/ waitq_unlock(safeq); return false; } } waitq_thread_remove(safeq, thread); waitq_stats_count_clear_wakeup(waitq); waitq_unlock(safeq); return true; } void waitq_clear_promotion_locked(waitq_t waitq, thread_t thread) { spl_t s = 0; assert(waitq_held(waitq)); assert(thread != THREAD_NULL); assert(thread == current_thread()); /* This flag is only cleared by the thread itself, so safe to check outside lock */ if ((thread->sched_flags & TH_SFLAG_WAITQ_PROMOTED) != TH_SFLAG_WAITQ_PROMOTED) { return; } if (!waitq_irq_safe(waitq)) { s = splsched(); } thread_lock(thread); sched_thread_unpromote_reason(thread, TH_SFLAG_WAITQ_PROMOTED, 0); thread_unlock(thread); if (!waitq_irq_safe(waitq)) { splx(s); } } static inline bool waitq_should_unlock(waitq_wakeup_flags_t flags) { return (flags & (WAITQ_UNLOCK | WAITQ_KEEP_LOCKED)) == WAITQ_UNLOCK; } static inline bool waitq_should_enable_interrupts(waitq_wakeup_flags_t flags) { return (flags & (WAITQ_UNLOCK | WAITQ_KEEP_LOCKED | WAITQ_ENABLE_INTERRUPTS)) == (WAITQ_UNLOCK | WAITQ_ENABLE_INTERRUPTS); } kern_return_t waitq_wakeup64_all_locked( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { struct waitq_select_args args = { .event = wake_event, .result = result, .flags = flags & ~WAITQ_HANDOFF, .max_threads = UINT32_MAX, }; assert(waitq_held(waitq)); if (flags & WAITQ_ENABLE_INTERRUPTS) { assert(waitq_should_unlock(flags)); assert(ml_get_interrupts_enabled() == false); } do_waitq_select_n_locked(waitq, &args); waitq_stats_count_wakeup(waitq, args.nthreads); if (waitq_should_unlock(flags)) { waitq_unlock(waitq); } if (waitq_should_enable_interrupts(flags)) { ml_set_interrupts_enabled(true); } if (!circle_queue_empty(&args.threadq)) { waitq_select_queue_flush(waitq, &args); } if (args.nthreads > 0) { return KERN_SUCCESS; } return KERN_NOT_WAITING; } kern_return_t waitq_wakeup64_one_locked( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { struct waitq_select_args args = { .event = wake_event, .result = result, .flags = flags, .max_threads = 1, }; assert(waitq_held(waitq)); if (flags & WAITQ_ENABLE_INTERRUPTS) { assert(waitq_should_unlock(flags)); assert(ml_get_interrupts_enabled() == false); } do_waitq_select_n_locked(waitq, &args); waitq_stats_count_wakeup(waitq, args.nthreads); if (waitq_should_unlock(flags)) { waitq_unlock(waitq); } if (waitq_should_enable_interrupts(flags)) { ml_set_interrupts_enabled(true); } if (!circle_queue_empty(&args.threadq)) { waitq_select_queue_flush(waitq, &args); } if (args.nthreads > 0) { return KERN_SUCCESS; } return KERN_NOT_WAITING; } thread_t waitq_wakeup64_identify_locked( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { struct waitq_select_args args = { .event = wake_event, .result = result, .flags = flags, .max_threads = 1, .is_identified = true, }; assert(waitq_held(waitq)); do_waitq_select_n_locked(waitq, &args); waitq_stats_count_wakeup(waitq, args.nthreads); if (waitq_should_unlock(flags)) { waitq_unlock(waitq); } if (waitq_should_enable_interrupts(flags)) { ml_set_interrupts_enabled(true); } if (args.nthreads > 0) { thread_t thread = cqe_dequeue_head(&args.threadq, struct thread, wait_links); assert(args.nthreads == 1 && circle_queue_empty(&args.threadq)); /* Thread is off waitq, not unblocked yet */ return thread; } return THREAD_NULL; } void waitq_resume_identified_thread( waitq_t waitq, thread_t thread, wait_result_t result, waitq_wakeup_flags_t flags) { spl_t spl = splsched(); 
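	/*
	 * The thread was pulled off its waitq by a *_identify wakeup and is
	 * still in the TH_WAIT | TH_WAKING limbo state (asserted below);
	 * thread_go() completes the wakeup that was started there.
	 *
	 * A sketch of the expected calling pattern (illustrative only,
	 * mirroring waitq_wakeup64_identify() below):
	 *
	 *   thread = waitq_wakeup64_identify_locked(wq, event, result, flags);
	 *   if (thread != THREAD_NULL) {
	 *           (inspect thread: off the waitq, but it cannot run yet)
	 *           waitq_resume_identified_thread(wq, thread, result, flags);
	 *   }
	 */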
thread_lock(thread); assert((thread->state & (TH_WAIT | TH_WAKING)) == (TH_WAIT | TH_WAKING)); maybe_adjust_thread_pri(thread, flags, waitq); thread_go(thread, result, (flags & WAITQ_HANDOFF)); thread_unlock(thread); splx(spl); enable_preemption(); // balance disable upon pulling thread } void waitq_resume_and_bind_identified_thread( waitq_t waitq, thread_t thread, processor_t processor, wait_result_t result, waitq_wakeup_flags_t flags) { spl_t spl = splsched(); thread_lock(thread); assert((thread->state & (TH_WAIT | TH_WAKING)) == (TH_WAIT | TH_WAKING)); maybe_adjust_thread_pri(thread, flags, waitq); thread_bind_during_wakeup(thread, processor); thread_go(thread, result, (flags & WAITQ_HANDOFF)); thread_unlock(thread); splx(spl); enable_preemption(); // balance disable upon pulling thread } kern_return_t waitq_wakeup64_thread_and_unlock( struct waitq *waitq, event64_t event, thread_t thread, wait_result_t result) { kern_return_t ret = KERN_NOT_WAITING; assert(waitq_irq_safe(waitq)); assert(waitq_held(waitq)); assert_thread_magic(thread); /* * See if the thread was still waiting there. If so, it got * dequeued and returned locked. * * By holding the thread locked across the go, a thread on another CPU * can't see itself in 'waking' state, even if it uses clear_wait. */ thread_lock(thread); if (waitq_same(thread->waitq, waitq) && thread->wait_event == event) { waitq_thread_remove(waitq, thread); ret = KERN_SUCCESS; } waitq_stats_count_wakeup(waitq, ret == KERN_SUCCESS ? 1 : 0); waitq_unlock(waitq); if (ret == KERN_SUCCESS) { thread_go(thread, result, /* handoff */ false); } thread_unlock(thread); return ret; } #pragma mark waitq __attribute__((always_inline)) void waitq_init(waitq_t waitq, waitq_type_t type, int policy) { assert((policy & SYNC_POLICY_FIXED_PRIORITY) == 0); *waitq.wq_q = (struct waitq){ .waitq_type = type, .waitq_fifo = ((policy & SYNC_POLICY_REVERSED) == 0), }; switch (type) { case WQT_INVALID: __builtin_trap(); case WQT_TURNSTILE: /* For turnstile, initialize it as a priority queue */ priority_queue_init(&waitq.wq_q->waitq_prio_queue); assert(waitq.wq_q->waitq_fifo == 0); break; case WQT_PORT: waitq.wq_q->waitq_ts = TURNSTILE_NULL; break; case WQT_PORT_SET: circle_queue_init(&waitq.wqs_set->wqset_preposts); OS_FALLTHROUGH; case WQT_SELECT_SET: case WQT_QUEUE: case WQT_SELECT: circle_queue_init(&waitq.wq_q->waitq_queue); break; } if (policy & SYNC_POLICY_INIT_LOCKED) { hw_lck_ticket_init_locked(&waitq.wq_q->waitq_interlock, &waitq_lck_grp); } else { hw_lck_ticket_init(&waitq.wq_q->waitq_interlock, &waitq_lck_grp); } } void waitq_deinit(waitq_t waitq) { waitq_type_t type = waitq_type(waitq); switch (type) { case WQT_QUEUE: assert(circle_queue_empty(&waitq.wq_q->waitq_queue)); waitq_invalidate(waitq); break; case WQT_TURNSTILE: assert(priority_queue_empty(&waitq.wq_q->waitq_prio_queue)); assert(waitq.wq_q->waitq_inheritor == TURNSTILE_INHERITOR_NULL); waitq_invalidate(waitq); break; case WQT_PORT: assert(waitq.wq_q->waitq_ts == TURNSTILE_NULL); assert(circle_queue_empty(&waitq.wq_q->waitq_links)); break; case WQT_SELECT: assert(waitq.wq_q->waitq_sellinks.next == NULL); assert(circle_queue_empty(&waitq.wqs_set->wqset_queue)); break; case WQT_PORT_SET: assert(circle_queue_empty(&waitq.wqs_set->wqset_queue)); assert(circle_queue_empty(&waitq.wqs_set->wqset_links)); assert(circle_queue_empty(&waitq.wqs_set->wqset_preposts)); break; default: panic("invalid wait type: %p/%d", waitq.wq_q, type); } /* * The waitq must have been invalidated, or hw_lck_ticket_destroy() * below won't 
wait for reservations from waitq_lock_reserve(), * or waitq_lock_allow_invalid(). */ assert(!waitq_valid(waitq.wqs_set)); hw_lck_ticket_destroy(&waitq.wq_q->waitq_interlock, &waitq_lck_grp); } #pragma mark port-set sets void waitq_set_unlink_all_locked(struct waitq_set *wqset, waitq_link_list_t *free_l) { uint32_t batch = waitq_set_unlink_batch; waitq_invalidate(wqset); for (;;) { struct waitq_link *link; queue_entry_t elt; circle_queue_t q; struct waitq *wq; uint32_t ticket; bool stable = true; if (!circle_queue_empty(&wqset->wqset_links)) { q = &wqset->wqset_links; } else if (!circle_queue_empty(&wqset->wqset_preposts)) { q = &wqset->wqset_preposts; } else { break; } if (batch-- == 0) { waitq_unlock(wqset); waitq_lock(wqset); batch = waitq_set_unlink_batch; continue; } elt = circle_queue_first(q); link = cqe_element(elt, struct waitq_link, wql_slink); wq = link->wql_wq; if (__improbable(!waitq_lock_reserve(wq, &ticket))) { waitq_unlock(wqset); waitq_lock_wait(wq, ticket); waitq_lock(wqset); stable = (elt == circle_queue_first(q) && link->wql_wq == wq); } if (stable) { circle_dequeue(q, &link->wql_slink); circle_dequeue(&wq->waitq_links, &link->wql_qlink); wql_list_push(free_l, link); } waitq_unlock(wq); } } void waitq_clear_prepost_locked(struct waitq *waitq) { assert(waitq_type(waitq) == WQT_PORT); waitq->waitq_preposted = false; } void waitq_set_foreach_member_locked(struct waitq_set *wqs, void (^cb)(struct waitq *)) { struct waitq_link *link; cqe_foreach_element(link, &wqs->wqset_links, wql_slink) { cb(link->wql_wq); } cqe_foreach_element(link, &wqs->wqset_preposts, wql_slink) { cb(link->wql_wq); } } __abortlike static void __waitq_link_arguments_panic(struct waitq *waitq, struct waitq_set *wqset) { if (!waitq_valid(waitq)) { panic("Invalid waitq: %p", waitq); } if (waitq_type(waitq) != WQT_PORT) { panic("Invalid waitq type: %p:%d", waitq, waitq->waitq_type); } panic("Invalid waitq-set: %p", wqset); } static inline void __waitq_link_arguments_validate(struct waitq *waitq, struct waitq_set *wqset) { if (!waitq_valid(waitq) || waitq_type(waitq) != WQT_PORT || waitq_type(wqset) != WQT_PORT_SET) { __waitq_link_arguments_panic(waitq, wqset); } } __abortlike static void __waitq_invalid_panic(waitq_t waitq) { panic("Invalid waitq: %p", waitq.wq_q); } static void __waitq_validate(waitq_t waitq) { if (!waitq_valid(waitq)) { __waitq_invalid_panic(waitq); } } kern_return_t waitq_link_locked(struct waitq *waitq, struct waitq_set *wqset, waitq_link_t *linkp) { assert(linkp->wqlh); __waitq_link_arguments_validate(waitq, wqset); if (wql_find(waitq, wqset)) { return KERN_ALREADY_IN_SET; } linkp->wqll->wql_wq = waitq; linkp->wqll->wql_wqs = (uintptr_t)wqset; if (waitq_valid(wqset)) { circle_enqueue_tail(&wqset->wqset_links, &linkp->wqll->wql_slink); circle_enqueue_tail(&waitq->waitq_links, &linkp->wqll->wql_qlink); *linkp = WQL_NULL; } return KERN_SUCCESS; } kern_return_t waitq_link_prepost_locked(struct waitq *waitq, struct waitq_set *wqset) { struct waitq_link *link; __waitq_link_arguments_validate(waitq, wqset); link = wql_find(waitq, wqset); if (link == NULL) { return KERN_NOT_IN_SET; } if (!wql_wqs_preposted(link)) { wql_wqs_mark_preposted(link); waitq->waitq_preposted = true; circle_dequeue(&wqset->wqset_links, &link->wql_slink); circle_enqueue_tail(&wqset->wqset_preposts, &link->wql_slink); ipc_pset_prepost(wqset, waitq); } return KERN_SUCCESS; } waitq_link_t waitq_unlink_locked(struct waitq *waitq, struct waitq_set *wqset) { struct waitq_link *link; __waitq_link_arguments_validate(waitq, 
wqset); link = wql_find(waitq, wqset); if (link) { circle_dequeue(wql_wqs_queue(wqset, link), &link->wql_slink); circle_dequeue(&waitq->waitq_links, &link->wql_qlink); } return (waitq_link_t){ .wqll = link }; } void waitq_unlink_all_locked(struct waitq *waitq, struct waitq_set *except_wqset, waitq_link_list_t *free_l) { struct waitq_link *kept_link = NULL; struct waitq_link *link; assert(waitq_type(waitq) == WQT_PORT); cqe_foreach_element_safe(link, &waitq->waitq_links, wql_qlink) { waitq_t wqs = wql_wqs(link); if (wqs.wqs_set == except_wqset) { kept_link = link; continue; } waitq_lock(wqs); circle_dequeue(wql_wqs_queue(wqs.wqs_set, link), &link->wql_slink); wql_list_push(free_l, link); waitq_unlock(wqs); } circle_queue_init(&waitq->waitq_links); if (kept_link) { circle_enqueue_tail(&waitq->waitq_links, &kept_link->wql_qlink); } } struct waitq * waitq_set_first_prepost(struct waitq_set *wqset, wqs_prepost_flags_t flags) { circle_queue_t q = &wqset->wqset_preposts; queue_entry_t elt; struct waitq_link *link; struct waitq *wq; uint32_t ticket; if (__improbable(!waitq_valid(wqset))) { return NULL; } while (!circle_queue_empty(q)) { elt = circle_queue_first(q); link = cqe_element(elt, struct waitq_link, wql_slink); wq = link->wql_wq; if (__improbable(!waitq_lock_reserve(wq, &ticket))) { waitq_unlock(wqset); waitq_lock_wait(wq, ticket); waitq_lock(wqset); if (!waitq_valid(wqset)) { waitq_unlock(wq); return NULL; } if (elt != circle_queue_first(q) || link->wql_wq != wq) { waitq_unlock(wq); continue; } } if (wq->waitq_preposted) { if ((flags & WQS_PREPOST_PEEK) == 0) { circle_queue_rotate_head_forward(q); } if ((flags & WQS_PREPOST_LOCK) == 0) { waitq_unlock(wq); } return wq; } /* * We found a link that is no longer preposted: * someone must have called waitq_clear_prepost_locked() * and this set has only just noticed. */ wql_wqs_clear_preposted(link); waitq_unlock(wq); circle_dequeue(q, &link->wql_slink); circle_enqueue_tail(&wqset->wqset_links, &link->wql_slink); } return NULL; } #pragma mark select sets /** * @function select_set_nextid() * * @brief * Generate a unique ID for a select set "generation". * * @discussion * This mixes the CPU number with a per-CPU monotonic counter * (in order to avoid contention on a global atomic). * * In order for select sets to be invalidated very quickly, * they do not have backward linkages to their member queues. * * Instead, each time a new @c select() "pass" is initiated, * a new ID is generated, which is copied onto the @c waitq_sellink * links at the time of link. * * The zone for select sets is sequestered, which allows for select * wait queues to speculatively lock their set during prepost * and use this ID to debounce wakeups and avoid spurious wakeups * (this is merely an "optimization": select recovers from spurious * wakeups, we just want those to be very rare). */ __attribute__((always_inline)) static inline uint64_t select_set_nextid(bool preemption_enabled) { /* waitq_bootstrap() set the low byte to a unique value per CPU */ static_assert(MAX_CPUS <= 256); const uint64_t inc = 256; uint64_t id; #ifdef __x86_64__ /* uncontended atomics are slower than disabling preemption on Intel */ if (preemption_enabled) { disable_preemption(); } id = (*PERCPU_GET(select_setid) += inc); if (preemption_enabled) { enable_preemption(); } #else /* * if preemption is enabled this might update another CPU's * setid; this is rare but acceptable, as it still * produces a unique select ID.
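 * (Concretely: with inc == 256, the low byte of the ID keeps the
 * per-CPU seed assigned in waitq_bootstrap(), and the upper bits
 * form the per-CPU counter.)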
* * We chose this because the uncontended atomics on !intel * are faster than disabling/reenabling preemption. */ (void)preemption_enabled; id = os_atomic_add(PERCPU_GET(select_setid), inc, relaxed); #endif return id; } struct select_set * select_set_alloc(void) { struct select_set *selset; selset = zalloc_id(ZONE_ID_SELECT_SET, Z_ZERO | Z_WAITOK | Z_NOFAIL); waitq_init(selset, WQT_SELECT_SET, SYNC_POLICY_FIFO); selset->selset_id = select_set_nextid(true); return selset; } __abortlike static void __select_set_link_arguments_panic(struct waitq *waitq, struct select_set *set) { if (!waitq_valid(waitq)) { panic("Invalid waitq: %p", waitq); } if (waitq_type(waitq) != WQT_SELECT) { panic("Invalid waitq type: %p:%d", waitq, waitq->waitq_type); } panic("Invalid waitq-set: %p", set); } static inline void __select_set_link_arguments_validate(struct waitq *waitq, struct select_set *set) { if (!waitq_valid(waitq) || waitq_type(waitq) != WQT_SELECT || waitq_type(set) != WQT_SELECT_SET) { __select_set_link_arguments_panic(waitq, set); } } void select_set_link(struct waitq *waitq, struct select_set *set, waitq_link_t *linkp) { struct waitq_sellink *link; __select_set_link_arguments_validate(waitq, set); waitq_lock(waitq); if (waitq == &select_conflict_queue) { waitq_lock(set); set->selset_conflict = true; waitq_unlock(set); } wql_list_foreach(link, &waitq->waitq_sellinks) { if (waitq_same(wql_wqs(link), set)) { goto found; } } link = linkp->wqls; *linkp = WQL_NULL; wql_list_push(&waitq->waitq_sellinks, link); found: link->wql_wqs = (uintptr_t)set; link->wql_setid = set->selset_id; waitq_unlock(waitq); } static void select_set_unlink_conflict_queue(struct select_set *set) { struct waitq_link_list_entry **prev; struct waitq_sellink *link; waitq_lock(&select_conflict_queue); /* * We know the conflict queue is hooked, * so find the linkage and free it. 
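 * (Classic singly-linked-list unlink: `prev` walks as a pointer to the
 * previous element's `next` field, so the matching link is spliced out
 * without needing a doubly-linked structure.)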
*/ prev = &select_conflict_queue.waitq_sellinks.next; for (;;) { assert(*prev); link = wql_list_elem(*prev); if (waitq_same(wql_wqs(link), set)) { *prev = link->wql_next.next; break; } prev = &link->wql_next.next; } waitq_unlock(&select_conflict_queue); waitq_link_free(WQT_SELECT_SET, link); } static void __select_set_reset(struct select_set *set, bool invalidate) { if (set->selset_conflict) { select_set_unlink_conflict_queue(set); } waitq_lock(set); if (invalidate) { waitq_invalidate(set); } set->selset_id = select_set_nextid(false); set->selset_preposted = 0; set->selset_conflict = 0; waitq_unlock(set); } void select_set_reset(struct select_set *set) { __select_set_reset(set, false); } void select_set_free(struct select_set *set) { __select_set_reset(set, true); hw_lck_ticket_destroy(&set->selset_interlock, &waitq_lck_grp); zfree_id(ZONE_ID_SELECT_SET, set); } void select_waitq_wakeup_and_deinit( struct waitq *waitq, event64_t wake_event, wait_result_t result) { waitq_link_list_t free_l = { }; if (waitq_is_valid(waitq)) { assert(waitq_type(waitq) == WQT_SELECT); waitq_lock(waitq); waitq_wakeup64_all_locked(waitq, wake_event, result, WAITQ_KEEP_LOCKED); waitq_invalidate(waitq); free_l = waitq->waitq_sellinks; waitq->waitq_sellinks.next = NULL; waitq_unlock(waitq); waitq_link_free_list(WQT_SELECT, &free_l); waitq_deinit(waitq); } } #pragma mark assert_wait / wakeup (high level) wait_result_t waitq_assert_wait64(struct waitq *waitq, event64_t wait_event, wait_interrupt_t interruptible, uint64_t deadline) { thread_t thread = current_thread(); wait_result_t ret; spl_t s = 0; __waitq_validate(waitq); if (waitq_irq_safe(waitq)) { s = splsched(); } waitq_lock(waitq); ret = waitq_assert_wait64_locked(waitq, wait_event, interruptible, TIMEOUT_URGENCY_SYS_NORMAL, deadline, TIMEOUT_NO_LEEWAY, thread); waitq_unlock(waitq); if (waitq_irq_safe(waitq)) { splx(s); } return ret; } wait_result_t waitq_assert_wait64_leeway(struct waitq *waitq, event64_t wait_event, wait_interrupt_t interruptible, wait_timeout_urgency_t urgency, uint64_t deadline, uint64_t leeway) { wait_result_t ret; thread_t thread = current_thread(); spl_t s = 0; __waitq_validate(waitq); if (waitq_irq_safe(waitq)) { s = splsched(); } waitq_lock(waitq); ret = waitq_assert_wait64_locked(waitq, wait_event, interruptible, urgency, deadline, leeway, thread); waitq_unlock(waitq); if (waitq_irq_safe(waitq)) { splx(s); } return ret; } kern_return_t waitq_wakeup64_one( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { __waitq_validate(waitq); spl_t spl = 0; if (waitq_irq_safe(waitq)) { spl = splsched(); } waitq_lock(waitq); /* waitq is unlocked upon return, splx is handled */ return waitq_wakeup64_one_locked(waitq, wake_event, result, flags | waitq_flags_splx(spl) | WAITQ_UNLOCK); } kern_return_t waitq_wakeup64_all( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { __waitq_validate(waitq); spl_t spl = 0; if (waitq_irq_safe(waitq)) { spl = splsched(); } waitq_lock(waitq); /* waitq is unlocked upon return, splx is handled */ return waitq_wakeup64_all_locked(waitq, wake_event, result, flags | waitq_flags_splx(spl) | WAITQ_UNLOCK); } kern_return_t waitq_wakeup64_thread( struct waitq *waitq, event64_t event, thread_t thread, wait_result_t result) { spl_t s = splsched(); kern_return_t ret; __waitq_validate(waitq); assert(waitq_irq_safe(waitq)); waitq_lock(waitq); ret = waitq_wakeup64_thread_and_unlock(waitq, event, thread, result); splx(s); return ret; } thread_t 
waitq_wakeup64_identify( waitq_t waitq, event64_t wake_event, wait_result_t result, waitq_wakeup_flags_t flags) { __waitq_validate(waitq); spl_t spl = 0; if (waitq_irq_safe(waitq)) { spl = splsched(); } waitq_lock(waitq); thread_t thread = waitq_wakeup64_identify_locked(waitq, wake_event, result, flags | waitq_flags_splx(spl) | WAITQ_UNLOCK); /* waitq is unlocked, thread is not go-ed yet */ /* preemption disabled if thread non-null */ /* splx is handled */ if (thread != THREAD_NULL) { thread_reference(thread); waitq_resume_identified_thread(waitq, thread, result, flags); /* preemption enabled, thread go-ed */ /* returns +1 ref to running thread */ return thread; } return THREAD_NULL; } #pragma mark tests #if DEBUG || DEVELOPMENT #include #include #define MAX_GLOBAL_TEST_QUEUES 64 static struct waitq wqt_waitq_array[MAX_GLOBAL_TEST_QUEUES]; static bool wqt_running; static bool wqt_init; static bool wqt_start(const char *test, int64_t *out) { if (os_atomic_xchg(&wqt_running, true, acquire)) { *out = 0; return false; } if (!wqt_init) { wqt_init = true; for (int i = 0; i < MAX_GLOBAL_TEST_QUEUES; i++) { waitq_init(&wqt_waitq_array[i], WQT_PORT, SYNC_POLICY_FIFO); } } printf("[WQ] starting %s\n", test); return true; } static int wqt_end(const char *test, int64_t *out) { os_atomic_store(&wqt_running, false, release); printf("[WQ] done %s\n", test); *out = 1; return 0; } static struct waitq * wqt_wq(uint32_t index) { return &wqt_waitq_array[index]; } static uint32_t wqt_idx(struct waitq *waitq) { assert(waitq >= wqt_waitq_array && waitq < wqt_waitq_array + MAX_GLOBAL_TEST_QUEUES); return (uint32_t)(waitq - wqt_waitq_array); } __attribute__((overloadable)) static uint64_t wqt_bit(uint32_t index) { return 1ull << index; } __attribute__((overloadable)) static uint64_t wqt_bit(struct waitq *waitq) { return wqt_bit(wqt_idx(waitq)); } static struct waitq_set * wqt_wqset_create(void) { struct waitq_set *wqset; wqset = &ipc_pset_alloc_special(ipc_space_kernel)->ips_wqset; printf("[WQ]: created waitq set %p\n", wqset); return wqset; } static void wqt_wqset_free(struct waitq_set *wqset) { printf("[WQ]: destroying waitq set %p\n", wqset); waitq_lock(wqset); ipc_pset_destroy(ipc_space_kernel, __container_of(wqset, struct ipc_pset, ips_wqset)); } static void wqt_link(uint32_t index, struct waitq_set *wqset, kern_return_t want) { struct waitq *waitq = wqt_wq(index); waitq_link_t link = waitq_link_alloc(WQT_PORT_SET); kern_return_t kr; printf("[WQ]: linking waitq [%d] to global wqset (%p)\n", index, wqset); waitq_lock(waitq); waitq_lock(wqset); kr = waitq_link_locked(waitq, wqset, &link); waitq_unlock(wqset); waitq_unlock(waitq); if (link.wqlh) { waitq_link_free(WQT_PORT_SET, link); } printf("[WQ]:\tkr=%d\texpected=%d\n", kr, want); assert(kr == want); } static void wqt_unlink(uint32_t index, struct waitq_set *wqset, kern_return_t want) { struct waitq *waitq = wqt_wq(index); waitq_link_t link; kern_return_t kr; printf("[WQ]: unlinking waitq [%d] from global wqset (%p)\n", index, wqset); waitq_lock(waitq); waitq_lock(wqset); link = waitq_unlink_locked(waitq, wqset); waitq_unlock(wqset); waitq_unlock(waitq); if (link.wqlh) { waitq_link_free(WQT_PORT_SET, link); kr = KERN_SUCCESS; } else { kr = KERN_NOT_IN_SET; } printf("[WQ]: \tkr=%d\n", kr); assert(kr == want); } static void wqt_wakeup_one(uint32_t index, event64_t event64, kern_return_t want) { kern_return_t kr; printf("[WQ]: Waking one thread on waitq [%d] event:0x%llx\n", index, event64); kr = waitq_wakeup64_one(wqt_wq(index), event64, THREAD_AWAKENED, 
WAITQ_WAKEUP_DEFAULT); printf("[WQ]: \tkr=%d\n", kr); assert(kr == want); } static void wqt_clear_preposts(uint32_t idx) { waitq_lock(wqt_wq(idx)); (void)waitq_clear_prepost_locked(wqt_wq(idx)); waitq_unlock(wqt_wq(idx)); } static void wqt_preposts_gc_locked(struct waitq_set *wqset) { circle_queue_t q = &wqset->wqset_preposts; struct waitq_link *link; uint32_t ticket; again: cqe_foreach_element_safe(link, q, wql_slink) { struct waitq *wq = link->wql_wq; if (!waitq_lock_reserve(wq, &ticket)) { waitq_unlock(wqset); waitq_lock_wait(wq, ticket); waitq_lock(wqset); waitq_unlock(wq); /* the list was possibly mutated, restart */ goto again; } if (!wq->waitq_preposted) { wql_wqs_clear_preposted(link); circle_dequeue(q, &link->wql_slink); circle_enqueue_tail(&wqset->wqset_links, &link->wql_slink); } waitq_unlock(wq); } } static void wqt_expect_preposts(struct waitq_set *wqset, uint64_t preposts) { struct waitq_link *link; uint64_t found = 0; waitq_lock(wqset); wqt_preposts_gc_locked(wqset); cqe_foreach_element(link, &wqset->wqset_preposts, wql_slink) { struct waitq *waitq = link->wql_wq; printf("[WQ]: found prepost %d\n", wqt_idx(waitq)); assertf((found & wqt_bit(waitq)) == 0, "found waitq %d twice", wqt_idx(waitq)); found |= wqt_bit(waitq); } waitq_unlock(wqset); assertf(found == preposts, "preposts expected 0x%llx, but got 0x%llx", preposts, found); } static int waitq_basic_test(__unused int64_t in, int64_t *out) { struct waitq_set *wqset; if (!wqt_start(__func__, out)) { return EBUSY; } wqset = wqt_wqset_create(); wqt_link(10, wqset, KERN_SUCCESS); wqt_link(10, wqset, KERN_ALREADY_IN_SET); wqt_link(11, wqset, KERN_SUCCESS); wqt_link(11, wqset, KERN_ALREADY_IN_SET); wqt_link(12, wqset, KERN_SUCCESS); wqt_link(12, wqset, KERN_ALREADY_IN_SET); wqt_wakeup_one(10, NO_EVENT64, KERN_NOT_WAITING); wqt_wakeup_one(12, NO_EVENT64, KERN_NOT_WAITING); wqt_expect_preposts(wqset, wqt_bit(10) | wqt_bit(12)); wqt_clear_preposts(10); wqt_expect_preposts(wqset, wqt_bit(12)); wqt_clear_preposts(12); wqt_expect_preposts(wqset, 0); wqt_unlink(12, wqset, KERN_SUCCESS); wqt_unlink(12, wqset, KERN_NOT_IN_SET); wqt_unlink(11, wqset, KERN_SUCCESS); wqt_unlink(10, wqset, KERN_SUCCESS); wqt_wqset_free(wqset); return wqt_end(__func__, out); } SYSCTL_TEST_REGISTER(waitq_basic, waitq_basic_test); #endif /* DEBUG || DEVELOPMENT */