xref: /xnu-12377.61.12/bsd/kern/kern_aio.c (revision 4d495c6e23c53686cf65f45067f79024cf5dcee8)
1 /*
2  * Copyright (c) 2003-2024 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 
29 /*
30  * This file contains support for the POSIX 1003.1B AIO/LIO facility.
31  */
32 
33 #include <sys/systm.h>
34 #include <sys/fcntl.h>
35 #include <sys/file_internal.h>
36 #include <sys/filedesc.h>
37 #include <sys/kdebug.h>
38 #include <sys/kernel.h>
39 #include <sys/vnode_internal.h>
40 #include <sys/kauth.h>
41 #include <sys/mount_internal.h>
42 #include <sys/param.h>
43 #include <sys/proc_internal.h>
44 #include <sys/sysctl.h>
45 #include <sys/unistd.h>
46 #include <sys/user.h>
47 
48 #include <sys/aio_kern.h>
49 #include <sys/sysproto.h>
50 
51 #include <machine/limits.h>
52 
53 #include <mach/mach_types.h>
54 #include <kern/kern_types.h>
55 #include <kern/waitq.h>
56 #include <kern/zalloc.h>
57 #include <kern/task.h>
58 #include <kern/sched_prim.h>
59 #include <kern/ast.h>
60 
61 #include <vm/vm_map_xnu.h>
62 
63 #include <os/refcnt.h>
64 
65 #include <kern/thread.h>
66 #include <kern/policy_internal.h>
67 #include <pthread/workqueue_internal.h>
68 
69 #if 0
70 #undef KERNEL_DEBUG
71 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
72 #endif
73 
74 #define AIO_work_queued                 1
75 #define AIO_worker_wake                 2
76 #define AIO_completion_sig              3
77 #define AIO_completion_kevent           4
78 #define AIO_completion_cleanup_wait     5
79 #define AIO_completion_cleanup_wake     6
80 #define AIO_completion_suspend_wake     7
81 #define AIO_cancel                      10
82 #define AIO_cancel_async_workq          11
83 #define AIO_cancel_sync_workq           12
84 #define AIO_cancel_activeq              13
85 #define AIO_cancel_doneq                14
86 #define AIO_fsync                       20
87 #define AIO_fsync_delay                 21
88 #define AIO_read                        30
89 #define AIO_write                       40
90 #define AIO_listio                      50
91 #define AIO_error                       60
92 #define AIO_error_val                   61
93 #define AIO_error_activeq               62
94 #define AIO_error_workq                 63
95 #define AIO_return                      70
96 #define AIO_return_val                  71
97 #define AIO_return_activeq              72
98 #define AIO_return_workq                73
99 #define AIO_exec                        80
100 #define AIO_exit                        90
101 #define AIO_exit_sleep                  91
102 #define AIO_close                       100
103 #define AIO_close_sleep                 101
104 #define AIO_suspend                     110
105 #define AIO_suspend_sleep               111
106 #define AIO_worker_thread               120
107 #define AIO_register_kevent             130
108 #define AIO_WQ_process_entry            140
109 #define AIO_WQ_aio_thread_create        141
110 #define AIO_WQ_aio_thread_terminate     142
111 #define AIO_WQ_aio_death_call           143
112 #define AIO_WQ_aio_thread_park          144
113 #define AIO_WQ_aio_select_req           145
114 #define AIO_WQ_aio_thread_create_failed 146
115 #define AIO_WQ_aio_thread_wakeup        147
116 
117 static TUNABLE(uint32_t, bootarg_aio_new_workq, "aio_new_workq", 1);
118 
119 __options_decl(aio_entry_flags_t, uint32_t, {
120 	AIO_READ        = 0x00000001, /* a read */
121 	AIO_WRITE       = 0x00000002, /* a write */
122 	AIO_FSYNC       = 0x00000004, /* aio_fsync with op = O_SYNC */
123 	AIO_DSYNC       = 0x00000008, /* aio_fsync with op = O_DSYNC (not supported yet) */
124 	AIO_LIO         = 0x00000010, /* lio_listio generated IO */
125 	AIO_LIO_WAIT    = 0x00000020, /* lio_listio is waiting on the leader */
126 
127 	AIO_COMPLETED   = 0x00000100, /* request has completed */
128 	AIO_CANCELLED   = 0x00000200, /* request has been cancelled */
129 
130 	/*
131 	 * These flags mean that this entry is blocking either:
132 	 * - close (AIO_CLOSE_WAIT)
133 	 * - exit or exec (AIO_EXIT_WAIT)
134 	 *
135 	 * These flags are mutually exclusive, and the AIO_EXIT_WAIT variant
136 	 * will also neuter notifications in do_aio_completion_and_unlock().
137 	 */
138 	AIO_CLOSE_WAIT  = 0x00004000,
139 	AIO_EXIT_WAIT   = 0x00008000,
140 });
141 
142 /*! @struct aio_workq_entry
143  *
144  * @discussion
145  * This represents a piece of aio/lio work.
146  *
147  * The ownership rules go as follows:
148  *
149  * - the "proc" owns one refcount on the entry (from creation), while it is
150  *   enqueued on the aio_activeq and then the aio_doneq.
151  *
152  *   either aio_return() (user read the status) or _aio_exit() (the process
153  *   died) will dequeue the entry and consume this ref.
154  *
155  * - the async workqueue owns one refcount once the work is submitted,
156  *   which is consumed in do_aio_completion_and_unlock().
157  *
158  *   This ref protects the entry for the the end of
159  *   do_aio_completion_and_unlock() (when signal delivery happens).
160  *
161  * - lio_listio() for batches picks one of the entries to be the "leader"
162  *   of the batch. Each work item will have a refcount on its leader
163  *   so that the accounting of the batch completion can be done on the leader
164  *   (to be able to decrement lio_pending).
165  *
166  *   This ref is consumed in do_aio_completion_and_unlock() as well.
167  *
168  * - lastly, in lio_listio() when the LIO_WAIT behavior is requested,
169  *   an extra ref is taken in this syscall as it needs to keep accessing
170  *   the leader "lio_pending" field until it hits 0.
171  */
struct aio_workq_entry {
	/* queue lock */
	TAILQ_ENTRY(aio_workq_entry)    aio_workq_link; /* tqe_prev == NULL means "not on a workq" */

	/* Proc lock */
	TAILQ_ENTRY(aio_workq_entry)    aio_proc_link;  /* p_aio_activeq or p_aio_doneq */
	user_ssize_t                    returnval;      /* return value from read / write request */
	errno_t                         errorval;       /* error value from read / write request */
	os_refcnt_t                     aio_refcount;   /* ownership rules: see discussion above */
	aio_entry_flags_t               flags;          /* kind + state bits (aio_entry_flags_t) */

	int                             lio_pending;    /* pending I/Os in lio group, only on leader */
	struct aio_workq_entry         *lio_leader;     /* pointer to the lio leader, can be self */

	/* Initialized and never changed, safe to access */
	struct proc                    *procp;          /* user proc that queued this request */
	user_addr_t                     uaiocbp;        /* pointer passed in from user land */
	struct user_aiocb               aiocb;          /* copy of aiocb from user land */
	struct vfs_context              context;        /* context which enqueued the request */

	/* Initialized, and possibly freed by aio_work_thread() or at free if cancelled */
	vm_map_t                        aio_map;        /* user land map we have a reference to */
};
195 
196 /*
197  * aio requests queue up on the aio_async_workq or lio_sync_workq (for
198  * lio_listio LIO_WAIT).  Requests then move to the per process aio_activeq
199  * (proc.aio_activeq) when one of our worker threads start the IO.
200  * And finally, requests move to the per process aio_doneq (proc.aio_doneq)
201  * when the IO request completes.  The request remains on aio_doneq until
202  * user process calls aio_return or the process exits, either way that is our
203  * trigger to release aio resources.
204  */
typedef struct aio_workq   {
	TAILQ_HEAD(, aio_workq_entry)   aioq_entries;   /* pending entries, FIFO order */
	lck_spin_t                      aioq_lock;      /* protects aioq_entries */
	struct waitq                    aioq_waitq;     /* worker threads block here for work */
} *aio_workq_t;
210 
#define AIO_NUM_WORK_QUEUES 1
/* Global anchor for the legacy (pre-boot-arg) AIO workqueue implementation. */
struct aio_anchor_cb {
	os_atomic(int)          aio_total_count;        /* total extant entries */

	/* Hash table of queues here */
	int                     aio_num_workqs;         /* always AIO_NUM_WORK_QUEUES today */
	struct aio_workq        aio_async_workqs[AIO_NUM_WORK_QUEUES];
};
typedef struct aio_anchor_cb aio_anchor_cb;
220 
221 
222 /* New per process workqueue */
223 #define WORKQUEUE_AIO_MAXTHREADS            16
224 
TAILQ_HEAD(workq_aio_uthread_head, uthread);

/*
 * Per-process AIO workqueue state for the new implementation (selected by
 * the "aio_new_workq" boot-arg).
 * NOTE(review): locking discipline for these fields is not visible in this
 * chunk — presumably the aio proc lock; confirm against the accessors.
 */
typedef struct workq_aio_s {
	thread_call_t   wa_death_call;                  /* thread call used to reap old/idle threads */
	struct workq_aio_uthread_head wa_thrunlist;     /* uthreads currently running work (by name; confirm) */
	struct workq_aio_uthread_head wa_thidlelist;    /* parked/idle uthreads (by name; confirm) */
	TAILQ_HEAD(, aio_workq_entry) wa_aioq_entries;  /* pending aio work for this process */
	proc_t wa_proc;                                 /* owning process (used by WQ_AIO_TRACE*) */
	workq_state_flags_t _Atomic wa_flags;
	uint16_t wa_nthreads;                           /* total worker thread count */
	uint16_t wa_thidlecount;                        /* count of idle threads */
	uint16_t wa_thdying_count;                      /* count of threads being torn down */
} workq_aio_s, *workq_aio_t;
238 
/*
 * Pairs a sysctl-visible microseconds value with its cached absolute-time
 * equivalent (converted by aio_workq_sysctl_handle_usecs).
 */
struct aio_workq_usec_var {
	uint32_t usecs;         /* value read/written via sysctl */
	uint64_t abstime;       /* same interval in mach absolute time units */
};
243 
244 static int aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
245 
246 #define AIO_WORKQ_SYSCTL_USECS(var, init) \
247 	        static struct aio_workq_usec_var var = { .usecs = (init) }; \
248 	        SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
249 	                        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &(var), 0, \
250 	                        aio_workq_sysctl_handle_usecs, "I", "")
251 
252 AIO_WORKQ_SYSCTL_USECS(aio_wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
253 
254 #define WQ_AIO_TRACE(x, wq, a, b, c, d) \
255 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
256 	        proc_getpid((wq)->wa_proc), (a), (b), (c), (d)); })
257 
258 #define WQ_AIO_TRACE_WQ(x, wq) \
259 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
260 	        proc_getpid((wq)->wa_proc),\
261 	        (uintptr_t)thread_tid(current_thread()),\
262 	        (wq)->wa_nthreads, (wq)->wa_thidlecount, (wq)->wa_thdying_count); })
263 
264 /*
265  * Notes on aio sleep / wake channels.
266  * We currently pick a couple fields within the proc structure that will allow
267  * us sleep channels that currently do not collide with any other kernel routines.
268  * At this time, for binary compatibility reasons, we cannot create new proc fields.
269  */
270 #define AIO_SUSPEND_SLEEP_CHAN  p_aio_activeq
271 #define AIO_CLEANUP_SLEEP_CHAN  p_aio_total_count
272 
273 #define ASSERT_AIO_FROM_PROC(aiop, theproc)     \
274 	if ((aiop)->procp != (theproc)) {       \
275 	        panic("AIO on a proc list that does not belong to that proc."); \
276 	}
277 
278 extern kern_return_t thread_terminate(thread_t);
279 
280 /*
281  *  LOCAL PROTOTYPES
282  */
283 static void             aio_proc_lock(proc_t procp);
284 static void             aio_proc_lock_spin(proc_t procp);
285 static void             aio_proc_unlock(proc_t procp);
286 static lck_mtx_t       *aio_proc_mutex(proc_t procp);
287 static bool             aio_has_active_requests_for_process(proc_t procp);
288 static bool             aio_proc_has_active_requests_for_file(proc_t procp, int fd);
289 static boolean_t        is_already_queued(proc_t procp, user_addr_t aiocbp);
290 
291 static aio_workq_t      aio_entry_workq(aio_workq_entry *entryp);
292 static void             aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
293 static void             aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
294 static void             aio_entry_ref(aio_workq_entry *entryp);
295 static void             aio_entry_unref(aio_workq_entry *entryp);
296 static bool             aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp);
297 static boolean_t        aio_delay_fsync_request(aio_workq_entry *entryp);
298 static void             aio_free_request(aio_workq_entry *entryp);
299 
300 static void             aio_workq_init(aio_workq_t wq);
301 static void             aio_workq_lock_spin(aio_workq_t wq);
302 static void             aio_workq_unlock(aio_workq_t wq);
303 static lck_spin_t      *aio_workq_lock(aio_workq_t wq);
304 
305 static void             aio_work_thread(void *arg, wait_result_t wr);
306 static aio_workq_entry *aio_get_some_work(void);
307 
308 static int              aio_queue_async_request(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
309 static int              aio_validate(proc_t, aio_workq_entry *entryp);
310 
311 static int              do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, aio_entry_flags_t);
312 static void             do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp, aio_entry_flags_t reason);
313 static int              do_aio_fsync(aio_workq_entry *entryp);
314 static int              do_aio_read(aio_workq_entry *entryp);
315 static int              do_aio_write(aio_workq_entry *entryp);
316 static void             do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
317 static void             do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
318 static aio_workq_entry *aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
319 static int              aio_copy_in_list(proc_t, user_addr_t, user_addr_t *, int);
320 
321 static void             workq_aio_prepare(struct proc *p);
322 static bool             workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp);
323 static void             workq_aio_wakeup_thread(proc_t p);
324 static void             workq_aio_wakeup_thread_and_unlock(proc_t p);
325 static int              workq_aio_process_entry(aio_workq_entry *entryp);
326 static bool             workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp);
327 
328 static void             workq_aio_kill_old_threads_call(void *param0, void *param1 __unused);
329 static void             workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr);
330 
331 static void             workq_aio_mark_exiting(proc_t p);
332 static void             workq_aio_exit(proc_t p);
333 
334 #define ASSERT_AIO_PROC_LOCK_OWNED(p)   LCK_MTX_ASSERT(aio_proc_mutex(p), LCK_MTX_ASSERT_OWNED)
335 #define ASSERT_AIO_WORKQ_LOCK_OWNED(q)  LCK_SPIN_ASSERT(aio_workq_lock(q), LCK_ASSERT_OWNED)
336 
337 /*
338  *  EXTERNAL PROTOTYPES
339  */
340 
341 /* in ...bsd/kern/sys_generic.c */
342 extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
343     user_addr_t bufp, user_size_t nbyte,
344     off_t offset, int flags, user_ssize_t *retval);
345 extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
346     user_addr_t bufp, user_size_t nbyte, off_t offset,
347     int flags, user_ssize_t *retval);
348 
349 /*
350  * aio external global variables.
351  */
352 extern int aio_max_requests;                    /* AIO_MAX - configurable */
353 extern int aio_max_requests_per_process;        /* AIO_PROCESS_MAX - configurable */
354 extern int aio_worker_threads;                  /* AIO_THREAD_COUNT - configurable */
355 
356 
357 /*
358  * aio static variables.
359  */
360 static aio_anchor_cb aio_anchor = {
361 	.aio_num_workqs = AIO_NUM_WORK_QUEUES,
362 };
363 os_refgrp_decl(static, aio_refgrp, "aio", NULL);
364 static LCK_GRP_DECLARE(aio_proc_lock_grp, "aio_proc");
365 static LCK_GRP_DECLARE(aio_queue_lock_grp, "aio_queue");
366 static LCK_MTX_DECLARE(aio_proc_mtx, &aio_proc_lock_grp);
367 
368 static struct klist aio_klist;
369 static LCK_GRP_DECLARE(aio_klist_lck_grp, "aio_klist");
370 static LCK_MTX_DECLARE(aio_klist_lock, &aio_klist_lck_grp);
371 
372 static KALLOC_TYPE_DEFINE(aio_workq_zonep, aio_workq_entry, KT_DEFAULT);
373 
374 /* Hash */
375 static aio_workq_t
aio_entry_workq(__unused aio_workq_entry * entryp)376 aio_entry_workq(__unused aio_workq_entry *entryp)
377 {
378 	return &aio_anchor.aio_async_workqs[0];
379 }
380 
381 static void
aio_workq_init(aio_workq_t wq)382 aio_workq_init(aio_workq_t wq)
383 {
384 	TAILQ_INIT(&wq->aioq_entries);
385 	lck_spin_init(&wq->aioq_lock, &aio_queue_lock_grp, LCK_ATTR_NULL);
386 	waitq_init(&wq->aioq_waitq, WQT_QUEUE, SYNC_POLICY_FIFO);
387 }
388 
389 
390 /*
391  * Can be passed a queue which is locked spin.
392  */
/*
 * Unlink an entry from its work queue.  Caller must hold the workq spin
 * lock.  tqe_prev == NULL is the "not on a workq" sentinel used throughout
 * this file, so it is both validated here and re-established on the way out.
 */
static void
aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (entryp->aio_workq_link.tqe_prev == NULL) {
		panic("Trying to remove an entry from a work queue, but it is not on a queue");
	}

	TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
}
405 
/*
 * Append an entry to a legacy work queue.  Caller must hold the workq spin
 * lock.  This path must never run when the new per-process workqueue was
 * selected at boot, hence the panic guard.
 */
static void
aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (bootarg_aio_new_workq) {
		panic("old workq implementation selected with bootarg set");
	}

	TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
}
417 
/* Take the per-process AIO mutex (the proc's p_mlock) as a full mutex. */
static void
aio_proc_lock(proc_t procp)
{
	lck_mtx_lock(aio_proc_mutex(procp));
}
423 
/* Take the per-process AIO mutex in spin mode (caller won't block while held). */
static void
aio_proc_lock_spin(proc_t procp)
{
	lck_mtx_lock_spin(aio_proc_mutex(procp));
}
429 
/*
 * Cheap lock-free check whether any AIO entry exists system-wide.
 * Used as a fast-path bailout before taking locks; relaxed ordering is
 * fine because callers re-validate under the proc lock.
 */
static bool
aio_has_any_work(void)
{
	return os_atomic_load(&aio_anchor.aio_total_count, relaxed) != 0;
}
435 
/*
 * Try to put an entry on the process' active queue, enforcing both the
 * per-process and the global request limits.  Returns false (without side
 * effects) if either limit is hit or the same user aiocb is already queued.
 * On success the global count has been atomically reserved and the
 * per-process count bumped.  Caller must hold the aio proc lock.
 */
static bool
aio_try_proc_insert_active_locked(proc_t procp, aio_workq_entry *entryp)
{
	int old, new;

	ASSERT_AIO_PROC_LOCK_OWNED(procp);

	if (procp->p_aio_total_count >= aio_max_requests_per_process) {
		return false;
	}

	/* duplicate submissions of the same aiocb are rejected */
	if (is_already_queued(procp, entryp->uaiocbp)) {
		return false;
	}

	/* reserve a slot in the global count, or give up at the cap */
	os_atomic_rmw_loop(&aio_anchor.aio_total_count, old, new, relaxed, {
		if (old >= aio_max_requests) {
		        os_atomic_rmw_loop_give_up(return false);
		}
		new = old + 1;
	});

	TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link);
	procp->p_aio_total_count++;
	return true;
}
462 
/*
 * Move an entry from the process' active queue to its done queue.
 * Caller must hold the aio proc lock; counts are unchanged (the entry is
 * still accounted until aio_proc_remove_done_locked()).
 */
static void
aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link);
	TAILQ_INSERT_TAIL(&procp->p_aio_doneq, entryp, aio_proc_link);
}
469 
/*
 * Pull a completed entry off the process' done queue and drop it from both
 * the global and per-process accounting.  Caller must hold the aio proc
 * lock and still owns the proc's reference on the entry (consumed by the
 * caller via aio_entry_unref()).  Both counters panic on underflow since
 * that indicates corrupted accounting.
 */
static void
aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
	entryp->aio_proc_link.tqe_prev = NULL;
	if (os_atomic_dec_orig(&aio_anchor.aio_total_count, relaxed) <= 0) {
		panic("Negative total AIO count!");
	}
	if (procp->p_aio_total_count-- <= 0) {
		panic("proc %p: p_aio_total_count accounting mismatch", procp);
	}
}
482 
/* Release the per-process AIO mutex. */
static void
aio_proc_unlock(proc_t procp)
{
	lck_mtx_unlock(aio_proc_mutex(procp));
}
488 
/* The AIO subsystem reuses the proc's general mutex (p_mlock) as its lock. */
static lck_mtx_t*
aio_proc_mutex(proc_t procp)
{
	return &procp->p_mlock;
}
494 
/* Take an additional reference on an entry (see ownership rules above). */
static void
aio_entry_ref(aio_workq_entry *entryp)
{
	os_ref_retain(&entryp->aio_refcount);
}
500 
/*
 * Drop a reference on an entry; the last release frees it (and any
 * resources it holds) via aio_free_request().
 */
static void
aio_entry_unref(aio_workq_entry *entryp)
{
	if (os_ref_release(&entryp->aio_refcount) == 0) {
		aio_free_request(entryp);
	}
}
508 
/*
 * Try to pull an entry off its work queue before a worker picks it up
 * (the cancellation fast path).  Returns true if this caller removed it.
 *
 * The tqe_prev pointer is first checked unlocked as a cheap hint, then
 * re-checked under the workq lock, since a worker may dequeue the entry
 * concurrently.  With the new per-process workqueue the removal is
 * delegated to workq_aio_entry_remove_locked().
 */
static bool
aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp)
{
	/* Can only be cancelled if it's still on a work queue */
	if (entryp->aio_workq_link.tqe_prev != NULL) {
		aio_workq_t queue;
		if (bootarg_aio_new_workq) {
			return workq_aio_entry_remove_locked(p, entryp);
		}

		/* Will have to check again under the lock */
		queue = aio_entry_workq(entryp);
		aio_workq_lock_spin(queue);
		if (entryp->aio_workq_link.tqe_prev != NULL) {
			aio_workq_remove_entry_locked(queue, entryp);
			aio_workq_unlock(queue);
			return true;
		} else {
			aio_workq_unlock(queue);
		}
	}

	return false;
}
533 
/* Acquire a work queue's spin lock. */
static void
aio_workq_lock_spin(aio_workq_t wq)
{
	lck_spin_lock(aio_workq_lock(wq));
}
539 
/* Release a work queue's spin lock. */
static void
aio_workq_unlock(aio_workq_t wq)
{
	lck_spin_unlock(aio_workq_lock(wq));
}
545 
/* Accessor for a work queue's spin lock (also used by the lock assertions). */
static lck_spin_t*
aio_workq_lock(aio_workq_t wq)
{
	return &wq->aioq_lock;
}
551 
552 /*
553  * aio_cancel - attempt to cancel one or more async IO requests currently
554  * outstanding against file descriptor uap->fd.  If uap->aiocbp is not
555  * NULL then only one specific IO is cancelled (if possible).  If uap->aiocbp
556  * is NULL then all outstanding async IO request for the given file
557  * descriptor are cancelled (if possible).
558  */
int
aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval)
{
	struct user_aiocb my_aiocb;
	int               result;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, 0, 0);

	if (uap->fd) {
		vnode_t vp = NULLVP;
		const char *vname = NULL;

		/* validate the fd refers to a vnode-backed file */
		result = vnode_getfromfd(vfs_context_current(), uap->fd, &vp);
		if (result != 0) {
			result = EBADF;
			goto ExitRoutine;
		}

		vname = vnode_getname(vp);
		/*
		 * The aio_cancel() system call will always return AIO_NOTCANCELED
		 * for a file descriptor associated with a raw disk device.
		 */
		if (vnode_ischr(vp) && vname && !strncmp(vname, "rdisk", 5)) {
			result = 0;
			*retval = AIO_NOTCANCELED;
		}

		if (vname) {
			vnode_putname(vname);
		}
		vnode_put(vp);

		/*
		 * NOTE(review): when the rdisk branch above was not taken, this
		 * reads *retval as left by the syscall dispatcher — presumably
		 * zero-initialized, so the condition is false; confirm against
		 * the uu_rval setup in the syscall path.
		 */
		if (result == 0 && *retval == AIO_NOTCANCELED) {
			goto ExitRoutine;
		}
	}

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		result = 0;
		*retval = AIO_ALLDONE;
		goto ExitRoutine;
	}

	*retval = -1;
	if (uap->aiocbp != USER_ADDR_NULL) {
		/* copy in the user's aiocb in the layout matching its ABI */
		if (proc_is64bit(p)) {
			struct user64_aiocb aiocb64;

			result = copyin(uap->aiocbp, &aiocb64, sizeof(aiocb64));
			if (result == 0) {
				do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
			}
		} else {
			struct user32_aiocb aiocb32;

			result = copyin(uap->aiocbp, &aiocb32, sizeof(aiocb32));
			if (result == 0) {
				do_munge_aiocb_user32_to_user(&aiocb32, &my_aiocb);
			}
		}

		if (result != 0) {
			result = EAGAIN;
			goto ExitRoutine;
		}

		/* NOTE - POSIX standard says a mismatch between the file */
		/* descriptor passed in and the file descriptor embedded in */
		/* the aiocb causes unspecified results.  We return EBADF in */
		/* that situation.  */
		if (uap->fd != my_aiocb.aio_fildes) {
			result = EBADF;
			goto ExitRoutine;
		}
	}

	/* do_aio_cancel_locked() returns AIO_CANCELED/NOTCANCELED/ALLDONE, or -1 */
	aio_proc_lock(p);
	result = do_aio_cancel_locked(p, uap->fd, uap->aiocbp, 0);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	aio_proc_unlock(p);

	if (result != -1) {
		/* a matching request was found: report its disposition via *retval */
		*retval = result;
		result = 0;
		goto ExitRoutine;
	}

	/* no matching request found */
	result = EBADF;

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, result, 0);

	return result;
}
657 
658 
659 /*
660  * _aio_close - internal function used to clean up async IO requests for
661  * a file descriptor that is closing.
662  * THIS MAY BLOCK.
663  */
__private_extern__ void
_aio_close(proc_t p, int fd)
{
	int error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		return;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

	/* cancel all async IO requests on our todo queues for this file descriptor */
	aio_proc_lock(p);
	error = do_aio_cancel_locked(p, fd, USER_ADDR_NULL, AIO_CLOSE_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * and file descriptor on the active async IO queue.  Active requests cannot
		 * be cancelled so we must wait for them to complete.  We will get a special
		 * wake up call on our channel used to sleep for ALL active requests to
		 * complete.  This sleep channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used
		 * when we must wait for all active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

		/* re-check after each wakeup: other fds' completions also wake this channel */
		while (aio_proc_has_active_requests_for_file(p, fd)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0);
		}
	}

	aio_proc_unlock(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
}
704 
705 
706 /*
707  * aio_error - return the error status associated with the async IO
708  * request referred to by uap->aiocbp.  The error status is the errno
 * value that would be set by the corresponding IO request (read, write,
710  * fdatasync, or sync).
711  */
712 int
aio_error(proc_t p,struct aio_error_args * uap,int * retval)713 aio_error(proc_t p, struct aio_error_args *uap, int *retval)
714 {
715 	aio_workq_entry *entryp;
716 	int              error;
717 
718 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_START,
719 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
720 
721 	/* see if there are any aios to check */
722 	if (!aio_has_any_work()) {
723 		return EINVAL;
724 	}
725 
726 	aio_proc_lock(p);
727 
728 	/* look for a match on our queue of async IO requests that have completed */
729 	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
730 		if (entryp->uaiocbp == uap->aiocbp) {
731 			ASSERT_AIO_FROM_PROC(entryp, p);
732 
733 			*retval = entryp->errorval;
734 			error = 0;
735 
736 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val) | DBG_FUNC_NONE,
737 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
738 			goto ExitRoutine;
739 		}
740 	}
741 
742 	/* look for a match on our queue of active async IO requests */
743 	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
744 		if (entryp->uaiocbp == uap->aiocbp) {
745 			ASSERT_AIO_FROM_PROC(entryp, p);
746 			*retval = EINPROGRESS;
747 			error = 0;
748 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq) | DBG_FUNC_NONE,
749 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
750 			goto ExitRoutine;
751 		}
752 	}
753 
754 	error = EINVAL;
755 
756 ExitRoutine:
757 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_END,
758 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
759 	aio_proc_unlock(p);
760 
761 	return error;
762 }
763 
764 
765 /*
766  * aio_fsync - asynchronously force all IO operations associated
767  * with the file indicated by the file descriptor (uap->aiocbp->aio_fildes) and
768  * queued at the time of the call to the synchronized completion state.
769  * NOTE - we do not support op O_DSYNC at this point since we do not support the
770  * fdatasync() call.
771  */
772 int
aio_fsync(proc_t p,struct aio_fsync_args * uap,int * retval)773 aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval)
774 {
775 	aio_entry_flags_t fsync_kind;
776 	int error;
777 
778 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_START,
779 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, uap->op, 0, 0);
780 
781 	*retval = 0;
782 	/* 0 := O_SYNC for binary backward compatibility with Panther */
783 	if (uap->op == O_SYNC || uap->op == 0) {
784 		fsync_kind = AIO_FSYNC;
785 	} else if (uap->op == O_DSYNC) {
786 		fsync_kind = AIO_DSYNC;
787 	} else {
788 		*retval = -1;
789 		error = EINVAL;
790 		goto ExitRoutine;
791 	}
792 
793 	error = aio_queue_async_request(p, uap->aiocbp, fsync_kind);
794 	if (error != 0) {
795 		*retval = -1;
796 	}
797 
798 ExitRoutine:
799 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_END,
800 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
801 
802 	return error;
803 }
804 
805 
806 /* aio_read - asynchronously read uap->aiocbp->aio_nbytes bytes from the
807  * file descriptor (uap->aiocbp->aio_fildes) into the buffer
808  * (uap->aiocbp->aio_buf).
809  */
int
aio_read(proc_t p, struct aio_read_args *uap, int *retval)
{
	int error;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	*retval = 0;

	/* all validation and queuing is shared with the other request kinds */
	error = aio_queue_async_request(p, uap->aiocbp, AIO_READ);
	if (error != 0) {
		*retval = -1;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);

	return error;
}
830 
831 
832 /*
833  * aio_return - return the return status associated with the async IO
834  * request referred to by uap->aiocbp.  The return status is the value
835  * that would be returned by corresponding IO request (read, write,
836  * fdatasync, or sync).  This is where we release kernel resources
837  * held for async IO call associated with the given aiocb pointer.
838  */
int
aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval)
{
	aio_workq_entry *entryp;
	int              error = EINVAL;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	/* See if there are any entries to check */
	if (!aio_has_any_work()) {
		goto ExitRoutine;
	}

	aio_proc_lock(p);
	*retval = 0;

	/* look for a match on our queue of async IO requests that have completed */
	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* Done and valid for aio_return(), pull it off the list */
			aio_proc_remove_done_locked(p, entryp);

			*retval = entryp->returnval;
			error = 0;
			/* unlock before dropping the proc's ref: the free path may block */
			aio_proc_unlock(p);

			/* drop the reference the proc held since creation */
			aio_entry_unref(entryp);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* look for a match on our queue of active async IO requests */
	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* still in flight: resources stay held, caller must retry later */
			error = EINPROGRESS;
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			break;
		}
	}

	aio_proc_unlock(p);

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);

	return error;
}
894 
895 
/*
 * _aio_exec - internal function used to clean up async IO requests for
 * a process that is going away due to exec().  We cancel any async IOs
 * we can and wait for those already active.  We also disable signaling
 * for cancelled or active aio requests that complete.
 * This routine MAY block!
 */
__private_extern__ void
_aio_exec(proc_t p)
{
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

	/* exec teardown is identical to exit teardown; delegate entirely */
	_aio_exit(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
}
914 
915 
/*
 * _aio_exit - internal function used to clean up async IO requests for
 * a process that is terminating (via exit() or exec()).  We cancel any async IOs
 * we can and wait for those already active.  We also disable signaling
 * for cancelled or active aio requests that complete.  This routine MAY block!
 */
__private_extern__ void
_aio_exit(proc_t p)
{
	/* completed entries collected here and freed after dropping the proc lock */
	TAILQ_HEAD(, aio_workq_entry) tofree = TAILQ_HEAD_INITIALIZER(tofree);
	aio_workq_entry *entryp, *tmp;
	int              error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		/* still mark the process as exiting so no new work is accepted */
		workq_aio_mark_exiting(p);
		workq_aio_exit(p);
		return;
	}

	workq_aio_mark_exiting(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

	aio_proc_lock(p);

	/*
	 * cancel async IO requests on the todo work queue and wait for those
	 * already active to complete.
	 */
	error = do_aio_cancel_locked(p, -1, USER_ADDR_NULL, AIO_EXIT_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * on the active async IO queue.  Active requests cannot be cancelled so we
		 * must wait for them to complete.  We will get a special wake up call on
		 * our channel used to sleep for ALL active requests to complete.  This sleep
		 * channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used when we must wait for all
		 * active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

		/* re-check after every wakeup; spurious wakeups are possible */
		while (aio_has_active_requests_for_process(p)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0);
		}
	}

	assert(!aio_has_active_requests_for_process(p));

	/* release all aio resources used by this process */
	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_doneq, aio_proc_link, tmp) {
		ASSERT_AIO_FROM_PROC(entryp, p);

		aio_proc_remove_done_locked(p, entryp);
		TAILQ_INSERT_TAIL(&tofree, entryp, aio_proc_link);
	}

	aio_proc_unlock(p);

	workq_aio_exit(p);

	/* free all the entries outside of the aio_proc_lock() */
	TAILQ_FOREACH_SAFE(entryp, &tofree, aio_proc_link, tmp) {
		/* entry is no longer on any list; clear the stale back-link */
		entryp->aio_proc_link.tqe_prev = NULL;
		aio_entry_unref(entryp);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
}
990 
991 
992 static bool
should_cancel(aio_workq_entry * entryp,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)993 should_cancel(aio_workq_entry *entryp, int fd, user_addr_t aiocbp,
994     aio_entry_flags_t reason)
995 {
996 	if (reason & AIO_EXIT_WAIT) {
997 		/* caller is _aio_exit() */
998 		return true;
999 	}
1000 	if (fd != entryp->aiocb.aio_fildes) {
1001 		/* not the file we're looking for */
1002 		return false;
1003 	}
1004 	/*
1005 	 * aio_cancel() or _aio_close() cancel
1006 	 * everything for a given fd when aiocbp is NULL
1007 	 */
1008 	return aiocbp == USER_ADDR_NULL || entryp->uaiocbp == aiocbp;
1009 }
1010 
/*
 * do_aio_cancel_locked - cancel async IO requests (if possible).  We get called by
 * aio_cancel, close, and at exit.
 * There are three modes of operation: 1) cancel all async IOs for a process -
 * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd
 * is > 0 and aiocbp is NULL 3) cancel one async IO associated with the given
 * aiocbp.
 * Returns -1 if no matches were found, AIO_CANCELED when we cancelled all
 * target async IO requests, AIO_NOTCANCELED if we could not cancel all
 * target async IO requests, and AIO_ALLDONE if all target async IO requests
 * were already complete.
 * WARNING - do not dereference aiocbp in this routine, it may point to user
 * land data that has not been copied in (when called from aio_cancel())
 *
 * Called with proc locked, and returns the same way.  Note that the lock is
 * dropped and reacquired internally around completion processing.
 */
static int
do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp,
    aio_entry_flags_t reason)
{
	/* a NULL aiocbp means we may have to act on more than one entry */
	bool multiple_matches = (aiocbp == USER_ADDR_NULL);
	aio_workq_entry *entryp, *tmp;
	int result;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	/* look for a match on our queue of async todo work. */
again:
	result = -1;
	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_activeq, aio_proc_link, tmp) {
		ASSERT_AIO_FROM_PROC(entryp, p);

		if (!should_cancel(entryp, fd, aiocbp, reason)) {
			continue;
		}

		if (reason) {
			/* mark the entry as blocking close or exit/exec */
			entryp->flags |= reason;
			if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
				panic("Close and exit flags set at the same time");
			}
		}

		/* Can only be cancelled if it's still on a work queue */
		if (aio_entry_try_workq_remove(p, entryp)) {
			/* cancelled requests report ECANCELED/-1 to aio_error/aio_return */
			entryp->errorval = ECANCELED;
			entryp->returnval = -1;

			/* Now it's officially cancelled.  Do the completion */
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    fd, 0, 0);
			/* NB: this drops the proc lock */
			do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);

			aio_proc_lock(p);

			if (multiple_matches) {
				/*
				 * Restart from the head of the proc active queue since it
				 * may have been changed while we were away doing completion
				 * processing.
				 *
				 * Note that if we found an uncancellable AIO before, we will
				 * either find it again or discover that it's been completed,
				 * so resetting the result will not cause us to return success
				 * despite outstanding AIOs.
				 */
				goto again;
			}

			return AIO_CANCELED;
		}

		/*
		 * It's been taken off the active queue already, i.e. is in flight.
		 * All we can do is ask for notification.
		 */
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    fd, 0, 0);

		result = AIO_NOTCANCELED;
		if (!multiple_matches) {
			return result;
		}
	}

	/*
	 * if we didn't find any matches on the todo or active queues then look for a
	 * match on our queue of async IO requests that have completed and if found
	 * return AIO_ALLDONE result.
	 *
	 * Proc AIO lock is still held.
	 */
	if (result == -1) {
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (should_cancel(entryp, fd, aiocbp, reason)) {
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    fd, 0, 0);

				result = AIO_ALLDONE;
				if (!multiple_matches) {
					return result;
				}
			}
		}
	}

	return result;
}
1124 
1125 
/*
 * aio_suspend - suspend the calling thread until at least one of the async
 * IO operations referenced by uap->aiocblist has completed, until a signal
 * interrupts the function, or uap->timeoutp time interval (optional) has
 * passed.
 * Returns 0 if one or more async IOs have completed else -1 and errno is
 * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt
 * woke us up.
 */
int
aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval)
{
	/* this syscall is a pthread cancellation point; check before blocking */
	__pthread_testcancel(1);
	/* the cast relies on the cancel/nocancel arg structs sharing one layout */
	return aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval);
}
1141 
1142 
/*
 * aio_suspend_nocancel - body of aio_suspend() without the cancellation
 * point check.  Copies in the aiocb pointer list and optional timeout,
 * then sleeps until one of the listed requests completes, the timeout
 * expires (EAGAIN), or a signal arrives (EINTR).
 */
int
aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval)
{
	int                     error;
	int                     i;
	uint64_t                abstime;
	struct user_timespec    ts;
	aio_workq_entry        *entryp;
	user_addr_t            *aiocbpp;
	size_t                  aiocbpp_size;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	*retval = -1;
	abstime = 0;
	aiocbpp = NULL;

	/* nothing outstanding anywhere means nothing can ever wake us */
	if (!aio_has_any_work()) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	/* bound nent and guard the allocation size computation against overflow */
	if (uap->nent < 1 || uap->nent > aio_max_requests_per_process ||
	    os_mul_overflow(sizeof(user_addr_t), uap->nent, &aiocbpp_size)) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	if (uap->timeoutp != USER_ADDR_NULL) {
		/* copy in the timeout in the caller's natural word size */
		if (proc_is64bit(p)) {
			struct user64_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = (user_time_t)temp.tv_sec;
				ts.tv_nsec = (user_long_t)temp.tv_nsec;
			}
		} else {
			struct user32_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = temp.tv_sec;
				ts.tv_nsec = temp.tv_nsec;
			}
		}
		/* note: copyin failure is reported as EAGAIN, not EFAULT */
		if (error != 0) {
			error = EAGAIN;
			goto ExitThisRoutine;
		}

		if (ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000) {
			error = EINVAL;
			goto ExitThisRoutine;
		}

		/* convert the relative interval into an absolute deadline */
		nanoseconds_to_absolutetime((uint64_t)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec,
		    &abstime);
		clock_absolutetime_interval_to_deadline(abstime, &abstime);
	}

	aiocbpp = (user_addr_t *)kalloc_data(aiocbpp_size, Z_WAITOK);
	if (aiocbpp == NULL || aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		error = EAGAIN;
		goto ExitThisRoutine;
	}

	/* check list of aio requests to see if any have completed */
check_for_our_aiocbp:
	aio_proc_lock_spin(p);
	for (i = 0; i < uap->nent; i++) {
		user_addr_t     aiocbp;

		/* NULL elements are legal so check for 'em */
		aiocbp = *(aiocbpp + i);
		if (aiocbp == USER_ADDR_NULL) {
			continue;
		}

		/* return immediately if any aio request in the list is done */
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (entryp->uaiocbp == aiocbp) {
				aio_proc_unlock(p);
				*retval = 0;
				error = 0;
				goto ExitThisRoutine;
			}
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend_sleep) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	/*
	 * wait for an async IO to complete or a signal fires or timeout expires.
	 * we return EAGAIN (35) for timeout expiration and EINTR (4) when a signal
	 * interrupts us.  If an async IO completes before a signal fires or our
	 * timeout expires, we get a wakeup call from aio_work_thread().
	 */

	/* PDROP: msleep1 releases the proc lock before returning */
	error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p),
	    PCATCH | PWAIT | PDROP, "aio_suspend", abstime);
	if (error == 0) {
		/*
		 * got our wakeup call from aio_work_thread().
		 * Since we can get a wakeup on this channel from another thread in the
		 * same process we head back up to make sure this is for the correct aiocbp.
		 * If it is the correct aiocbp we will return from where we do the check
		 * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label)
		 * else we will fall out and just sleep again.
		 */
		goto check_for_our_aiocbp;
	} else if (error == EWOULDBLOCK) {
		/* our timeout expired */
		error = EAGAIN;
	} else {
		/* we were interrupted */
		error = EINTR;
	}

ExitThisRoutine:
	if (aiocbpp != NULL) {
		kfree_data(aiocbpp, aiocbpp_size);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->nent, error, 0, 0);

	return error;
}
1273 
1274 
1275 /* aio_write - asynchronously write uap->aiocbp->aio_nbytes bytes to the
1276  * file descriptor (uap->aiocbp->aio_fildes) from the buffer
1277  * (uap->aiocbp->aio_buf).
1278  */
1279 
1280 int
aio_write(proc_t p,struct aio_write_args * uap,int * retval __unused)1281 aio_write(proc_t p, struct aio_write_args *uap, int *retval __unused)
1282 {
1283 	int error;
1284 
1285 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_START,
1286 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
1287 
1288 	error = aio_queue_async_request(p, uap->aiocbp, AIO_WRITE);
1289 
1290 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_END,
1291 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
1292 
1293 	return error;
1294 }
1295 
1296 
1297 static int
aio_copy_in_list(proc_t procp,user_addr_t aiocblist,user_addr_t * aiocbpp,int nent)1298 aio_copy_in_list(proc_t procp, user_addr_t aiocblist, user_addr_t *aiocbpp,
1299     int nent)
1300 {
1301 	int result;
1302 
1303 	/* copyin our aiocb pointers from list */
1304 	result = copyin(aiocblist, aiocbpp,
1305 	    proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
1306 	    : (nent * sizeof(user32_addr_t)));
1307 	if (result) {
1308 		return result;
1309 	}
1310 
1311 	/*
1312 	 * We depend on a list of user_addr_t's so we need to
1313 	 * munge and expand when these pointers came from a
1314 	 * 32-bit process
1315 	 */
1316 	if (!proc_is64bit(procp)) {
1317 		/* copy from last to first to deal with overlap */
1318 		user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
1319 		user_addr_t *my_addrp = aiocbpp + (nent - 1);
1320 
1321 		for (int i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
1322 			*my_addrp = (user_addr_t) (*my_ptrp);
1323 		}
1324 	}
1325 
1326 	return 0;
1327 }
1328 
1329 
1330 static int
aio_copy_in_sigev(proc_t procp,user_addr_t sigp,struct user_sigevent * sigev)1331 aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
1332 {
1333 	int     result = 0;
1334 
1335 	if (sigp == USER_ADDR_NULL) {
1336 		goto out;
1337 	}
1338 
1339 	/*
1340 	 * We need to munge aio_sigevent since it contains pointers.
1341 	 * Since we do not know if sigev_value is an int or a ptr we do
1342 	 * NOT cast the ptr to a user_addr_t.   This means if we send
1343 	 * this info back to user space we need to remember sigev_value
1344 	 * was not expanded for the 32-bit case.
1345 	 *
1346 	 * Notes:	 This does NOT affect us since we don't support
1347 	 *		sigev_value yet in the aio context.
1348 	 */
1349 	if (proc_is64bit(procp)) {
1350 #if __LP64__
1351 		struct user64_sigevent sigevent64;
1352 
1353 		result = copyin(sigp, &sigevent64, sizeof(sigevent64));
1354 		if (result == 0) {
1355 			sigev->sigev_notify = sigevent64.sigev_notify;
1356 			sigev->sigev_signo = sigevent64.sigev_signo;
1357 			sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
1358 			sigev->sigev_notify_function = sigevent64.sigev_notify_function;
1359 			sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
1360 		}
1361 #else
1362 		panic("64bit process on 32bit kernel is not supported");
1363 #endif
1364 	} else {
1365 		struct user32_sigevent sigevent32;
1366 
1367 		result = copyin(sigp, &sigevent32, sizeof(sigevent32));
1368 		if (result == 0) {
1369 			sigev->sigev_notify = sigevent32.sigev_notify;
1370 			sigev->sigev_signo = sigevent32.sigev_signo;
1371 			sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
1372 			sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
1373 			sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
1374 		}
1375 	}
1376 
1377 	if (result != 0) {
1378 		result = EAGAIN;
1379 	}
1380 
1381 out:
1382 	return result;
1383 }
1384 
1385 /*
1386  * validate user_sigevent.  at this point we only support
1387  * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE.  this means
1388  * sigev_value, sigev_notify_function, and sigev_notify_attributes
1389  * are ignored, since SIGEV_THREAD is unsupported.  This is consistent
1390  * with no [RTS] (RalTime Signal) option group support.
1391  */
1392 static int
aio_sigev_validate(const struct user_sigevent * sigev)1393 aio_sigev_validate(const struct user_sigevent *sigev)
1394 {
1395 	switch (sigev->sigev_notify) {
1396 	case SIGEV_SIGNAL:
1397 	{
1398 		int signum;
1399 
1400 		/* make sure we have a valid signal number */
1401 		signum = sigev->sigev_signo;
1402 		if (signum <= 0 || signum >= NSIG ||
1403 		    signum == SIGKILL || signum == SIGSTOP) {
1404 			return EINVAL;
1405 		}
1406 	}
1407 	break;
1408 
1409 	case SIGEV_NONE:
1410 		break;
1411 
1412 	case SIGEV_KEVENT:
1413 		/*
1414 		 * The sigev_signo should contain the descriptor of the kqueue.
1415 		 * Validate that it contains some sane value.
1416 		 */
1417 		if (sigev->sigev_signo <= 0 || sigev->sigev_signo > maxfilesperproc) {
1418 			return EINVAL;
1419 		}
1420 		break;
1421 
1422 	case SIGEV_THREAD:
1423 	/* Unsupported [RTS] */
1424 
1425 	default:
1426 		return EINVAL;
1427 	}
1428 
1429 	return 0;
1430 }
1431 
1432 
/*
 * aio_try_enqueue_work_locked
 *
 * Queue up the entry on the aio asynchronous work queue in priority order
 * based on the relative priority of the request.  We calculate the relative
 * priority using the nice value of the caller and the value
 *
 * Parameters:	procp			Process queueing the I/O
 *		entryp			The work queue entry being queued
 *		leader			The work leader if any
 *
 * Returns:	Whether the enqueue was successful
 *
 * Notes:	This function is used for both lio_listio and aio
 *
 * XXX:		At some point, we may have to consider thread priority
 *		rather than process priority, but we don't maintain the
 *		adjusted priority for threads the POSIX way.
 *
 * Called with proc locked.
 */
static bool
aio_try_enqueue_work_locked(proc_t procp, aio_workq_entry *entryp,
    aio_workq_entry *leader)
{
	ASSERT_AIO_PROC_LOCK_OWNED(procp);

	/* Onto proc queue */
	if (!aio_try_proc_insert_active_locked(procp, entryp)) {
		/* proc refused the entry (e.g. over limit); nothing was changed */
		return false;
	}

	/* link this entry to its lio_listio group leader, if it has one */
	if (leader) {
		aio_entry_ref(leader); /* consumed in do_aio_completion_and_unlock */
		leader->lio_pending++;
		entryp->lio_leader = leader;
	}

	/* And work queue */
	aio_entry_ref(entryp); /* consumed in do_aio_completion_and_unlock */
	if (bootarg_aio_new_workq) {
		/* new-style per-proc workq path */
		if (!workq_aio_entry_add_locked(procp, entryp)) {
			/* enqueue failed; give back the ref we just took */
			(void)os_ref_release(&entryp->aio_refcount);
			return false;
		}
	} else {
		/* legacy global workq path: add and kick one worker thread */
		aio_workq_t queue = aio_entry_workq(entryp);
		aio_workq_lock_spin(queue);
		aio_workq_add_entry_locked(queue, entryp);
		waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
		aio_workq_unlock(queue);
	}

	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, entryp->aiocb.aio_fildes, 0);
	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_END,
	    entryp->aiocb.aio_offset, 0, entryp->aiocb.aio_nbytes, 0, 0);
	return true;
}
1494 
1495 /*
1496  * EV_FLAG0/1 are filter specific flags.
1497  * Repurpose EV_FLAG0 to indicate the kevent is registered from kernel.
1498  */
1499 #define EV_KERNEL    EV_FLAG0
1500 
1501 /* Register a kevent for AIO completion notification. */
1502 static int
aio_register_kevent(proc_t procp,aio_workq_entry * entryp)1503 aio_register_kevent(proc_t procp, aio_workq_entry *entryp)
1504 {
1505 	struct kevent_qos_s kev;
1506 	struct fileproc *fp = NULL;
1507 	kqueue_t kqu;
1508 	int kqfd = entryp->aiocb.aio_sigevent.sigev_signo;
1509 	int error;
1510 
1511 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_START,
1512 	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp),
1513 	    VM_KERNEL_ADDRPERM(entryp->uaiocbp), kqfd, 0);
1514 
1515 	error = fp_get_ftype(procp, kqfd, DTYPE_KQUEUE, EBADF, &fp);
1516 	if (error) {
1517 		goto exit;
1518 	}
1519 
1520 	kqu.kq = (struct kqueue *)fp_get_data(fp);
1521 
1522 	memset(&kev, 0, sizeof(kev));
1523 	kev.ident = (uintptr_t)entryp->uaiocbp;
1524 	kev.filter = EVFILT_AIO;
1525 	/*
1526 	 * Set the EV_FLAG0 to indicate the event is registered from the kernel.
1527 	 * This flag later is checked in filt_aioattach() and to determine if
1528 	 * a kevent is registered from kernel or user-space.
1529 	 */
1530 	kev.flags = EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT | EV_KERNEL;
1531 	kev.udata = entryp->aiocb.aio_sigevent.sigev_value.sival_ptr;
1532 	kev.data = (intptr_t)entryp;
1533 
1534 	error = kevent_register(kqu.kq, &kev, NULL);
1535 	assert((error & FILTER_REGISTER_WAIT) == 0);
1536 
1537 exit:
1538 	if (fp) {
1539 		fp_drop(procp, kqfd, fp, 0);
1540 	}
1541 
1542 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_END,
1543 	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp), error, 0, 0);
1544 
1545 	return error;
1546 }
1547 
/*
 * lio_listio - initiate a list of IO requests.  We process the list of
 * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously
 * (mode == LIO_NOWAIT).
 *
 * The caller gets error and return status for each aiocb in the list
 * via aio_error and aio_return.  We must keep completed requests until
 * released by the aio_return call.
 */
int
lio_listio(proc_t p, struct lio_listio_args *uap, int *retval __unused)
{
	/* entries[i] is non-NULL only while not yet submitted */
	aio_workq_entry         *entries[AIO_LISTIO_MAX] = { };
	user_addr_t              aiocbpp[AIO_LISTIO_MAX];
	struct user_sigevent     aiosigev = { };
	int                      result = 0;
	int                      lio_count = 0;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, uap->mode, 0, 0);

	if (!(uap->mode == LIO_NOWAIT || uap->mode == LIO_WAIT)) {
		result = EINVAL;
		goto ExitRoutine;
	}

	if (uap->nent < 1 || uap->nent > AIO_LISTIO_MAX) {
		result = EINVAL;
		goto ExitRoutine;
	}

	/*
	 * Use sigevent passed in to lio_listio for each of our calls, but
	 * only do completion notification after the last request completes.
	 */
	if (uap->sigp != USER_ADDR_NULL) {
		result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
		if (result) {
			goto ExitRoutine;
		}
		result = aio_sigev_validate(&aiosigev);
		if (result) {
			goto ExitRoutine;
		}
	}

	if (aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		result = EAGAIN;
		goto ExitRoutine;
	}

	/*
	 * allocate/parse all entries
	 */
	for (int i = 0; i < uap->nent; i++) {
		aio_workq_entry *entryp;

		/* NULL elements are legal so check for 'em */
		if (aiocbpp[i] == USER_ADDR_NULL) {
			continue;
		}

		entryp = aio_create_queue_entry(p, aiocbpp[i], AIO_LIO);
		if (entryp == NULL) {
			result = EAGAIN;
			goto ExitRoutine;
		}

		/*
		 * This refcount is cleaned up on exit if the entry
		 * isn't submitted
		 */
		entries[lio_count++] = entryp;
		if ((uap->mode == LIO_NOWAIT) &&
		    (entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)) {
			/* Set signal handler, if any */
			entryp->aiocb.aio_sigevent = aiosigev;
		}
	}

	if (lio_count == 0) {
		/* There's nothing to submit */
		goto ExitRoutine;
	}

	/*
	 * Past this point we're committed and will not bail out
	 *
	 * - keep a reference on the leader for LIO_WAIT
	 * - perform the submissions and optionally wait
	 */

	/* the first entry acts as group leader and tracks lio_pending */
	aio_workq_entry *leader = entries[0];
	if (uap->mode == LIO_WAIT) {
		aio_entry_ref(leader); /* consumed below */
	}

	aio_proc_lock(p);

	for (int i = 0; i < lio_count; i++) {
		if (aio_try_enqueue_work_locked(p, entries[i], leader)) {
			workq_aio_wakeup_thread(p); /* this may drop and reacquire the proc lock */
			/*
			 * For SIGEV_KEVENT, every AIO in the list gets its own kevent
			 * notification upon completion, as opposed to SIGEV_SIGNAL for
			 * which a single notification is delivered when all AIOs have
			 * completed.
			 */
			if ((uap->mode == LIO_NOWAIT) &&
			    (entries[i]->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT)) {
				aio_register_kevent(p, entries[i]);
			}
			entries[i] = NULL; /* the entry was submitted */
		} else {
			result = EAGAIN;
		}
	}

	if (uap->mode == LIO_WAIT && result == 0) {
		leader->flags |= AIO_LIO_WAIT;

		/* sleep until every request in the group has completed */
		while (leader->lio_pending) {
			/* If we were interrupted, fail out (even if all finished) */
			if (msleep(leader, aio_proc_mutex(p),
			    PCATCH | PRIBIO | PSPIN, "lio_listio", 0) != 0) {
				result = EINTR;
				break;
			}
		}

		leader->flags &= ~AIO_LIO_WAIT;
	}

	aio_proc_unlock(p);

	if (uap->mode == LIO_WAIT) {
		aio_entry_unref(leader);
	}

ExitRoutine:
	/* Consume unsubmitted entries */
	for (int i = 0; i < lio_count; i++) {
		if (entries[i]) {
			aio_entry_unref(entries[i]);
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), result, 0, 0, 0);

	return result;
}
1699 
1700 
1701 /*
1702  * aio worker thread.  this is where all the real work gets done.
1703  * we get a wake up call on sleep channel &aio_anchor.aio_async_workq
1704  * after new work is queued up.
1705  */
1706 __attribute__((noreturn))
1707 static void
aio_work_thread(void * arg __unused,wait_result_t wr __unused)1708 aio_work_thread(void *arg __unused, wait_result_t wr __unused)
1709 {
1710 	aio_workq_entry         *entryp;
1711 	int                     error;
1712 	vm_map_switch_context_t switch_ctx;
1713 	struct uthread          *uthreadp = NULL;
1714 	proc_t                  p = NULL;
1715 
1716 	for (;;) {
1717 		/*
1718 		 * returns with the entry ref'ed.
1719 		 * sleeps until work is available.
1720 		 */
1721 		entryp = aio_get_some_work();
1722 		p = entryp->procp;
1723 
1724 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_START,
1725 		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1726 		    entryp->flags, 0, 0);
1727 
1728 		/*
1729 		 * Assume the target's address space identity for the duration
1730 		 * of the IO.  Note: don't need to have the entryp locked,
1731 		 * because the proc and map don't change until it's freed.
1732 		 */
1733 		uthreadp = (struct uthread *) current_uthread();
1734 		assert(get_task_map(proc_task(current_proc())) != entryp->aio_map);
1735 		assert(uthreadp->uu_aio_task == NULL);
1736 
1737 		/*
1738 		 * workq entries at this stage cause _aio_exec() and _aio_exit() to
1739 		 * block until we hit `do_aio_completion_and_unlock()` below,
1740 		 * which means that it is safe to dereference p->task without
1741 		 * holding a lock or taking references.
1742 		 */
1743 		uthreadp->uu_aio_task = proc_task(p);
1744 		switch_ctx = vm_map_switch_to(entryp->aio_map);
1745 
1746 		if ((entryp->flags & AIO_READ) != 0) {
1747 			error = do_aio_read(entryp);
1748 		} else if ((entryp->flags & AIO_WRITE) != 0) {
1749 			error = do_aio_write(entryp);
1750 		} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
1751 			error = do_aio_fsync(entryp);
1752 		} else {
1753 			error = EINVAL;
1754 		}
1755 
1756 		/* Restore old map */
1757 		vm_map_switch_back(switch_ctx);
1758 		uthreadp->uu_aio_task = NULL;
1759 
1760 		/* liberate unused map */
1761 		vm_map_deallocate(entryp->aio_map);
1762 		entryp->aio_map = VM_MAP_NULL;
1763 
1764 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_END,
1765 		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1766 		    entryp->errorval, entryp->returnval, 0);
1767 
1768 		/* we're done with the IO request so pop it off the active queue and */
1769 		/* push it on the done queue */
1770 		aio_proc_lock(p);
1771 		entryp->errorval = error;
1772 		do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
1773 	}
1774 }
1775 
1776 
1777 /*
1778  * aio_get_some_work - get the next async IO request that is ready to be executed.
1779  * aio_fsync complicates matters a bit since we cannot do the fsync until all async
1780  * IO requests at the time the aio_fsync call came in have completed.
1781  * NOTE - AIO_LOCK must be held by caller
1782  */
1783 static aio_workq_entry *
aio_get_some_work(void)1784 aio_get_some_work(void)
1785 {
1786 	aio_workq_entry *entryp = NULL;
1787 	aio_workq_t      queue = NULL;
1788 
1789 	/* Just one queue for the moment.  In the future there will be many. */
1790 	queue = &aio_anchor.aio_async_workqs[0];
1791 	aio_workq_lock_spin(queue);
1792 
1793 	/*
1794 	 * Hold the queue lock.
1795 	 *
1796 	 * pop some work off the work queue and add to our active queue
1797 	 * Always start with the queue lock held.
1798 	 */
1799 	while ((entryp = TAILQ_FIRST(&queue->aioq_entries))) {
1800 		/*
1801 		 * Pull of of work queue.  Once it's off, it can't be cancelled,
1802 		 * so we can take our ref once we drop the queue lock.
1803 		 */
1804 
1805 		aio_workq_remove_entry_locked(queue, entryp);
1806 
1807 		aio_workq_unlock(queue);
1808 
1809 		/*
1810 		 * Check if it's an fsync that must be delayed.  No need to lock the entry;
1811 		 * that flag would have been set at initialization.
1812 		 */
1813 		if ((entryp->flags & AIO_FSYNC) != 0) {
1814 			/*
1815 			 * Check for unfinished operations on the same file
1816 			 * in this proc's queue.
1817 			 */
1818 			aio_proc_lock_spin(entryp->procp);
1819 			if (aio_delay_fsync_request(entryp)) {
1820 				/* It needs to be delayed.  Put it back on the end of the work queue */
1821 				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
1822 				    VM_KERNEL_ADDRPERM(entryp->procp),
1823 				    VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);
1824 
1825 				aio_proc_unlock(entryp->procp);
1826 
1827 				aio_workq_lock_spin(queue);
1828 				aio_workq_add_entry_locked(queue, entryp);
1829 				continue;
1830 			}
1831 			aio_proc_unlock(entryp->procp);
1832 		}
1833 
1834 		return entryp;
1835 	}
1836 
1837 	/* We will wake up when someone enqueues something */
1838 	waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
1839 	aio_workq_unlock(queue);
1840 	thread_block(aio_work_thread);
1841 
1842 	__builtin_unreachable();
1843 }
1844 
1845 /*
1846  * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed.
1847  * A big, simple hammer: only send it off if it's the most recently filed IO which has
1848  * not been completed.
1849  */
1850 static boolean_t
aio_delay_fsync_request(aio_workq_entry * entryp)1851 aio_delay_fsync_request(aio_workq_entry *entryp)
1852 {
1853 	if (proc_in_teardown(entryp->procp)) {
1854 		/*
1855 		 * we can't delay FSYNCS when in teardown as it will confuse _aio_exit,
1856 		 * if it was dequeued, then we must now commit to it
1857 		 */
1858 		return FALSE;
1859 	}
1860 
1861 	if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
1862 		return FALSE;
1863 	}
1864 
1865 	return TRUE;
1866 }
1867 
/*
 * aio_create_queue_entry - allocate and initialize a work queue entry for the
 * given user aiocb.  Copies the aiocb in (munging 32/64-bit layouts to the
 * common user_aiocb), validates it, and takes references on the issuing
 * thread, its credential and (legacy worker-thread path only) the process's
 * address-space map.  Returns NULL on copyin or validation failure.
 */
static aio_workq_entry *
aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;

	entryp = zalloc_flags(aio_workq_zonep, Z_WAITOK | Z_ZERO);
	entryp->procp = procp;
	entryp->uaiocbp = aiocbp;
	entryp->flags = flags;
	/* consumed in aio_return or _aio_exit */
	os_ref_init(&entryp->aio_refcount, &aio_refgrp);

	if (proc_is64bit(procp)) {
		struct user64_aiocb aiocb64;

		if (copyin(aiocbp, &aiocb64, sizeof(aiocb64)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
	} else {
		struct user32_aiocb aiocb32;

		if (copyin(aiocbp, &aiocb32, sizeof(aiocb32)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user32_to_user(&aiocb32, &entryp->aiocb);
	}

	/* do some more validation on the aiocb and embedded file descriptor */
	if (aio_validate(procp, entryp) != 0) {
		goto error_exit;
	}

	/* get a reference on the current_thread, which is passed in vfs_context. */
	entryp->context = *vfs_context_current();
	thread_reference(entryp->context.vc_thread);
	kauth_cred_ref(entryp->context.vc_ucred);

	if (bootarg_aio_new_workq) {
		/* per-process workq threads run in the proc's own map: no map ref needed */
		entryp->aio_map = VM_MAP_NULL;
		workq_aio_prepare(procp);
	} else {
		/* get a reference to the user land map in order to keep it around */
		entryp->aio_map = get_task_map(proc_task(procp));
		vm_map_reference(entryp->aio_map);
	}
	return entryp;

error_exit:
	/* no thread/cred/map references were taken on these paths: plain zfree is enough */
	zfree(aio_workq_zonep, entryp);
	return NULL;
}
1920 
1921 
/*
 * aio_queue_async_request - queue up an async IO request on our work queue then
 * wake up one of our worker threads to do the actual work.  We get a reference
 * to our caller's user land map in order to keep it around while we are
 * processing the request.
 * Returns 0 on success or EAGAIN when the entry cannot be created or queued.
 */
static int
aio_queue_async_request(proc_t procp, user_addr_t aiocbp,
    aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;
	int              result;

	/* allocates, copies in, and validates the aiocb; NULL maps to EAGAIN */
	entryp = aio_create_queue_entry(procp, aiocbp, flags);
	if (entryp == NULL) {
		result = EAGAIN;
		goto error_noalloc;
	}

	aio_proc_lock(procp);
	if (!aio_try_enqueue_work_locked(procp, entryp, NULL)) {
		result = EAGAIN;
		goto error_exit;
	}

	/* SIGEV_KEVENT completion delivery requires registering a knote up front */
	if ((entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) &&
	    (aio_register_kevent(procp, entryp) != 0)) {
		/*
		 * NOTE(review): at this point the entry WAS successfully enqueued by
		 * aio_try_enqueue_work_locked(), yet this path falls into error_exit
		 * and aio_free_request(), which panics on still-enqueued entries —
		 * verify aio_register_kevent() failure unlinks the entry first.
		 */
		result = EAGAIN;
		goto error_exit;
	}
	/* drops the proc's aio lock */
	workq_aio_wakeup_thread_and_unlock(procp);
	return 0;

error_exit:
	/*
	 * This entry has not been queued up so no worries about
	 * unlocked state and aio_map
	 */
	aio_proc_unlock(procp);
	aio_free_request(entryp);
error_noalloc:
	return result;
}
1965 
1966 
/*
 * aio_free_request - remove our reference on the user land map and
 * free the work queue entry resources.  The entry is off all lists
 * and has zero refcount, so no one can have a pointer to it.
 */
static void
aio_free_request(aio_workq_entry *entryp)
{
	/* sanity: freeing an entry still linked on a proc or workq list is fatal */
	if (entryp->aio_proc_link.tqe_prev || entryp->aio_workq_link.tqe_prev) {
		panic("aio_workq_entry %p being freed while still enqueued", entryp);
	}

	/* remove our reference to the user land map. */
	if (VM_MAP_NULL != entryp->aio_map) {
		vm_map_deallocate(entryp->aio_map);
	}

	/* remove our reference to thread which enqueued the request */
	if (entryp->context.vc_thread) {
		thread_deallocate(entryp->context.vc_thread);
	}
	/* drop the credential reference taken in aio_create_queue_entry */
	kauth_cred_unref(&entryp->context.vc_ucred);

	zfree(aio_workq_zonep, entryp);
}
1992 
1993 
/*
 * aio_validate
 *
 * validate the aiocb passed in by one of the aio syscalls.
 * Resolves lio_listio opcodes into AIO_READ/AIO_WRITE flags, range-checks
 * the read/write parameters, validates the sigevent, and verifies the file
 * descriptor refers to a vnode opened with the required access mode.
 * Returns 0 on success or an errno (EINVAL, EBADF, ESPIPE, ...).
 */
static int
aio_validate(proc_t p, aio_workq_entry *entryp)
{
	struct fileproc *fp;
	int              flag;
	int              result;

	result = 0;

	if ((entryp->flags & AIO_LIO) != 0) {
		/* translate the lio_listio opcode into the internal entry flags */
		if (entryp->aiocb.aio_lio_opcode == LIO_READ) {
			entryp->flags |= AIO_READ;
		} else if (entryp->aiocb.aio_lio_opcode == LIO_WRITE) {
			entryp->flags |= AIO_WRITE;
		} else if (entryp->aiocb.aio_lio_opcode == LIO_NOP) {
			/* nothing will be done for a no-op, so skip fd validation entirely */
			return 0;
		} else {
			return EINVAL;
		}
	}

	/* writes and fsyncs need write access; everything else needs read access */
	flag = FREAD;
	if ((entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0) {
		flag = FWRITE;
	}

	if ((entryp->flags & (AIO_READ | AIO_WRITE)) != 0) {
		if (entryp->aiocb.aio_nbytes > INT_MAX ||
		    entryp->aiocb.aio_buf == USER_ADDR_NULL ||
		    entryp->aiocb.aio_offset < 0) {
			return EINVAL;
		}
	}

	result = aio_sigev_validate(&entryp->aiocb.aio_sigevent);
	if (result) {
		return result;
	}

	/* validate the file descriptor and that the file was opened
	 * for the appropriate read / write access.
	 */
	proc_fdlock(p);

	fp = fp_get_noref_locked(p, entryp->aiocb.aio_fildes);
	if (fp == NULL) {
		result = EBADF;
	} else if ((fp->fp_glob->fg_flag & flag) == 0) {
		/* we don't have read or write access */
		result = EBADF;
	} else if (FILEGLOB_DTYPE(fp->fp_glob) != DTYPE_VNODE) {
		/* this is not a file */
		result = ESPIPE;
	} else {
		/* mark the fileglob so close paths know AIO was issued against it */
		fp->fp_flags |= FP_AIOISSUED;
	}

	proc_fdunlock(p);

	return result;
}
2060 
/*
 * do_aio_completion_and_unlock.  Handle async IO completion.
 *
 * Called with the proc's aio lock held; always drops it.  Moves the entry
 * to the done queue, settles lio_listio group accounting, wakes any
 * exit/close waiters, and delivers the requested SIGEV notification
 * (signal or kevent) after the lock is released.  Consumes the entry's
 * enqueue reference (and the lio leader's, if any).
 */
static void
do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp,
    aio_entry_flags_t reason)
{
	aio_workq_entry *leader = entryp->lio_leader;
	int              lio_pending = 0;
	bool             do_signal, do_kevent;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	aio_proc_move_done_locked(p, entryp);
	entryp->flags |= reason;	/* AIO_COMPLETED or AIO_CANCELLED */

	if (leader) {
		/* one less outstanding member of this lio_listio group */
		lio_pending = --leader->lio_pending;
		if (lio_pending < 0) {
			panic("lio_pending accounting mistake");
		}
		if (lio_pending == 0 && (leader->flags & AIO_LIO_WAIT)) {
			/* last member done: release a LIO_WAIT-blocked lio_listio caller */
			wakeup(leader);
		}
		entryp->lio_leader = NULL; /* no dangling pointers please */
	}

	/*
	 * need to handle case where a process is trying to exit, exec, or
	 * close and is currently waiting for active aio requests to complete.
	 * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any
	 * other requests in the active queue for this process.  If there are
	 * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.
	 * If there are some still active then do nothing - we only want to
	 * wakeup when all active aio requests for the process are complete.
	 */
	do_signal = do_kevent = false;
	if (__improbable(entryp->flags & AIO_EXIT_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_has_active_requests_for_process(p)) {
			/*
			 * no active aio requests for this process, continue exiting.  In this
			 * case, there should be no one else waiting on the proc in AIO...
			 */
			wakeup_one((caddr_t)&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		/*
		 * If this was the last request in the group, or not part of
		 * a group, and that a signal is desired, send one.
		 */
		do_signal = (lio_pending == 0);
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		/*
		 * For SIGEV_KEVENT, every AIO (even it is part of a group) would get
		 * a kevent notification.
		 */
		do_kevent = true;
	}

	if (__improbable(entryp->flags & AIO_CLOSE_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_proc_has_active_requests_for_file(p, entryp->aiocb.aio_fildes)) {
			/* Can't wakeup_one(); multiple closes might be in progress. */
			wakeup(&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	}

	/* notifications below must not be delivered while holding the proc aio lock */
	aio_proc_unlock(p);

	if (do_signal) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		psignal(p, entryp->aiocb.aio_sigevent.sigev_signo);
	} else if (do_kevent) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_kevent) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		/* We only support one event type for either completed/cancelled AIO. */
		lck_mtx_lock(&aio_klist_lock);
		KNOTE(&aio_klist, 1);
		lck_mtx_unlock(&aio_klist_lock);
	}

	/*
	 * A thread in aio_suspend() wants to known about completed IOs.  If it checked
	 * the done list before we moved our AIO there, then it already asserted its wait,
	 * and we can wake it up without holding the lock.  If it checked the list after
	 * we did our move, then it already has seen the AIO that we moved.  Herego, we
	 * can do our wakeup without holding the lock.
	 */
	wakeup(&p->AIO_SUSPEND_SLEEP_CHAN);
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

	aio_entry_unref(entryp); /* see aio_try_enqueue_work_locked */
	if (leader) {
		aio_entry_unref(leader); /* see lio_listio */
	}
}
2178 
2179 
2180 /*
2181  * do_aio_read
2182  */
2183 static int
do_aio_read(aio_workq_entry * entryp)2184 do_aio_read(aio_workq_entry *entryp)
2185 {
2186 	struct proc     *p = entryp->procp;
2187 	struct fileproc *fp;
2188 	int error;
2189 
2190 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2191 		return error;
2192 	}
2193 
2194 	if (fp->fp_glob->fg_flag & FREAD) {
2195 		error = dofileread(&entryp->context, fp,
2196 		    entryp->aiocb.aio_buf,
2197 		    entryp->aiocb.aio_nbytes,
2198 		    entryp->aiocb.aio_offset, FOF_OFFSET,
2199 		    &entryp->returnval);
2200 	} else {
2201 		error = EBADF;
2202 	}
2203 
2204 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2205 	return error;
2206 }
2207 
2208 
2209 /*
2210  * do_aio_write
2211  */
2212 static int
do_aio_write(aio_workq_entry * entryp)2213 do_aio_write(aio_workq_entry *entryp)
2214 {
2215 	struct proc     *p = entryp->procp;
2216 	struct fileproc *fp;
2217 	int error;
2218 
2219 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2220 		return error;
2221 	}
2222 
2223 	if (fp->fp_glob->fg_flag & FWRITE) {
2224 		int flags = 0;
2225 
2226 		if ((fp->fp_glob->fg_flag & O_APPEND) == 0) {
2227 			flags |= FOF_OFFSET;
2228 		}
2229 
2230 		/* NB: tell dofilewrite the offset, and to use the proc cred */
2231 		error = dofilewrite(&entryp->context,
2232 		    fp,
2233 		    entryp->aiocb.aio_buf,
2234 		    entryp->aiocb.aio_nbytes,
2235 		    entryp->aiocb.aio_offset,
2236 		    flags,
2237 		    &entryp->returnval);
2238 	} else {
2239 		error = EBADF;
2240 	}
2241 
2242 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2243 	return error;
2244 }
2245 
2246 
2247 /*
2248  * aio_has_active_requests_for_process - return whether the process has active
2249  * requests pending.
2250  */
2251 static bool
aio_has_active_requests_for_process(proc_t procp)2252 aio_has_active_requests_for_process(proc_t procp)
2253 {
2254 	return !TAILQ_EMPTY(&procp->p_aio_activeq);
2255 }
2256 
2257 /*
2258  * Called with the proc locked.
2259  */
2260 static bool
aio_proc_has_active_requests_for_file(proc_t procp,int fd)2261 aio_proc_has_active_requests_for_file(proc_t procp, int fd)
2262 {
2263 	aio_workq_entry *entryp;
2264 
2265 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2266 		if (entryp->aiocb.aio_fildes == fd) {
2267 			return true;
2268 		}
2269 	}
2270 
2271 	return false;
2272 }
2273 
2274 
/*
 * do_aio_fsync - service one queued aio_fsync/aio_dsync request by issuing
 * a VNOP_FSYNC on the underlying vnode.  Returns 0 or an errno; on failure
 * entryp->returnval is set to -1.
 */
static int
do_aio_fsync(aio_workq_entry *entryp)
{
	struct proc            *p = entryp->procp;
	struct vnode           *vp;
	struct fileproc        *fp;
	int                     sync_flag;
	int                     error;

	/*
	 * We are never called unless either AIO_FSYNC or AIO_DSYNC are set.
	 *
	 * If AIO_DSYNC is set, we can tell the lower layers that it is OK
	 * to mark for update the metadata not strictly necessary for data
	 * retrieval, rather than forcing it to disk.
	 *
	 * If AIO_FSYNC is set, we have to also wait for metadata not really
	 * necessary to data retrieval are committed to stable storage (e.g.
	 * atime, mtime, ctime, etc.).
	 *
	 * Metadata necessary for data retrieval must be committed to stable
	 * storage in either case (file length, etc.).
	 */
	if (entryp->flags & AIO_FSYNC) {
		sync_flag = MNT_WAIT;
	} else {
		sync_flag = MNT_DWAIT;
	}

	/* only vnode-backed descriptors can be synced; anything else is ENOTSUP */
	error = fp_get_ftype(p, entryp->aiocb.aio_fildes, DTYPE_VNODE, ENOTSUP, &fp);
	if (error != 0) {
		entryp->returnval = -1;
		return error;
	}
	vp = fp_get_data(fp);

	/* take an iocount on the vnode so it cannot be reclaimed during the fsync */
	if ((error = vnode_getwithref(vp)) == 0) {
		error = VNOP_FSYNC(vp, sync_flag, &entryp->context);

		(void)vnode_put(vp);
	} else {
		entryp->returnval = -1;
	}

	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
	return error;
}
2325 
2326 
2327 /*
2328  * is_already_queued - runs through our queues to see if the given
2329  * aiocbp / process is there.  Returns TRUE if there is a match
2330  * on any of our aio queues.
2331  *
2332  * Called with proc aio lock held (can be held spin)
2333  */
2334 static boolean_t
is_already_queued(proc_t procp,user_addr_t aiocbp)2335 is_already_queued(proc_t procp, user_addr_t aiocbp)
2336 {
2337 	aio_workq_entry *entryp;
2338 	boolean_t        result;
2339 
2340 	result = FALSE;
2341 
2342 	/* look for matches on our queue of async IO requests that have completed */
2343 	TAILQ_FOREACH(entryp, &procp->p_aio_doneq, aio_proc_link) {
2344 		if (aiocbp == entryp->uaiocbp) {
2345 			result = TRUE;
2346 			goto ExitThisRoutine;
2347 		}
2348 	}
2349 
2350 	/* look for matches on our queue of active async IO requests */
2351 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2352 		if (aiocbp == entryp->uaiocbp) {
2353 			result = TRUE;
2354 			goto ExitThisRoutine;
2355 		}
2356 	}
2357 
2358 ExitThisRoutine:
2359 	return result;
2360 }
2361 
2362 
2363 /*
2364  * aio initialization
2365  */
2366 __private_extern__ void
aio_init(void)2367 aio_init(void)
2368 {
2369 	for (int i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
2370 		aio_workq_init(&aio_anchor.aio_async_workqs[i]);
2371 	}
2372 
2373 	if (bootarg_aio_new_workq) {
2374 		printf("New aio workqueue implementation selected\n");
2375 	} else {
2376 		_aio_create_worker_threads(aio_worker_threads);
2377 	}
2378 
2379 	klist_init(&aio_klist);
2380 
2381 	clock_interval_to_absolutetime_interval(aio_wq_reduce_pool_window.usecs,
2382 	    NSEC_PER_USEC, &aio_wq_reduce_pool_window.abstime);
2383 }
2384 
2385 
2386 /*
2387  * aio worker threads created here.
2388  */
2389 __private_extern__ void
_aio_create_worker_threads(int num)2390 _aio_create_worker_threads(int num)
2391 {
2392 	int i;
2393 
2394 	/* create some worker threads to handle the async IO requests */
2395 	for (i = 0; i < num; i++) {
2396 		thread_t                myThread;
2397 
2398 		if (KERN_SUCCESS != kernel_thread_start(aio_work_thread, NULL, &myThread)) {
2399 			printf("%s - failed to create a work thread \n", __FUNCTION__);
2400 		} else {
2401 			thread_deallocate(myThread);
2402 		}
2403 	}
2404 }
2405 
2406 /*
2407  * Return the current activation utask
2408  */
2409 task_t
get_aiotask(void)2410 get_aiotask(void)
2411 {
2412 	return current_uthread()->uu_aio_task;
2413 }
2414 
2415 
2416 /*
2417  * In the case of an aiocb from a
2418  * 32-bit process we need to expand some longs and pointers to the correct
2419  * sizes in order to let downstream code always work on the same type of
2420  * aiocb (in our case that is a user_aiocb)
2421  */
2422 static void
do_munge_aiocb_user32_to_user(struct user32_aiocb * my_aiocbp,struct user_aiocb * the_user_aiocbp)2423 do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
2424 {
2425 	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
2426 	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
2427 	the_user_aiocbp->aio_buf = CAST_USER_ADDR_T(my_aiocbp->aio_buf);
2428 	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
2429 	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
2430 	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;
2431 
2432 	/* special case here.  since we do not know if sigev_value is an */
2433 	/* int or a ptr we do NOT cast the ptr to a user_addr_t.   This  */
2434 	/* means if we send this info back to user space we need to remember */
2435 	/* sigev_value was not expanded for the 32-bit case.  */
2436 	/* NOTE - this does NOT affect us since we don't support sigev_value */
2437 	/* yet in the aio context.  */
2438 	//LP64
2439 	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
2440 	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
2441 	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
2442 	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
2443 	the_user_aiocbp->aio_sigevent.sigev_notify_function =
2444 	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_function);
2445 	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
2446 	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
2447 }
2448 
2449 /* Similar for 64-bit user process, so that we don't need to satisfy
2450  * the alignment constraints of the original user64_aiocb
2451  */
2452 #if !__LP64__
2453 __dead2
2454 #endif
2455 static void
do_munge_aiocb_user64_to_user(struct user64_aiocb * my_aiocbp,struct user_aiocb * the_user_aiocbp)2456 do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
2457 {
2458 #if __LP64__
2459 	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
2460 	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
2461 	the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
2462 	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
2463 	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
2464 	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;
2465 
2466 	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
2467 	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
2468 	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
2469 	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
2470 	the_user_aiocbp->aio_sigevent.sigev_notify_function =
2471 	    my_aiocbp->aio_sigevent.sigev_notify_function;
2472 	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
2473 	    my_aiocbp->aio_sigevent.sigev_notify_attributes;
2474 #else
2475 #pragma unused(my_aiocbp, the_user_aiocbp)
2476 	panic("64bit process on 32bit kernel is not supported");
2477 #endif
2478 }
2479 
2480 
/*
 * filt_aioattach - attach a kernel-originated knote to the global AIO klist.
 * 'kev->data' carries the aio_workq_entry set up by the registration path.
 * Returns EPERM for user-space registration attempts.
 */
static int
filt_aioattach(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp = (aio_workq_entry *)kev->data;

	/* Don't allow kevent registration from the user-space. */
	if ((kev->flags & EV_KERNEL) == 0) {
		return EPERM;
	}

	kev->flags &= ~EV_KERNEL;
	/* Clear the 'kn_fflags' state after the knote has been processed. */
	kn->kn_flags |= EV_CLEAR;

	/* Associate the knote with the AIO work. */
	knote_kn_hook_set_raw(kn, (void *)entryp);

	lck_mtx_lock(&aio_klist_lock);
	KNOTE_ATTACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);

	return 0;
}
2504 
/*
 * filt_aiodetach - remove the knote from the global AIO klist so future
 * completions no longer reference it.
 */
static void
filt_aiodetach(struct knote *kn)
{
	lck_mtx_lock(&aio_klist_lock);
	KNOTE_DETACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);
}
2512 
2513 /*
2514  * The 'f_event' is called with 'aio_klist_lock' held when KNOTE() was called
2515  * in do_aio_completion_and_unlock().
2516  */
2517 static int
filt_aioevent(struct knote * kn,long hint)2518 filt_aioevent(struct knote *kn, long hint)
2519 {
2520 	aio_workq_entry *entryp;
2521 	int activate = 0;
2522 
2523 	/*
2524 	 * The 'f_event' and 'f_process' can run concurrently so it is possible
2525 	 * the aio_workq_entry has been detached from the knote when the
2526 	 * filt_aioprocess() was called earlier. In this case, we will skip
2527 	 * activating the event.
2528 	 */
2529 	entryp = knote_kn_hook_get_raw(kn);
2530 	if (__improbable(entryp == NULL)) {
2531 		goto out;
2532 	}
2533 
2534 	/* We can only activate the filter if the AIO work has completed. */
2535 	if (entryp->flags & AIO_COMPLETED) {
2536 		kn->kn_fflags |= hint;
2537 		activate = FILTER_ACTIVE;
2538 	}
2539 
2540 out:
2541 	return activate;
2542 }
2543 
/*
 * filt_aiotouch - AIO knotes are kernel-internal and never re-registered
 * from user-space, so 'f_touch' must never run; panic() does not return.
 */
static int
filt_aiotouch(struct knote *kn, struct kevent_qos_s *kev)
{
	panic("%s: kn %p kev %p (NOT EXPECTED TO BE CALLED!!)", __func__, kn, kev);
}
2549 
/*
 * filt_aioprocess - deliver a completed AIO's status back to user-space,
 * detach the entry from the knote, and consume the done-queue reference so
 * each completion is reported at most once.
 */
static int
filt_aioprocess(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp;
	proc_t p;
	int res = 0;

	entryp = knote_kn_hook_get_raw(kn);
	assert(entryp);
	p = entryp->procp;

	/* serialize against filt_aioevent() firing from KNOTE() */
	lck_mtx_lock(&aio_klist_lock);
	if (kn->kn_fflags) {
		/* Propagate the error status and return value back to the user. */
		kn->kn_ext[0] = entryp->errorval;
		kn->kn_ext[1] = entryp->returnval;
		knote_fill_kevent(kn, kev, 0);
		/* detach so a concurrent f_event sees NULL and skips this knote */
		knote_kn_hook_set_raw(kn, NULL);

		aio_proc_lock(p);
		aio_proc_remove_done_locked(p, entryp);
		aio_proc_unlock(p);
		aio_entry_unref(entryp);

		res = FILTER_ACTIVE;
	}
	lck_mtx_unlock(&aio_klist_lock);

	return res;
}
2580 
/*
 * Filter operations backing the kernel-internal AIO completion kevents
 * (attached in filt_aioattach, fired from do_aio_completion_and_unlock).
 */
SECURITY_READ_ONLY_EARLY(struct filterops) aio_filtops = {
	.f_isfd = 0,
	.f_attach = filt_aioattach,
	.f_detach = filt_aiodetach,
	.f_event = filt_aioevent,
	.f_touch = filt_aiotouch,
	.f_process = filt_aioprocess,
};
2589 
2590 #pragma mark per process aio workqueue
2591 
/*
 * The per process workq threads call this function to handle the aio request. The threads
 * belong to the same process so we don't need to change the vm maps as we would for kernel
 * threads.
 *
 * Performs the IO (or requeues a not-yet-eligible fsync) and then runs
 * completion, which consumes the entry's enqueue reference.  Always
 * returns 0.
 */
static int
workq_aio_process_entry(aio_workq_entry *entryp)
{
	proc_t p = entryp->procp;
	int error;

	/* must run on a thread of the target proc, but never the issuing thread */
	assert(current_proc() == p && current_thread() != vfs_context_thread(&entryp->context));

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, 0, 0);

	if ((entryp->flags & AIO_READ) != 0) {
		error = do_aio_read(entryp);
	} else if ((entryp->flags & AIO_WRITE) != 0) {
		error = do_aio_write(entryp);
	} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(p);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed.  Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    0, 0, 0);

				/* The references on this entry haven't been consumed */
				if (!workq_aio_entry_add_locked(p, entryp)) {
					/*
					 * Requeue refused (NOTE(review): presumably the per-proc
					 * workqueue is shutting down — confirm); fail as cancelled.
					 */
					entryp->errorval = ECANCELED;
					entryp->returnval = -1;

					/* Now it's officially cancelled.  Do the completion */
					KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
					    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
					    entryp->aiocb.aio_fildes, 0, 0);

					do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
				} else {
					workq_aio_wakeup_thread_and_unlock(p);
				}
				return 0;
			}
			aio_proc_unlock(entryp->procp);
		}
		error = do_aio_fsync(entryp);
	} else {
		error = EINVAL;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->errorval, entryp->returnval, 0);

	/* we're done with the IO request so pop it off the active queue and */
	/* push it on the done queue */
	aio_proc_lock(p);
	entryp->errorval = error;
	do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
	return 0;
}
2660 
2661 /*
2662  * The functions below implement a workqueue for aio which is taken from the
2663  * workqueue implementation for libdispatch/pthreads. They are stripped down versions
2664  * of the corresponding functions for libdispatch/pthreads.
2665  */
2666 
2667 static int
2668 aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
2669 {
2670 #pragma unused(arg2)
2671 	struct aio_workq_usec_var *v = arg1;
2672 	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
2673 	if (error || !req->newptr) {
2674 		return error;
2675 	}
2676 	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
2677 	    &v->abstime);
2678 	return 0;
2679 }
2680 
2681 #pragma mark wq_flags
2682 
2683 #define AIO_WQ_DEAD 0x1000
2684 
/* Relaxed load of the workqueue flag word; callers impose their own ordering. */
static inline uint32_t
_wa_flags(workq_aio_t wq_aio)
{
	return os_atomic_load(&wq_aio->wa_flags, relaxed);
}
2690 
2691 static inline bool
_wq_exiting(workq_aio_t wq_aio)2692 _wq_exiting(workq_aio_t wq_aio)
2693 {
2694 	return _wa_flags(wq_aio) & WQ_EXITING;
2695 }
2696 
2697 static inline bool
_wq_dead(workq_aio_t wq_aio)2698 _wq_dead(workq_aio_t wq_aio)
2699 {
2700 	return _wa_flags(wq_aio) & AIO_WQ_DEAD;
2701 }
2702 
2703 #define AIO_WQPTR_IS_INITING_VALUE ((workq_aio_t)~(uintptr_t)0)
2704 
/* Raw load of the proc's AIO workqueue pointer (may be the INITING sentinel). */
static workq_aio_t
proc_get_aio_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_aio_wqptr, relaxed);
}
2710 
2711 static workq_aio_t
proc_get_aio_wqptr(struct proc * p)2712 proc_get_aio_wqptr(struct proc *p)
2713 {
2714 	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);
2715 	return wq_aio == AIO_WQPTR_IS_INITING_VALUE ? NULL : wq_aio;
2716 }
2717 
/*
 * Publish the proc's AIO workqueue pointer.  If the previous value was the
 * INITING sentinel, wake any threads parked in proc_init_aio_wqptr_or_wait().
 */
static void
proc_set_aio_wqptr(struct proc *p, workq_aio_t wq_aio)
{
	/* release ordering pairs with readers loading p_aio_wqptr */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, wq_aio, release);
	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_aio_wqptr);
		proc_unlock(p);
	}
}
2728 
/*
 * proc_init_aio_wqptr_or_wait - race to become the initializer of the proc's
 * AIO workqueue.  Returns true when the caller won the race: p_aio_wqptr is
 * set to the INITING sentinel and the caller must complete initialization
 * via proc_set_aio_wqptr().  Returns false otherwise, after waiting out any
 * in-flight initializer, so the caller can re-check the pointer.
 */
static bool
proc_init_aio_wqptr_or_wait(struct proc *p)
{
	workq_aio_t wq_aio;

	proc_lock(p);
	wq_aio = os_atomic_load(&p->p_aio_wqptr, relaxed);

	if (wq_aio == NULL) {
		/* we won: mark as initializing so other threads wait on us */
		os_atomic_store(&p->p_aio_wqptr, AIO_WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		/* someone else is initializing; sleep until they publish */
		assert_wait(&p->p_aio_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
2752 
/*
 * Wait event a parked workqueue thread sleeps on; keyed off a per-uthread
 * field so each thread can be woken individually.
 */
static inline event_t
workq_aio_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}
2758 
/*
 * Targeted wakeup of a single parked AIO worker thread on its private
 * park event (see workq_aio_parked_wait_event()).
 */
static inline void
workq_aio_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_aio_parked_wait_event(uth), get_machthread(uth));
}
2764 
2765 /*
2766  * Routine:	workq_aio_mark_exiting
2767  *
2768  * Function:	Mark the work queue such that new threads will not be added to the
2769  *		work queue after we return.
2770  *
2771  * Conditions:	Called against the current process.
2772  */
2773 static void
workq_aio_mark_exiting(proc_t p)2774 workq_aio_mark_exiting(proc_t p)
2775 {
2776 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
2777 	uint32_t wq_flags;
2778 
2779 	if (!wq_aio) {
2780 		return;
2781 	}
2782 
2783 	wq_flags = os_atomic_or_orig(&wq_aio->wa_flags, WQ_EXITING, relaxed);
2784 	if (__improbable(wq_flags & WQ_EXITING)) {
2785 		panic("workq_aio_mark_exiting_locked called twice");
2786 	}
2787 
2788 	/*
2789 	 * Opportunistically try to cancel thread calls that are likely in flight.
2790 	 * workq_aio_exit() will do the proper cleanup.
2791 	 */
2792 	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
2793 		thread_call_cancel(wq_aio->wa_death_call);
2794 	}
2795 }
2796 
/*
 * Tear down the per-process AIO workqueue at process exit.
 *
 * Detaches the workqueue from the proc, cancels the death thread call,
 * marks any remaining idle threads dying and wakes them, waits for the
 * thread count to drain to zero, then frees the workqueue.
 *
 * Conditions: runs when the proc has no threads left besides the caller
 * (see the comment below); workq_aio_mark_exiting() already ran.
 */
static void
workq_aio_exit(proc_t p)
{
	workq_aio_t wq_aio;

	/* detach the workqueue so nobody can look it up anymore */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, NULL, release);

	if (!wq_aio) {
		return;
	}

	/*
	 * Thread calls are always scheduled by the proc itself or under the
	 * workqueue spinlock if WQ_EXITING is not yet set.
	 *
	 * Either way, when this runs, the proc has no threads left beside
	 * the one running this very code, so we know no thread call can be
	 * dispatched anymore.
	 */

	thread_call_cancel_wait(wq_aio->wa_death_call);
	thread_call_free(wq_aio->wa_death_call);

	/*
	 * Clean up workqueue data structures for threads that exited and
	 * didn't get a chance to clean up after themselves.
	 *
	 * idle/new threads should have been interrupted and died on their own
	 */

	assert(TAILQ_EMPTY(&wq_aio->wa_aioq_entries));
	assert(TAILQ_EMPTY(&wq_aio->wa_thrunlist));

	if (wq_aio->wa_nthreads) {
		/* dying threads must wake us when wa_nthreads reaches 0
		 * (see workq_aio_unpark_for_death_and_unlock()) */
		os_atomic_or(&wq_aio->wa_flags, AIO_WQ_DEAD, relaxed);
		aio_proc_lock_spin(p);
		if (wq_aio->wa_nthreads) {
			struct uthread *uth;

			/* mark every idle thread dying and kick it out of its park */
			TAILQ_FOREACH(uth, &wq_aio->wa_thidlelist, uu_workq_entry) {
				if (uth->uu_workq_flags & UT_WORKQ_DYING) {
					workq_aio_thread_wakeup(uth);
					continue;
				}
				wq_aio->wa_thdying_count++;
				uth->uu_workq_flags |= UT_WORKQ_DYING;
				workq_aio_thread_wakeup(uth);
			}
			/* wait for the last dying thread to drop the count to zero */
			while (wq_aio->wa_nthreads) {
				msleep(&wq_aio->wa_nthreads, aio_proc_mutex(p), PRIBIO | PSPIN, "aio_workq_exit", 0);
			}
		}
		aio_proc_unlock(p);
	}

	assertf(TAILQ_EMPTY(&wq_aio->wa_thidlelist),
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thidlecount == 0,
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thdying_count == 0,
	    "wa_thdying_count = %d", wq_aio->wa_thdying_count);

	kfree_type(workq_aio_s, wq_aio);
}
2863 
2864 static int
workq_aio_open(struct proc * p)2865 workq_aio_open(struct proc *p)
2866 {
2867 	workq_aio_t wq_aio;
2868 	int error = 0;
2869 
2870 	if (proc_get_aio_wqptr(p) == NULL) {
2871 		if (proc_init_aio_wqptr_or_wait(p) == FALSE) {
2872 			assert(proc_get_aio_wqptr(p) != NULL);
2873 			goto out;
2874 		}
2875 
2876 		wq_aio = kalloc_type(workq_aio_s, Z_WAITOK | Z_ZERO);
2877 
2878 		wq_aio->wa_proc = p;
2879 
2880 		TAILQ_INIT(&wq_aio->wa_thidlelist);
2881 		TAILQ_INIT(&wq_aio->wa_thrunlist);
2882 		TAILQ_INIT(&wq_aio->wa_aioq_entries);
2883 
2884 		wq_aio->wa_death_call = thread_call_allocate_with_options(
2885 			workq_aio_kill_old_threads_call, wq_aio,
2886 			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
2887 
2888 		proc_set_aio_wqptr(p, wq_aio);
2889 	}
2890 out:
2891 	return error;
2892 }
2893 
2894 #pragma mark aio workqueue idle thread accounting
2895 
/*
 * The idle list is kept most-recently-parked first (TAILQ_INSERT_HEAD in
 * workq_push_idle_aio_thread()), so the tail is the thread that has been
 * idle the longest — the best candidate to reap.
 */
static inline struct uthread *
workq_oldest_killable_idle_aio_thread(workq_aio_t wq_aio)
{
	return TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);
}
2901 
2902 static inline uint64_t
workq_kill_delay_for_idle_aio_thread()2903 workq_kill_delay_for_idle_aio_thread()
2904 {
2905 	return aio_wq_reduce_pool_window.abstime;
2906 }
2907 
2908 static inline bool
workq_should_kill_idle_aio_thread(struct uthread * uth,uint64_t now)2909 workq_should_kill_idle_aio_thread(struct uthread *uth, uint64_t now)
2910 {
2911 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2912 	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
2913 }
2914 
/*
 * Arm the death thread call to fire at `deadline`, unless the workqueue is
 * exiting or a call is already scheduled.
 *
 * NOTE(review): the flag test and the subsequent or are two separate
 * relaxed atomics, not one RMW — presumably callers are serialized by the
 * aio proc lock so the window is benign; confirm.
 */
static void
workq_aio_death_call_schedule(workq_aio_t wq_aio, uint64_t deadline)
{
	uint32_t wa_flags = os_atomic_load(&wq_aio->wa_flags, relaxed);

	if (wa_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq_aio->wa_death_call, NULL, deadline,
	    aio_wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
2935 
2936 /*
2937  * `decrement` is set to the number of threads that are no longer dying:
2938  * - because they have been resuscitated just in time (workq_pop_idle_thread)
2939  * - or have been killed (workq_thread_terminate).
2940  */
2941 static void
workq_aio_death_policy_evaluate(workq_aio_t wq_aio,uint16_t decrement)2942 workq_aio_death_policy_evaluate(workq_aio_t wq_aio, uint16_t decrement)
2943 {
2944 	struct uthread *uth;
2945 
2946 	assert(wq_aio->wa_thdying_count >= decrement);
2947 #if 0
2948 	if (decrement) {
2949 		printf("VV_DEBUG_AIO : %s:%d : pid = %d, ctid = %d, after decrement thdying_count = %d\n",
2950 		    __FUNCTION__, __LINE__, proc_pid(current_proc()), thread_get_ctid(thr),
2951 		    wq_aio->wa_thdying_count - decrement);
2952 	}
2953 #endif
2954 
2955 	if ((wq_aio->wa_thdying_count -= decrement) > 0) {
2956 		return;
2957 	}
2958 
2959 	if (wq_aio->wa_thidlecount <= 1) {
2960 		return;
2961 	}
2962 
2963 	if (((uth = workq_oldest_killable_idle_aio_thread(wq_aio)) == NULL)) {
2964 		return;
2965 	}
2966 
2967 	uint64_t now = mach_absolute_time();
2968 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2969 
2970 	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
2971 		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
2972 			wq_aio->wa_thdying_count++;
2973 			uth->uu_workq_flags |= UT_WORKQ_DYING;
2974 		}
2975 		workq_aio_thread_wakeup(uth);
2976 		return;
2977 	}
2978 
2979 	workq_aio_death_call_schedule(wq_aio,
2980 	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
2981 }
2982 
2983 static void
workq_aio_kill_old_threads_call(void * param0,void * param1 __unused)2984 workq_aio_kill_old_threads_call(void *param0, void *param1 __unused)
2985 {
2986 	workq_aio_t wq_aio = param0;
2987 
2988 	aio_proc_lock_spin(wq_aio->wa_proc);
2989 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_START, wq_aio);
2990 	os_atomic_andnot(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
2991 	workq_aio_death_policy_evaluate(wq_aio, 0);
2992 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_END, wq_aio);
2993 	aio_proc_unlock(wq_aio->wa_proc);;
2994 }
2995 
/* the dying thread was on the idle list and must be unlinked from it */
#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
/* no extra thread setup requested (presumably mirrors pthread WQ_SETUP_*) */
#define WQ_SETUP_NONE  0
2998 
/*
 * Final teardown path for an AIO worker thread.
 *
 * Removes the thread from workqueue accounting (idle list when
 * WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, dying count, thread count), wakes
 * workq_aio_exit() when the last thread goes away, drops the aio proc
 * lock and terminates the calling thread.  Never returns.
 *
 * Conditions: called on the thread being terminated; aio proc lock held.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_for_death_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t death_flags, __unused uint32_t setup_flags)
{
	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq_aio->wa_thidlecount--;
		TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		wq_aio->wa_thdying_count--;
	}
	assert(wq_aio->wa_nthreads > 0);
	wq_aio->wa_nthreads--;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_terminate | DBG_FUNC_NONE, wq_aio);

	/* workq_aio_exit() msleeps on wa_nthreads once AIO_WQ_DEAD is set */
	if (_wq_dead(wq_aio) && (wq_aio->wa_nthreads == 0)) {
		wakeup(&wq_aio->wa_nthreads);
	}

	aio_proc_unlock(p);

	thread_t th = get_machthread(uth);
	assert(th == current_thread());

	/* NOTE(review): the creation reference is dropped before
	 * thread_terminate(); presumably terminate/exception-return operate
	 * on the current thread without needing that ref — confirm against
	 * thread_create_aio_workq_waiting() */
	thread_deallocate(th);
	thread_terminate(th);
	thread_exception_return();
	__builtin_unreachable();
}
3031 
/*
 * Park the calling worker thread on the idle list, or kill it outright.
 *
 * Moves `uth` off the run list, stamps when it went idle, and either:
 * - terminates it immediately (not returning) when the workqueue is
 *   exiting, or when the oldest idle thread is past the kill delay; or
 * - pushes it at the head of the idle list (most-recently-idle first)
 *   and arms the death call if it is the only idle thread.
 *
 * Conditions: aio proc lock held; consumed only on the death path.
 */
static void
workq_push_idle_aio_thread(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING);
	TAILQ_REMOVE(&wq_aio->wa_thrunlist, uth, uu_workq_entry);

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_aio_thread(wq_aio);
	uint16_t cur_idle = wq_aio->wa_thidlecount;

	if (_wq_exiting(wq_aio) || (wq_aio->wa_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_aio_thread(oldest, now))) {
		/*
		 * Immediately kill threads if we have too may of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			/* refresh its stamp and move it to the "youngest" end */
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
		}

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);

	cur_idle += 1;
	wq_aio->wa_thidlecount = cur_idle;
	uth->uu_save.uus_workq_park_data.has_stack = false;
	TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, uth, uu_workq_entry);

	/* first thread to go idle: arm the death call to reap it later */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_aio_thread();
		workq_aio_death_call_schedule(wq_aio, now + delay);
	}
}
3082 
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 *
 * The thread resumes in workq_aio_unpark_continue() when woken.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_park_and_unlock(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);

	/* does not return when this thread is selected to die */
	workq_push_idle_aio_thread(p, wq_aio, uth, setup_flags); // may not return

	/* we were marked dying while being pushed idle: terminate now */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_park | DBG_FUNC_NONE, wq_aio);

	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	/* XXX this should probably be THREAD_UNINT */
	assert_wait(workq_aio_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	aio_proc_unlock(p);
	thread_block(workq_aio_unpark_continue);
	__builtin_unreachable();
}
3113 
/* build a uu_workq_policy whose requested and bucket QoS are both `qos` */
#define WORKQ_POLICY_INIT(qos) \
	         (struct uu_workq_policy){ .qos_req = (qos), .qos_bucket = (qos) }
3116 
/*
 * This function is always called with the workq lock.
 *
 * Makes the worker thread inherit the effective QoS of `src_th` (the
 * thread that submitted the request), timeshare policy at priority 31.
 * NOTE(review): 31 presumably corresponds to BASEPRI_DEFAULT — confirm.
 */
static void
workq_aio_thread_reset_pri(struct uthread *uth, thread_t src_th)
{
	thread_t th = get_machthread(uth);
	thread_qos_t qos = (thread_qos_t)proc_get_effective_thread_policy(src_th, TASK_POLICY_QOS);
	int priority = 31;
	int policy = POLICY_TIMESHARE;

	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
	thread_set_workq_pri(th, qos, priority, policy);
}
3131 
3132 static inline void
workq_aio_thread_set_type(struct uthread * uth,uint16_t flags)3133 workq_aio_thread_set_type(struct uthread *uth, uint16_t flags)
3134 {
3135 	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
3136 	uth->uu_workq_flags |= flags;
3137 }
3138 
/*
 * Main service loop for an AIO worker thread woken with work.
 *
 * Drains wa_aioq_entries one request at a time, dropping the aio proc
 * lock around the actual I/O, running each request at the submitting
 * thread's QoS, then parks (or dies).  Never returns.
 *
 * Conditions: aio proc lock held on entry; consumed.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_select_req_or_park_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t setup_flags)
{
	aio_workq_entry *entryp;
	thread_t last_thread = NULL;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_START, wq_aio);
	/* pin base pri while we adopt submitters' QoS below */
	thread_freeze_base_pri(get_machthread(uth));
	workq_aio_thread_set_type(uth, 0);
	while ((entryp = TAILQ_FIRST(&wq_aio->wa_aioq_entries))) {
		if (__improbable(_wq_exiting(wq_aio))) {
			break;
		}

		TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
		entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

		aio_proc_unlock(p);

		/* adopt the submitting thread's QoS, but only when it changes */
		thread_t thr = vfs_context_thread(&entryp->context);
		if (last_thread != thr) {
			workq_aio_thread_reset_pri(uth, thr);
			last_thread = thr;
		}

		/* this frees references to workq entry */
		workq_aio_process_entry(entryp);

		ast_check_async_thread();

		aio_proc_lock_spin(p);
	}
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_END, wq_aio);
	thread_unfreeze_base_pri(get_machthread(uth));
	workq_aio_park_and_unlock(p, wq_aio, uth, setup_flags);
}
3177 
/*
 * parked idle thread wakes up
 *
 * Continuation installed by workq_aio_park_and_unlock(), and the start
 * routine of freshly created worker threads (see
 * workq_aio_add_new_thread()).  Depending on why we woke, either services
 * the request queue or proceeds to die.  Never returns.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);

	aio_proc_lock_spin(p);

	/* woken to run: go drain the request queue */
	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_aio_unpark_select_req_or_park_and_unlock(p, wq_aio, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
	}

	workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);

	__builtin_unreachable();
}
3224 
/*
 * Called by thread_create_workq_aio_waiting() during thread initialization, before
 * assert_wait, before the thread has been started.
 *
 * Initializes the uthread's workqueue state, tags the thread as an AIO
 * workqueue thread, takes the aio proc lock (left held for the caller),
 * and returns the event the new thread will wait on while parked.
 */
event_t
aio_workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_AIO_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	aio_proc_lock(get_bsdtask_info(task));
	return workq_aio_parked_wait_event(uth);
}
3246 
/**
 * Try to add a new workqueue thread for aio.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 * - aio threads do not call into pthread functions to setup or destroy stacks.
 */
static kern_return_t
workq_aio_add_new_thread(proc_t p, workq_aio_t wq_aio)
{
	kern_return_t kret;
	thread_t th;

	/* count the thread before dropping the lock so exit logic waits for it */
	wq_aio->wa_nthreads++;

	aio_proc_unlock(p);

	kret = thread_create_aio_workq_waiting(proc_task(p),
	    workq_aio_unpark_continue,
	    &th);

	if (kret != KERN_SUCCESS) {
		WQ_AIO_TRACE(AIO_WQ_aio_thread_create_failed | DBG_FUNC_NONE, wq_aio,
		    kret, 0, 0, 0);
		goto out;
	}

	/*
	 * thread_create_aio_workq_waiting() will return with the wq lock held
	 * on success, because it calls aio_workq_thread_init_and_wq_lock().
	 */
	struct uthread *uth = get_bsdthread_info(th);
	TAILQ_INSERT_TAIL(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount++;
	uth->uu_workq_flags &= ~UT_WORKQ_NEW;
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_create | DBG_FUNC_NONE, wq_aio);
	return kret;

out:
	aio_proc_lock(p);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq_aio->wa_nthreads--;
	return kret;
}
3296 
/*
 * Pick an idle AIO worker thread (creating one if needed and allowed),
 * move it to the run list and wake it up.
 *
 * Thread creation is skipped when the caller is itself an AIO workqueue
 * thread, or when the pool is at WORKQUEUE_AIO_MAXTHREADS.
 *
 * Conditions: aio proc lock held; dropped iff `unlock` is true.
 */
static void
workq_aio_wakeup_thread_internal(proc_t p, bool unlock)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
	bool needs_wakeup = false;
	struct uthread *uth = NULL;

	if (!wq_aio) {
		goto out;
	}

	/* no idle thread available: try to grow the pool */
	uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	while (!uth && (wq_aio->wa_nthreads < WORKQUEUE_AIO_MAXTHREADS) &&
	    !(thread_get_tag(current_thread()) & THREAD_TAG_AIO_WORKQUEUE)) {
		if (workq_aio_add_new_thread(p, wq_aio) != KERN_SUCCESS) {
			break;
		}
		uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	}

	if (!uth) {
		goto out;
	}

	TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount--;

	TAILQ_INSERT_TAIL(&wq_aio->wa_thrunlist, uth, uu_workq_entry);
	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_wakeup | DBG_FUNC_NONE, wq_aio);

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* resuscitate a dying thread and re-evaluate the death policy.
		 * NOTE(review): no wakeup here — presumably the thread is
		 * already on its way through workq_aio_unpark_continue() and
		 * will observe UT_WORKQ_RUNNING; confirm. */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_aio_death_policy_evaluate(wq_aio, 1);
		needs_wakeup = false;
	} else {
		needs_wakeup = true;
	}
out:
	if (unlock) {
		aio_proc_unlock(p);
	}

	/* wake outside the lock */
	if (uth && needs_wakeup) {
		workq_aio_thread_wakeup(uth);
	}
}
3346 
3347 static void
workq_aio_wakeup_thread_and_unlock(proc_t p)3348 workq_aio_wakeup_thread_and_unlock(proc_t p)
3349 {
3350 	return workq_aio_wakeup_thread_internal(p, true);
3351 }
3352 
3353 static void
workq_aio_wakeup_thread(proc_t p)3354 workq_aio_wakeup_thread(proc_t p)
3355 {
3356 	return workq_aio_wakeup_thread_internal(p, false);
3357 }
3358 
3359 void
workq_aio_prepare(struct proc * p)3360 workq_aio_prepare(struct proc *p)
3361 {
3362 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3363 
3364 	if (__improbable(!wq_aio && !proc_in_teardown(p))) {
3365 		workq_aio_open(p);
3366 	}
3367 }
3368 
3369 bool
workq_aio_entry_add_locked(struct proc * p,aio_workq_entry * entryp)3370 workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp)
3371 {
3372 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3373 	bool ret = false;
3374 
3375 	ASSERT_AIO_PROC_LOCK_OWNED(p);
3376 
3377 	if (!proc_in_teardown(p) && wq_aio && !_wq_exiting(wq_aio)) {
3378 		TAILQ_INSERT_TAIL(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3379 		ret = true;
3380 	}
3381 
3382 	return ret;
3383 }
3384 
/*
 * Unlink `entryp` from the process' AIO work queue.
 *
 * Returns: always true; panics if the entry is not on a queue.
 *
 * Conditions: aio proc lock held.
 */
bool
workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp)
{
	workq_aio_t  wq_aio = proc_get_aio_wqptr(p);

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	/* a NULL tqe_prev means the entry was already taken off a workq */
	if (entryp->aio_workq_link.tqe_prev == NULL) {
		panic("Trying to remove an entry from a work queue, but it is not on a queue");
	}

	TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

	return true;
}
3401