xref: /xnu-12377.1.9/bsd/kern/kern_aio.c (revision f6217f891ac0bb64f3d375211650a4c1ff8ca1ea)
1 /*
2  * Copyright (c) 2003-2024 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 
29 /*
30  * This file contains support for the POSIX 1003.1B AIO/LIO facility.
31  */
32 
33 #include <sys/systm.h>
34 #include <sys/fcntl.h>
35 #include <sys/file_internal.h>
36 #include <sys/filedesc.h>
37 #include <sys/kdebug.h>
38 #include <sys/kernel.h>
39 #include <sys/vnode_internal.h>
40 #include <sys/kauth.h>
41 #include <sys/mount_internal.h>
42 #include <sys/param.h>
43 #include <sys/proc_internal.h>
44 #include <sys/sysctl.h>
45 #include <sys/unistd.h>
46 #include <sys/user.h>
47 
48 #include <sys/aio_kern.h>
49 #include <sys/sysproto.h>
50 
51 #include <machine/limits.h>
52 
53 #include <mach/mach_types.h>
54 #include <kern/kern_types.h>
55 #include <kern/waitq.h>
56 #include <kern/zalloc.h>
57 #include <kern/task.h>
58 #include <kern/sched_prim.h>
59 
60 #include <vm/vm_map_xnu.h>
61 
62 #include <os/refcnt.h>
63 
64 #include <kern/thread.h>
65 #include <kern/policy_internal.h>
66 #include <pthread/workqueue_internal.h>
67 
68 #if 0
69 #undef KERNEL_DEBUG
70 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
71 #endif
72 
73 #define AIO_work_queued                 1
74 #define AIO_worker_wake                 2
75 #define AIO_completion_sig              3
76 #define AIO_completion_kevent           4
77 #define AIO_completion_cleanup_wait     5
78 #define AIO_completion_cleanup_wake     6
79 #define AIO_completion_suspend_wake     7
80 #define AIO_cancel                      10
81 #define AIO_cancel_async_workq          11
82 #define AIO_cancel_sync_workq           12
83 #define AIO_cancel_activeq              13
84 #define AIO_cancel_doneq                14
85 #define AIO_fsync                       20
86 #define AIO_fsync_delay                 21
87 #define AIO_read                        30
88 #define AIO_write                       40
89 #define AIO_listio                      50
90 #define AIO_error                       60
91 #define AIO_error_val                   61
92 #define AIO_error_activeq               62
93 #define AIO_error_workq                 63
94 #define AIO_return                      70
95 #define AIO_return_val                  71
96 #define AIO_return_activeq              72
97 #define AIO_return_workq                73
98 #define AIO_exec                        80
99 #define AIO_exit                        90
100 #define AIO_exit_sleep                  91
101 #define AIO_close                       100
102 #define AIO_close_sleep                 101
103 #define AIO_suspend                     110
104 #define AIO_suspend_sleep               111
105 #define AIO_worker_thread               120
106 #define AIO_register_kevent             130
107 #define AIO_WQ_process_entry            140
108 #define AIO_WQ_aio_thread_create        141
109 #define AIO_WQ_aio_thread_terminate     142
110 #define AIO_WQ_aio_death_call           143
111 #define AIO_WQ_aio_thread_park          144
112 #define AIO_WQ_aio_select_req           145
113 #define AIO_WQ_aio_thread_create_failed 146
114 #define AIO_WQ_aio_thread_wakeup        147
115 
116 static TUNABLE(uint32_t, bootarg_aio_new_workq, "aio_new_workq", 1);
117 
/*
 * Flags describing what kind of I/O an aio_workq_entry represents and
 * which lifecycle states it has reached.
 */
__options_decl(aio_entry_flags_t, uint32_t, {
	AIO_READ        = 0x00000001, /* a read */
	AIO_WRITE       = 0x00000002, /* a write */
	AIO_FSYNC       = 0x00000004, /* aio_fsync with op = O_SYNC */
	AIO_DSYNC       = 0x00000008, /* aio_fsync with op = O_DSYNC (not supported yet) */
	AIO_LIO         = 0x00000010, /* lio_listio generated IO */
	AIO_LIO_WAIT    = 0x00000020, /* lio_listio is waiting on the leader */

	AIO_COMPLETED   = 0x00000100, /* request has completed */
	AIO_CANCELLED   = 0x00000200, /* request has been cancelled */

	/*
	 * These flags mean that this entry is blocking either:
	 * - close (AIO_CLOSE_WAIT)
	 * - exit or exec (AIO_EXIT_WAIT)
	 *
	 * These flags are mutually exclusive, and the AIO_EXIT_WAIT variant
	 * will also neuter notifications in do_aio_completion_and_unlock().
	 */
	AIO_CLOSE_WAIT  = 0x00004000,
	AIO_EXIT_WAIT   = 0x00008000,
});
140 
141 /*! @struct aio_workq_entry
142  *
143  * @discussion
144  * This represents a piece of aio/lio work.
145  *
146  * The ownership rules go as follows:
147  *
148  * - the "proc" owns one refcount on the entry (from creation), while it is
149  *   enqueued on the aio_activeq and then the aio_doneq.
150  *
151  *   either aio_return() (user read the status) or _aio_exit() (the process
152  *   died) will dequeue the entry and consume this ref.
153  *
154  * - the async workqueue owns one refcount once the work is submitted,
155  *   which is consumed in do_aio_completion_and_unlock().
156  *
 *   This ref protects the entry for the end of
158  *   do_aio_completion_and_unlock() (when signal delivery happens).
159  *
160  * - lio_listio() for batches picks one of the entries to be the "leader"
161  *   of the batch. Each work item will have a refcount on its leader
162  *   so that the accounting of the batch completion can be done on the leader
163  *   (to be able to decrement lio_pending).
164  *
165  *   This ref is consumed in do_aio_completion_and_unlock() as well.
166  *
167  * - lastly, in lio_listio() when the LIO_WAIT behavior is requested,
168  *   an extra ref is taken in this syscall as it needs to keep accessing
169  *   the leader "lio_pending" field until it hits 0.
170  */
struct aio_workq_entry {
	/* Protected by the work queue's spin lock; tqe_prev == NULL means "not enqueued" */
	TAILQ_ENTRY(aio_workq_entry)    aio_workq_link;

	/* Protected by the owning proc's aio lock (see aio_proc_mutex()) */
	TAILQ_ENTRY(aio_workq_entry)    aio_proc_link;  /* p_aio_activeq or p_aio_doneq */
	user_ssize_t                    returnval;      /* return value from read / write request */
	errno_t                         errorval;       /* error value from read / write request */
	os_refcnt_t                     aio_refcount;   /* ownership rules documented above */
	aio_entry_flags_t               flags;

	int                             lio_pending;    /* pending I/Os in lio group, only on leader */
	struct aio_workq_entry         *lio_leader;     /* pointer to the lio leader, can be self */

	/* Initialized and never changed, safe to access */
	struct proc                    *procp;          /* user proc that queued this request */
	user_addr_t                     uaiocbp;        /* pointer passed in from user land */
	struct user_aiocb               aiocb;          /* copy of aiocb from user land */
	struct vfs_context              context;        /* context which enqueued the request */

	/* Initialized, and possibly freed by aio_work_thread() or at free if cancelled */
	vm_map_t                        aio_map;        /* user land map we have a reference to */
};
194 
195 /*
196  * aio requests queue up on the aio_async_workq or lio_sync_workq (for
197  * lio_listio LIO_WAIT).  Requests then move to the per process aio_activeq
198  * (proc.aio_activeq) when one of our worker threads start the IO.
199  * And finally, requests move to the per process aio_doneq (proc.aio_doneq)
200  * when the IO request completes.  The request remains on aio_doneq until
201  * user process calls aio_return or the process exits, either way that is our
202  * trigger to release aio resources.
203  */
/*
 * A work queue: the pending entries, the spin lock that guards them, and
 * the waitq that worker threads sleep on when the queue is empty.
 */
typedef struct aio_workq   {
	TAILQ_HEAD(, aio_workq_entry)   aioq_entries;
	lck_spin_t                      aioq_lock;
	struct waitq                    aioq_waitq;
} *aio_workq_t;
209 
#define AIO_NUM_WORK_QUEUES 1
/*
 * Global anchor for the aio subsystem: the system-wide entry count and
 * the (currently single-slot) array of async work queues.
 */
struct aio_anchor_cb {
	os_atomic(int)          aio_total_count;        /* total extant entries */

	/* Hash table of queues here */
	int                     aio_num_workqs;
	struct aio_workq        aio_async_workqs[AIO_NUM_WORK_QUEUES];
};
typedef struct aio_anchor_cb aio_anchor_cb;
219 
220 
221 /* New per process workqueue */
222 #define WORKQUEUE_AIO_MAXTHREADS            16
223 
TAILQ_HEAD(workq_aio_uthread_head, uthread);

/*
 * Per-process aio workqueue state for the new implementation (selected
 * by the "aio_new_workq" boot-arg).
 */
typedef struct workq_aio_s {
	thread_call_t   wa_death_call;  /* thread call, presumably fires workq_aio_kill_old_threads_call() -- confirm */
	struct workq_aio_uthread_head wa_thrunlist;   /* running worker uthreads */
	struct workq_aio_uthread_head wa_thidlelist;  /* idle (parked) worker uthreads */
	TAILQ_HEAD(, aio_workq_entry) wa_aioq_entries; /* pending aio requests */
	proc_t wa_proc;                 /* owning process */
	workq_state_flags_t _Atomic wa_flags;
	uint16_t wa_nthreads;           /* total worker threads */
	uint16_t wa_thidlecount;        /* idle worker threads */
	uint16_t wa_thdying_count;      /* threads in the process of dying */
} workq_aio_s, *workq_aio_t;
237 
/*
 * A delay tunable exposed via sysctl (see AIO_WORKQ_SYSCTL_USECS below):
 * user-visible microseconds plus the matching absolute-time value.
 */
struct aio_workq_usec_var {
	uint32_t usecs;
	uint64_t abstime;
};
242 
243 static int aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
244 
245 #define AIO_WORKQ_SYSCTL_USECS(var, init) \
246 	        static struct aio_workq_usec_var var = { .usecs = (init) }; \
247 	        SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
248 	                        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &(var), 0, \
249 	                        aio_workq_sysctl_handle_usecs, "I", "")
250 
251 AIO_WORKQ_SYSCTL_USECS(aio_wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
252 
253 #define WQ_AIO_TRACE(x, wq, a, b, c, d) \
254 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
255 	        proc_getpid((wq)->wa_proc), (a), (b), (c), (d)); })
256 
257 #define WQ_AIO_TRACE_WQ(x, wq) \
258 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
259 	        proc_getpid((wq)->wa_proc),\
260 	        (uintptr_t)thread_tid(current_thread()),\
261 	        (wq)->wa_nthreads, (wq)->wa_thidlecount, (wq)->wa_thdying_count); })
262 
263 /*
264  * Notes on aio sleep / wake channels.
265  * We currently pick a couple fields within the proc structure that will allow
266  * us sleep channels that currently do not collide with any other kernel routines.
267  * At this time, for binary compatibility reasons, we cannot create new proc fields.
268  */
269 #define AIO_SUSPEND_SLEEP_CHAN  p_aio_activeq
270 #define AIO_CLEANUP_SLEEP_CHAN  p_aio_total_count
271 
272 #define ASSERT_AIO_FROM_PROC(aiop, theproc)     \
273 	if ((aiop)->procp != (theproc)) {       \
274 	        panic("AIO on a proc list that does not belong to that proc."); \
275 	}
276 
277 extern kern_return_t thread_terminate(thread_t);
278 
279 /*
280  *  LOCAL PROTOTYPES
281  */
282 static void             aio_proc_lock(proc_t procp);
283 static void             aio_proc_lock_spin(proc_t procp);
284 static void             aio_proc_unlock(proc_t procp);
285 static lck_mtx_t       *aio_proc_mutex(proc_t procp);
286 static bool             aio_has_active_requests_for_process(proc_t procp);
287 static bool             aio_proc_has_active_requests_for_file(proc_t procp, int fd);
288 static boolean_t        is_already_queued(proc_t procp, user_addr_t aiocbp);
289 
290 static aio_workq_t      aio_entry_workq(aio_workq_entry *entryp);
291 static void             aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
292 static void             aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
293 static void             aio_entry_ref(aio_workq_entry *entryp);
294 static void             aio_entry_unref(aio_workq_entry *entryp);
295 static bool             aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp);
296 static boolean_t        aio_delay_fsync_request(aio_workq_entry *entryp);
297 static void             aio_free_request(aio_workq_entry *entryp);
298 
299 static void             aio_workq_init(aio_workq_t wq);
300 static void             aio_workq_lock_spin(aio_workq_t wq);
301 static void             aio_workq_unlock(aio_workq_t wq);
302 static lck_spin_t      *aio_workq_lock(aio_workq_t wq);
303 
304 static void             aio_work_thread(void *arg, wait_result_t wr);
305 static aio_workq_entry *aio_get_some_work(void);
306 
307 static int              aio_queue_async_request(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
308 static int              aio_validate(proc_t, aio_workq_entry *entryp);
309 
310 static int              do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, aio_entry_flags_t);
311 static void             do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp, aio_entry_flags_t reason);
312 static int              do_aio_fsync(aio_workq_entry *entryp);
313 static int              do_aio_read(aio_workq_entry *entryp);
314 static int              do_aio_write(aio_workq_entry *entryp);
315 static void             do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
316 static void             do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
317 static aio_workq_entry *aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
318 static int              aio_copy_in_list(proc_t, user_addr_t, user_addr_t *, int);
319 
320 static void             workq_aio_prepare(struct proc *p);
321 static bool             workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp);
322 static void             workq_aio_wakeup_thread(proc_t p);
323 static void             workq_aio_wakeup_thread_and_unlock(proc_t p);
324 static int              workq_aio_process_entry(aio_workq_entry *entryp);
325 static bool             workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp);
326 
327 static void             workq_aio_kill_old_threads_call(void *param0, void *param1 __unused);
328 static void             workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr);
329 
330 static void             workq_aio_mark_exiting(proc_t p);
331 static void             workq_aio_exit(proc_t p);
332 
333 #define ASSERT_AIO_PROC_LOCK_OWNED(p)   LCK_MTX_ASSERT(aio_proc_mutex(p), LCK_MTX_ASSERT_OWNED)
334 #define ASSERT_AIO_WORKQ_LOCK_OWNED(q)  LCK_SPIN_ASSERT(aio_workq_lock(q), LCK_ASSERT_OWNED)
335 
336 /*
337  *  EXTERNAL PROTOTYPES
338  */
339 
340 /* in ...bsd/kern/sys_generic.c */
341 extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
342     user_addr_t bufp, user_size_t nbyte,
343     off_t offset, int flags, user_ssize_t *retval);
344 extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
345     user_addr_t bufp, user_size_t nbyte, off_t offset,
346     int flags, user_ssize_t *retval);
347 
348 /*
349  * aio external global variables.
350  */
351 extern int aio_max_requests;                    /* AIO_MAX - configurable */
352 extern int aio_max_requests_per_process;        /* AIO_PROCESS_MAX - configurable */
353 extern int aio_worker_threads;                  /* AIO_THREAD_COUNT - configurable */
354 
355 
356 /*
357  * aio static variables.
358  */
/* global anchor; aio_total_count starts at zero */
static aio_anchor_cb aio_anchor = {
	.aio_num_workqs = AIO_NUM_WORK_QUEUES,
};
os_refgrp_decl(static, aio_refgrp, "aio", NULL);
static LCK_GRP_DECLARE(aio_proc_lock_grp, "aio_proc");
static LCK_GRP_DECLARE(aio_queue_lock_grp, "aio_queue");
static LCK_MTX_DECLARE(aio_proc_mtx, &aio_proc_lock_grp);

static struct klist aio_klist;
static LCK_GRP_DECLARE(aio_klist_lck_grp, "aio_klist");
static LCK_MTX_DECLARE(aio_klist_lock, &aio_klist_lck_grp);

/* typed zone backing all aio_workq_entry allocations */
static KALLOC_TYPE_DEFINE(aio_workq_zonep, aio_workq_entry, KT_DEFAULT);
372 
/*
 * Hash an entry to its work queue.  There is currently only one global
 * queue (AIO_NUM_WORK_QUEUES == 1), so the entry itself is ignored.
 */
static aio_workq_t
aio_entry_workq(__unused aio_workq_entry *entryp)
{
	return &aio_anchor.aio_async_workqs[0];
}
379 
/*
 * Initialize one aio work queue: empty entry list, its spin lock, and a
 * FIFO waitq for worker threads to block on.
 */
static void
aio_workq_init(aio_workq_t wq)
{
	TAILQ_INIT(&wq->aioq_entries);
	lck_spin_init(&wq->aioq_lock, &aio_queue_lock_grp, LCK_ATTR_NULL);
	waitq_init(&wq->aioq_waitq, WQT_QUEUE, SYNC_POLICY_FIFO);
}
387 
388 
389 /*
390  * Can be passed a queue which is locked spin.
391  */
/*
 * Unlink an entry from its work queue.  tqe_prev == NULL is the
 * "not on a workq" sentinel: it is asserted non-NULL on entry and reset
 * to NULL on the way out.  Caller must hold the workq spin lock.
 */
static void
aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (entryp->aio_workq_link.tqe_prev == NULL) {
		panic("Trying to remove an entry from a work queue, but it is not on a queue");
	}

	TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
}
404 
/*
 * Enqueue an entry at the tail of a (legacy, global) work queue.
 * Only valid when the old workq implementation is in use: the new
 * per-process implementation must never reach this path, hence the
 * panic if the "aio_new_workq" boot-arg is set.
 * Caller must hold the workq spin lock.
 */
static void
aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (bootarg_aio_new_workq) {
		panic("old workq implementation selected with bootarg set");
	}

	TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
}
416 
/* Take the proc's aio mutex (blocking). */
static void
aio_proc_lock(proc_t procp)
{
	lck_mtx_lock(aio_proc_mutex(procp));
}
422 
/* Take the proc's aio mutex in spin mode (caller must not block while held). */
static void
aio_proc_lock_spin(proc_t procp)
{
	lck_mtx_lock_spin(aio_proc_mutex(procp));
}
428 
429 static bool
aio_has_any_work(void)430 aio_has_any_work(void)
431 {
432 	return os_atomic_load(&aio_anchor.aio_total_count, relaxed) != 0;
433 }
434 
/*
 * Try to put a new entry on procp's active queue, honoring both the
 * per-process and the system-wide request limits and rejecting a user
 * aiocb that is already queued.
 *
 * Returns true with the entry on p_aio_activeq and both counters bumped,
 * or false (no state changed) when any limit/duplicate check fails.
 * Caller must hold the proc's aio lock.
 */
static bool
aio_try_proc_insert_active_locked(proc_t procp, aio_workq_entry *entryp)
{
	int old, new;

	ASSERT_AIO_PROC_LOCK_OWNED(procp);

	if (procp->p_aio_total_count >= aio_max_requests_per_process) {
		return false;
	}

	if (is_already_queued(procp, entryp->uaiocbp)) {
		return false;
	}

	/* atomically reserve a slot in the global count, unless saturated */
	os_atomic_rmw_loop(&aio_anchor.aio_total_count, old, new, relaxed, {
		if (old >= aio_max_requests) {
		        os_atomic_rmw_loop_give_up(return false);
		}
		new = old + 1;
	});

	TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link);
	procp->p_aio_total_count++;
	return true;
}
461 
/*
 * Move a completed entry from the proc's active queue to its done queue.
 * Caller must hold the proc's aio lock.
 */
static void
aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link);
	TAILQ_INSERT_TAIL(&procp->p_aio_doneq, entryp, aio_proc_link);
}
468 
/*
 * Unlink an entry from the proc's done queue and drop both the global
 * and the per-process accounting, panicking if either count would go
 * negative.  Callers (e.g. aio_return()) subsequently release the
 * proc's reference via aio_entry_unref().
 * Caller must hold the proc's aio lock.
 */
static void
aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
	entryp->aio_proc_link.tqe_prev = NULL;
	/* dec_orig returns the pre-decrement value; it must have been > 0 */
	if (os_atomic_dec_orig(&aio_anchor.aio_total_count, relaxed) <= 0) {
		panic("Negative total AIO count!");
	}
	if (procp->p_aio_total_count-- <= 0) {
		panic("proc %p: p_aio_total_count accounting mismatch", procp);
	}
}
481 
/* Release the proc's aio mutex. */
static void
aio_proc_unlock(proc_t procp)
{
	lck_mtx_unlock(aio_proc_mutex(procp));
}
487 
/* The aio subsystem reuses the proc's p_mlock as its per-proc lock. */
static lck_mtx_t*
aio_proc_mutex(proc_t procp)
{
	return &procp->p_mlock;
}
493 
/* Take an additional reference on an aio entry. */
static void
aio_entry_ref(aio_workq_entry *entryp)
{
	os_ref_retain(&entryp->aio_refcount);
}
499 
500 static void
aio_entry_unref(aio_workq_entry * entryp)501 aio_entry_unref(aio_workq_entry *entryp)
502 {
503 	if (os_ref_release(&entryp->aio_refcount) == 0) {
504 		aio_free_request(entryp);
505 	}
506 }
507 
/*
 * Try to pull an entry off its work queue before a worker thread picks
 * it up -- this is how a queued-but-not-yet-started request is cancelled.
 *
 * The tqe_prev pointer ("not on a workq" sentinel) is first tested
 * unlocked as a cheap filter, then re-checked under the queue lock since
 * a worker may have dequeued the entry in the meantime.
 *
 * Returns true if this caller dequeued the entry, false otherwise.
 */
static bool
aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp)
{
	/* Can only be cancelled if it's still on a work queue */
	if (entryp->aio_workq_link.tqe_prev != NULL) {
		aio_workq_t queue;
		if (bootarg_aio_new_workq) {
			/* new implementation: per-process queue removal */
			return workq_aio_entry_remove_locked(p, entryp);
		}

		/* Will have to check again under the lock */
		queue = aio_entry_workq(entryp);
		aio_workq_lock_spin(queue);
		if (entryp->aio_workq_link.tqe_prev != NULL) {
			aio_workq_remove_entry_locked(queue, entryp);
			aio_workq_unlock(queue);
			return true;
		} else {
			aio_workq_unlock(queue);
		}
	}

	return false;
}
532 
/* Take a work queue's spin lock. */
static void
aio_workq_lock_spin(aio_workq_t wq)
{
	lck_spin_lock(aio_workq_lock(wq));
}
538 
/* Release a work queue's spin lock. */
static void
aio_workq_unlock(aio_workq_t wq)
{
	lck_spin_unlock(aio_workq_lock(wq));
}
544 
/* Accessor for a work queue's spin lock. */
static lck_spin_t*
aio_workq_lock(aio_workq_t wq)
{
	return &wq->aioq_lock;
}
550 
551 /*
552  * aio_cancel - attempt to cancel one or more async IO requests currently
553  * outstanding against file descriptor uap->fd.  If uap->aiocbp is not
554  * NULL then only one specific IO is cancelled (if possible).  If uap->aiocbp
555  * is NULL then all outstanding async IO request for the given file
556  * descriptor are cancelled (if possible).
557  */
int
aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval)
{
	struct user_aiocb my_aiocb;
	int               result;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, 0, 0);

	if (uap->fd) {
		vnode_t vp = NULLVP;
		const char *vname = NULL;

		result = vnode_getfromfd(vfs_context_current(), uap->fd, &vp);
		if (result != 0) {
			result = EBADF;
			goto ExitRoutine;
		}

		vname = vnode_getname(vp);
		/*
		 * The aio_cancel() system call will always return AIO_NOTCANCELED
		 * for a file descriptor associated with a raw disk device.
		 */
		if (vnode_ischr(vp) && vname && !strncmp(vname, "rdisk", 5)) {
			result = 0;
			*retval = AIO_NOTCANCELED;
		}

		if (vname) {
			vnode_putname(vname);
		}
		vnode_put(vp);

		/*
		 * NOTE(review): if the rdisk branch above was not taken, *retval
		 * still holds its value from syscall entry here (normally zeroed
		 * by dispatch) -- confirm it cannot spuriously equal
		 * AIO_NOTCANCELED.
		 */
		if (result == 0 && *retval == AIO_NOTCANCELED) {
			goto ExitRoutine;
		}
	}

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		result = 0;
		*retval = AIO_ALLDONE;
		goto ExitRoutine;
	}

	*retval = -1;
	if (uap->aiocbp != USER_ADDR_NULL) {
		/* copy in the user aiocb using the ABI-appropriate layout */
		if (proc_is64bit(p)) {
			struct user64_aiocb aiocb64;

			result = copyin(uap->aiocbp, &aiocb64, sizeof(aiocb64));
			if (result == 0) {
				do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
			}
		} else {
			struct user32_aiocb aiocb32;

			result = copyin(uap->aiocbp, &aiocb32, sizeof(aiocb32));
			if (result == 0) {
				do_munge_aiocb_user32_to_user(&aiocb32, &my_aiocb);
			}
		}

		if (result != 0) {
			result = EAGAIN;
			goto ExitRoutine;
		}

		/* NOTE - POSIX standard says a mismatch between the file */
		/* descriptor passed in and the file descriptor embedded in */
		/* the aiocb causes unspecified results.  We return EBADF in */
		/* that situation.  */
		if (uap->fd != my_aiocb.aio_fildes) {
			result = EBADF;
			goto ExitRoutine;
		}
	}

	aio_proc_lock(p);
	result = do_aio_cancel_locked(p, uap->fd, uap->aiocbp, 0);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	aio_proc_unlock(p);

	if (result != -1) {
		/* do_aio_cancel_locked() produced an AIO_* disposition */
		*retval = result;
		result = 0;
		goto ExitRoutine;
	}

	/* -1 means no matching request was found */
	result = EBADF;

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, result, 0);

	return result;
}
656 
657 
658 /*
659  * _aio_close - internal function used to clean up async IO requests for
660  * a file descriptor that is closing.
661  * THIS MAY BLOCK.
662  */
__private_extern__ void
_aio_close(proc_t p, int fd)
{
	int error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		return;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

	/* cancel all async IO requests on our todo queues for this file descriptor */
	aio_proc_lock(p);
	error = do_aio_cancel_locked(p, fd, USER_ADDR_NULL, AIO_CLOSE_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * and file descriptor on the active async IO queue.  Active requests cannot
		 * be cancelled so we must wait for them to complete.  We will get a special
		 * wake up call on our channel used to sleep for ALL active requests to
		 * complete.  This sleep channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used
		 * when we must wait for all active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

		/* re-check on every wakeup: the wake is for "some request done" */
		while (aio_proc_has_active_requests_for_file(p, fd)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0);
		}
	}

	aio_proc_unlock(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
}
703 
704 
705 /*
706  * aio_error - return the error status associated with the async IO
707  * request referred to by uap->aiocbp.  The error status is the errno
 * value that would be set by the corresponding IO request (read, write,
709  * fdatasync, or sync).
710  */
int
aio_error(proc_t p, struct aio_error_args *uap, int *retval)
{
	aio_workq_entry *entryp;
	int              error;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	/* see if there are any aios to check */
	if (!aio_has_any_work()) {
		return EINVAL;
	}

	aio_proc_lock(p);

	/* look for a match on our queue of async IO requests that have completed */
	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
		if (entryp->uaiocbp == uap->aiocbp) {
			ASSERT_AIO_FROM_PROC(entryp, p);

			/* completed: report the request's recorded errno value */
			*retval = entryp->errorval;
			error = 0;

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* look for a match on our queue of active async IO requests */
	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
		if (entryp->uaiocbp == uap->aiocbp) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			/* still running: the "error status" is EINPROGRESS */
			*retval = EINPROGRESS;
			error = 0;
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* no such request on either queue */
	error = EINVAL;

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
	aio_proc_unlock(p);

	return error;
}
762 
763 
764 /*
765  * aio_fsync - asynchronously force all IO operations associated
766  * with the file indicated by the file descriptor (uap->aiocbp->aio_fildes) and
767  * queued at the time of the call to the synchronized completion state.
768  * NOTE - we do not support op O_DSYNC at this point since we do not support the
769  * fdatasync() call.
770  */
771 int
aio_fsync(proc_t p,struct aio_fsync_args * uap,int * retval)772 aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval)
773 {
774 	aio_entry_flags_t fsync_kind;
775 	int error;
776 
777 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_START,
778 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, uap->op, 0, 0);
779 
780 	*retval = 0;
781 	/* 0 := O_SYNC for binary backward compatibility with Panther */
782 	if (uap->op == O_SYNC || uap->op == 0) {
783 		fsync_kind = AIO_FSYNC;
784 	} else if (uap->op == O_DSYNC) {
785 		fsync_kind = AIO_DSYNC;
786 	} else {
787 		*retval = -1;
788 		error = EINVAL;
789 		goto ExitRoutine;
790 	}
791 
792 	error = aio_queue_async_request(p, uap->aiocbp, fsync_kind);
793 	if (error != 0) {
794 		*retval = -1;
795 	}
796 
797 ExitRoutine:
798 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_END,
799 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
800 
801 	return error;
802 }
803 
804 
805 /* aio_read - asynchronously read uap->aiocbp->aio_nbytes bytes from the
806  * file descriptor (uap->aiocbp->aio_fildes) into the buffer
807  * (uap->aiocbp->aio_buf).
808  */
809 int
aio_read(proc_t p,struct aio_read_args * uap,int * retval)810 aio_read(proc_t p, struct aio_read_args *uap, int *retval)
811 {
812 	int error;
813 
814 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_START,
815 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
816 
817 	*retval = 0;
818 
819 	error = aio_queue_async_request(p, uap->aiocbp, AIO_READ);
820 	if (error != 0) {
821 		*retval = -1;
822 	}
823 
824 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_END,
825 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
826 
827 	return error;
828 }
829 
830 
831 /*
832  * aio_return - return the return status associated with the async IO
833  * request referred to by uap->aiocbp.  The return status is the value
834  * that would be returned by corresponding IO request (read, write,
835  * fdatasync, or sync).  This is where we release kernel resources
836  * held for async IO call associated with the given aiocb pointer.
837  */
int
aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval)
{
	aio_workq_entry *entryp;
	int              error = EINVAL;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	/* See if there are any entries to check */
	if (!aio_has_any_work()) {
		goto ExitRoutine;
	}

	aio_proc_lock(p);
	*retval = 0;

	/* look for a match on our queue of async IO requests that have completed */
	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* Done and valid for aio_return(), pull it off the list */
			aio_proc_remove_done_locked(p, entryp);

			*retval = entryp->returnval;
			error = 0;
			aio_proc_unlock(p);

			/* consume the proc's reference; may free the entry */
			aio_entry_unref(entryp);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* look for a match on our queue of active async IO requests */
	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* still running: not valid for aio_return() yet */
			error = EINPROGRESS;
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			break;
		}
	}

	aio_proc_unlock(p);

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);

	return error;
}
893 
894 
895 /*
896  * _aio_exec - internal function used to clean up async IO requests for
897  * a process that is going away due to exec().  We cancel any async IOs
898  * we can and wait for those already active.  We also disable signaling
899  * for cancelled or active aio requests that complete.
900  * This routine MAY block!
901  */
902 __private_extern__ void
_aio_exec(proc_t p)903 _aio_exec(proc_t p)
904 {
905 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_START,
906 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
907 
908 	_aio_exit(p);
909 
910 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_END,
911 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
912 }
913 
914 
915 /*
916  * _aio_exit - internal function used to clean up async IO requests for
917  * a process that is terminating (via exit() or exec()).  We cancel any async IOs
918  * we can and wait for those already active.  We also disable signaling
919  * for cancelled or active aio requests that complete.  This routine MAY block!
920  */
__private_extern__ void
_aio_exit(proc_t p)
{
	/* local list so completed entries can be freed without the proc lock */
	TAILQ_HEAD(, aio_workq_entry) tofree = TAILQ_HEAD_INITIALIZER(tofree);
	aio_workq_entry *entryp, *tmp;
	int              error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		/* still mark the per-proc workq as exiting before tearing it down */
		workq_aio_mark_exiting(p);
		workq_aio_exit(p);
		return;
	}

	/* block further submissions before we start cancelling */
	workq_aio_mark_exiting(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

	aio_proc_lock(p);

	/*
	 * cancel async IO requests on the todo work queue and wait for those
	 * already active to complete.
	 */
	error = do_aio_cancel_locked(p, -1, USER_ADDR_NULL, AIO_EXIT_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * on the active async IO queue.  Active requests cannot be cancelled so we
		 * must wait for them to complete.  We will get a special wake up call on
		 * our channel used to sleep for ALL active requests to complete.  This sleep
		 * channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used when we must wait for all
		 * active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

		/* msleep drops and re-takes the proc mutex around each sleep */
		while (aio_has_active_requests_for_process(p)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0);
		}
	}

	assert(!aio_has_active_requests_for_process(p));

	/* release all aio resources used by this process */
	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_doneq, aio_proc_link, tmp) {
		ASSERT_AIO_FROM_PROC(entryp, p);

		/* move onto the local list; freed below once unlocked */
		aio_proc_remove_done_locked(p, entryp);
		TAILQ_INSERT_TAIL(&tofree, entryp, aio_proc_link);
	}

	aio_proc_unlock(p);

	workq_aio_exit(p);

	/* free all the entries outside of the aio_proc_lock() */
	TAILQ_FOREACH_SAFE(entryp, &tofree, aio_proc_link, tmp) {
		/* clear the stale queue linkage before dropping the reference */
		entryp->aio_proc_link.tqe_prev = NULL;
		aio_entry_unref(entryp);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
}
989 
990 
991 static bool
should_cancel(aio_workq_entry * entryp,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)992 should_cancel(aio_workq_entry *entryp, int fd, user_addr_t aiocbp,
993     aio_entry_flags_t reason)
994 {
995 	if (reason & AIO_EXIT_WAIT) {
996 		/* caller is _aio_exit() */
997 		return true;
998 	}
999 	if (fd != entryp->aiocb.aio_fildes) {
1000 		/* not the file we're looking for */
1001 		return false;
1002 	}
1003 	/*
1004 	 * aio_cancel() or _aio_close() cancel
1005 	 * everything for a given fd when aiocbp is NULL
1006 	 */
1007 	return aiocbp == USER_ADDR_NULL || entryp->uaiocbp == aiocbp;
1008 }
1009 
1010 /*
1011  * do_aio_cancel_locked - cancel async IO requests (if possible).  We get called by
1012  * aio_cancel, close, and at exit.
1013  * There are three modes of operation: 1) cancel all async IOs for a process -
1014  * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd
1015  * is > 0 and aiocbp is NULL 3) cancel one async IO associated with the given
1016  * aiocbp.
1017  * Returns -1 if no matches were found, AIO_CANCELED when we cancelled all
1018  * target async IO requests, AIO_NOTCANCELED if we could not cancel all
1019  * target async IO requests, and AIO_ALLDONE if all target async IO requests
1020  * were already complete.
1021  * WARNING - do not deference aiocbp in this routine, it may point to user
1022  * land data that has not been copied in (when called from aio_cancel())
1023  *
1024  * Called with proc locked, and returns the same way.
1025  */
static int
do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp,
    aio_entry_flags_t reason)
{
	/* a NULL aiocbp means we may match (and cancel) more than one entry */
	bool multiple_matches = (aiocbp == USER_ADDR_NULL);
	aio_workq_entry *entryp, *tmp;
	int result;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	/* look for a match on our queue of async todo work. */
again:
	result = -1;
	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_activeq, aio_proc_link, tmp) {
		ASSERT_AIO_FROM_PROC(entryp, p);

		if (!should_cancel(entryp, fd, aiocbp, reason)) {
			continue;
		}

		if (reason) {
			/* mark the entry as blocking close or exit/exec */
			entryp->flags |= reason;
			if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
				panic("Close and exit flags set at the same time");
			}
		}

		/* Can only be cancelled if it's still on a work queue */
		if (aio_entry_try_workq_remove(p, entryp)) {
			/* record the cancellation result before completing */
			entryp->errorval = ECANCELED;
			entryp->returnval = -1;

			/* Now it's officially cancelled.  Do the completion */
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    fd, 0, 0);
			/* NOTE: this drops the proc lock; re-taken just below */
			do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);

			aio_proc_lock(p);

			if (multiple_matches) {
				/*
				 * Restart from the head of the proc active queue since it
				 * may have been changed while we were away doing completion
				 * processing.
				 *
				 * Note that if we found an uncancellable AIO before, we will
				 * either find it again or discover that it's been completed,
				 * so resetting the result will not cause us to return success
				 * despite outstanding AIOs.
				 */
				goto again;
			}

			return AIO_CANCELED;
		}

		/*
		 * It's been taken off the active queue already, i.e. is in flight.
		 * All we can do is ask for notification.
		 */
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    fd, 0, 0);

		result = AIO_NOTCANCELED;
		if (!multiple_matches) {
			return result;
		}
	}

	/*
	 * if we didn't find any matches on the todo or active queues then look for a
	 * match on our queue of async IO requests that have completed and if found
	 * return AIO_ALLDONE result.
	 *
	 * Proc AIO lock is still held.
	 */
	if (result == -1) {
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (should_cancel(entryp, fd, aiocbp, reason)) {
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    fd, 0, 0);

				result = AIO_ALLDONE;
				if (!multiple_matches) {
					return result;
				}
			}
		}
	}

	return result;
}
1123 
1124 
1125 /*
1126  * aio_suspend - suspend the calling thread until at least one of the async
1127  * IO operations referenced by uap->aiocblist has completed, until a signal
1128  * interrupts the function, or uap->timeoutp time interval (optional) has
1129  * passed.
1130  * Returns 0 if one or more async IOs have completed else -1 and errno is
1131  * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt
1132  * woke us up.
1133  */
int
aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval)
{
	/*
	 * aio_suspend() is a pthread cancellation point: honor any pending
	 * cancellation before entering the non-cancellable implementation.
	 */
	__pthread_testcancel(1);
	return aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval);
}
1140 
1141 
int
aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval)
{
	int                     error;
	int                     i;
	uint64_t                abstime;
	struct user_timespec    ts;
	aio_workq_entry        *entryp;
	user_addr_t            *aiocbpp;
	size_t                  aiocbpp_size;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	*retval = -1;
	abstime = 0; /* 0 deadline == wait forever (no timeout supplied) */
	aiocbpp = NULL;

	/* nothing in flight anywhere means there is nothing to wait for */
	if (!aio_has_any_work()) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	/* bound nent and guard the allocation-size multiply against overflow */
	if (uap->nent < 1 || uap->nent > aio_max_requests_per_process ||
	    os_mul_overflow(sizeof(user_addr_t), uap->nent, &aiocbpp_size)) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	if (uap->timeoutp != USER_ADDR_NULL) {
		/* copy in the timeout in the caller's native timespec layout */
		if (proc_is64bit(p)) {
			struct user64_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = (user_time_t)temp.tv_sec;
				ts.tv_nsec = (user_long_t)temp.tv_nsec;
			}
		} else {
			struct user32_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = temp.tv_sec;
				ts.tv_nsec = temp.tv_nsec;
			}
		}
		if (error != 0) {
			error = EAGAIN;
			goto ExitThisRoutine;
		}

		/* reject negative or out-of-range nanoseconds */
		if (ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000) {
			error = EINVAL;
			goto ExitThisRoutine;
		}

		nanoseconds_to_absolutetime((uint64_t)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec,
		    &abstime);
		clock_absolutetime_interval_to_deadline(abstime, &abstime);
	}

	aiocbpp = (user_addr_t *)kalloc_data(aiocbpp_size, Z_WAITOK);
	if (aiocbpp == NULL || aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		error = EAGAIN;
		goto ExitThisRoutine;
	}

	/* check list of aio requests to see if any have completed */
check_for_our_aiocbp:
	aio_proc_lock_spin(p);
	for (i = 0; i < uap->nent; i++) {
		user_addr_t     aiocbp;

		/* NULL elements are legal so check for 'em */
		aiocbp = *(aiocbpp + i);
		if (aiocbp == USER_ADDR_NULL) {
			continue;
		}

		/* return immediately if any aio request in the list is done */
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (entryp->uaiocbp == aiocbp) {
				aio_proc_unlock(p);
				*retval = 0;
				error = 0;
				goto ExitThisRoutine;
			}
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend_sleep) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	/*
	 * wait for an async IO to complete or a signal fires or timeout expires.
	 * we return EAGAIN (35) for timeout expiration and EINTR (4) when a signal
	 * interrupts us.  If an async IO completes before a signal fires or our
	 * timeout expires, we get a wakeup call from aio_work_thread().
	 * PDROP releases the proc mutex when the sleep ends.
	 */

	error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p),
	    PCATCH | PWAIT | PDROP, "aio_suspend", abstime);
	if (error == 0) {
		/*
		 * got our wakeup call from aio_work_thread().
		 * Since we can get a wakeup on this channel from another thread in the
		 * same process we head back up to make sure this is for the correct aiocbp.
		 * If it is the correct aiocbp we will return from where we do the check
		 * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label)
		 * else we will fall out and just sleep again.
		 */
		goto check_for_our_aiocbp;
	} else if (error == EWOULDBLOCK) {
		/* our timeout expired */
		error = EAGAIN;
	} else {
		/* we were interrupted */
		error = EINTR;
	}

ExitThisRoutine:
	if (aiocbpp != NULL) {
		kfree_data(aiocbpp, aiocbpp_size);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->nent, error, 0, 0);

	return error;
}
1272 
1273 
1274 /* aio_write - asynchronously write uap->aiocbp->aio_nbytes bytes to the
1275  * file descriptor (uap->aiocbp->aio_fildes) from the buffer
1276  * (uap->aiocbp->aio_buf).
1277  */
1278 
1279 int
aio_write(proc_t p,struct aio_write_args * uap,int * retval __unused)1280 aio_write(proc_t p, struct aio_write_args *uap, int *retval __unused)
1281 {
1282 	int error;
1283 
1284 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_START,
1285 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
1286 
1287 	error = aio_queue_async_request(p, uap->aiocbp, AIO_WRITE);
1288 
1289 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_END,
1290 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
1291 
1292 	return error;
1293 }
1294 
1295 
1296 static int
aio_copy_in_list(proc_t procp,user_addr_t aiocblist,user_addr_t * aiocbpp,int nent)1297 aio_copy_in_list(proc_t procp, user_addr_t aiocblist, user_addr_t *aiocbpp,
1298     int nent)
1299 {
1300 	int result;
1301 
1302 	/* copyin our aiocb pointers from list */
1303 	result = copyin(aiocblist, aiocbpp,
1304 	    proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
1305 	    : (nent * sizeof(user32_addr_t)));
1306 	if (result) {
1307 		return result;
1308 	}
1309 
1310 	/*
1311 	 * We depend on a list of user_addr_t's so we need to
1312 	 * munge and expand when these pointers came from a
1313 	 * 32-bit process
1314 	 */
1315 	if (!proc_is64bit(procp)) {
1316 		/* copy from last to first to deal with overlap */
1317 		user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
1318 		user_addr_t *my_addrp = aiocbpp + (nent - 1);
1319 
1320 		for (int i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
1321 			*my_addrp = (user_addr_t) (*my_ptrp);
1322 		}
1323 	}
1324 
1325 	return 0;
1326 }
1327 
1328 
/*
 * aio_copy_in_sigev - copy a user-space sigevent into the in-kernel
 * user_sigevent representation, widening 32-bit fields as needed.
 * A NULL sigp is legal and leaves *sigev untouched.  Returns 0 on
 * success or EAGAIN when the copyin fails.
 */
static int
aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
{
	int     result = 0;

	if (sigp == USER_ADDR_NULL) {
		goto out;
	}

	/*
	 * We need to munge aio_sigevent since it contains pointers.
	 * Since we do not know if sigev_value is an int or a ptr we do
	 * NOT cast the ptr to a user_addr_t.   This means if we send
	 * this info back to user space we need to remember sigev_value
	 * was not expanded for the 32-bit case.
	 *
	 * Notes:	 This does NOT affect us since we don't support
	 *		sigev_value yet in the aio context.
	 */
	if (proc_is64bit(procp)) {
#if __LP64__
		struct user64_sigevent sigevent64;

		result = copyin(sigp, &sigevent64, sizeof(sigevent64));
		if (result == 0) {
			sigev->sigev_notify = sigevent64.sigev_notify;
			sigev->sigev_signo = sigevent64.sigev_signo;
			sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
			sigev->sigev_notify_function = sigevent64.sigev_notify_function;
			sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
		}
#else
		panic("64bit process on 32bit kernel is not supported");
#endif
	} else {
		struct user32_sigevent sigevent32;

		result = copyin(sigp, &sigevent32, sizeof(sigevent32));
		if (result == 0) {
			sigev->sigev_notify = sigevent32.sigev_notify;
			sigev->sigev_signo = sigevent32.sigev_signo;
			sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
			sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
			sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
		}
	}

	/* any copyin failure is reported to the caller as EAGAIN */
	if (result != 0) {
		result = EAGAIN;
	}

out:
	return result;
}
1383 
1384 /*
1385  * validate user_sigevent.  at this point we only support
1386  * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE.  this means
1387  * sigev_value, sigev_notify_function, and sigev_notify_attributes
1388  * are ignored, since SIGEV_THREAD is unsupported.  This is consistent
 * with no [RTS] (RealTime Signal) option group support.
1390  */
1391 static int
aio_sigev_validate(const struct user_sigevent * sigev)1392 aio_sigev_validate(const struct user_sigevent *sigev)
1393 {
1394 	switch (sigev->sigev_notify) {
1395 	case SIGEV_SIGNAL:
1396 	{
1397 		int signum;
1398 
1399 		/* make sure we have a valid signal number */
1400 		signum = sigev->sigev_signo;
1401 		if (signum <= 0 || signum >= NSIG ||
1402 		    signum == SIGKILL || signum == SIGSTOP) {
1403 			return EINVAL;
1404 		}
1405 	}
1406 	break;
1407 
1408 	case SIGEV_NONE:
1409 		break;
1410 
1411 	case SIGEV_KEVENT:
1412 		/*
1413 		 * The sigev_signo should contain the descriptor of the kqueue.
1414 		 * Validate that it contains some sane value.
1415 		 */
1416 		if (sigev->sigev_signo <= 0 || sigev->sigev_signo > maxfilesperproc) {
1417 			return EINVAL;
1418 		}
1419 		break;
1420 
1421 	case SIGEV_THREAD:
1422 	/* Unsupported [RTS] */
1423 
1424 	default:
1425 		return EINVAL;
1426 	}
1427 
1428 	return 0;
1429 }
1430 
1431 
1432 /*
1433  * aio_try_enqueue_work_locked
1434  *
1435  * Queue up the entry on the aio asynchronous work queue in priority order
1436  * based on the relative priority of the request.  We calculate the relative
1437  * priority using the nice value of the caller and the value
1438  *
1439  * Parameters:	procp			Process queueing the I/O
1440  *		entryp			The work queue entry being queued
1441  *		leader			The work leader if any
1442  *
1443  * Returns:	Whether the enqueue was successful
1444  *
1445  * Notes:	This function is used for both lio_listio and aio
1446  *
1447  * XXX:		At some point, we may have to consider thread priority
1448  *		rather than process priority, but we don't maintain the
1449  *		adjusted priority for threads the POSIX way.
1450  *
1451  * Called with proc locked.
1452  */
static bool
aio_try_enqueue_work_locked(proc_t procp, aio_workq_entry *entryp,
    aio_workq_entry *leader)
{
	ASSERT_AIO_PROC_LOCK_OWNED(procp);

	/* Onto proc queue */
	if (!aio_try_proc_insert_active_locked(procp, entryp)) {
		return false;
	}

	/* lio_listio: link this entry to its group leader and count it */
	if (leader) {
		aio_entry_ref(leader); /* consumed in do_aio_completion_and_unlock */
		leader->lio_pending++;
		entryp->lio_leader = leader;
	}

	/* And work queue */
	aio_entry_ref(entryp); /* consumed in do_aio_completion_and_unlock */
	if (bootarg_aio_new_workq) {
		if (!workq_aio_entry_add_locked(procp, entryp)) {
			/* undo the work-queue reference taken just above */
			(void)os_ref_release(&entryp->aio_refcount);
			return false;
		}
	} else {
		/* legacy path: add to the entry's global work queue and wake a worker */
		aio_workq_t queue = aio_entry_workq(entryp);
		aio_workq_lock_spin(queue);
		aio_workq_add_entry_locked(queue, entryp);
		waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
		aio_workq_unlock(queue);
	}

	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, entryp->aiocb.aio_fildes, 0);
	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_END,
	    entryp->aiocb.aio_offset, 0, entryp->aiocb.aio_nbytes, 0, 0);
	return true;
}
1493 
1494 /*
1495  * EV_FLAG0/1 are filter specific flags.
1496  * Repurpose EV_FLAG0 to indicate the kevent is registered from kernel.
1497  */
1498 #define EV_KERNEL    EV_FLAG0
1499 
1500 /* Register a kevent for AIO completion notification. */
static int
aio_register_kevent(proc_t procp, aio_workq_entry *entryp)
{
	struct kevent_qos_s kev;
	struct fileproc *fp = NULL;
	kqueue_t kqu;
	/* for SIGEV_KEVENT, sigev_signo carries the kqueue's descriptor */
	int kqfd = entryp->aiocb.aio_sigevent.sigev_signo;
	int error;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp),
	    VM_KERNEL_ADDRPERM(entryp->uaiocbp), kqfd, 0);

	/* validate that kqfd really is a kqueue; takes a ref on fp */
	error = fp_get_ftype(procp, kqfd, DTYPE_KQUEUE, EBADF, &fp);
	if (error) {
		goto exit;
	}

	kqu.kq = (struct kqueue *)fp_get_data(fp);

	memset(&kev, 0, sizeof(kev));
	/* the user-space aiocb pointer identifies the event */
	kev.ident = (uintptr_t)entryp->uaiocbp;
	kev.filter = EVFILT_AIO;
	/*
	 * Set the EV_FLAG0 to indicate the event is registered from the kernel.
	 * This flag later is checked in filt_aioattach() and to determine if
	 * a kevent is registered from kernel or user-space.
	 */
	kev.flags = EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT | EV_KERNEL;
	kev.udata = entryp->aiocb.aio_sigevent.sigev_value.sival_ptr;
	kev.data = (intptr_t)entryp;

	error = kevent_register(kqu.kq, &kev, NULL);
	assert((error & FILTER_REGISTER_WAIT) == 0);

exit:
	if (fp) {
		fp_drop(procp, kqfd, fp, 0);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp), error, 0, 0);

	return error;
}
1546 
1547 /*
1548  * lio_listio - initiate a list of IO requests.  We process the list of
1549  * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously
1550  * (mode == LIO_NOWAIT).
1551  *
1552  * The caller gets error and return status for each aiocb in the list
1553  * via aio_error and aio_return.  We must keep completed requests until
1554  * released by the aio_return call.
1555  */
int
lio_listio(proc_t p, struct lio_listio_args *uap, int *retval __unused)
{
	aio_workq_entry         *entries[AIO_LISTIO_MAX] = { };
	user_addr_t              aiocbpp[AIO_LISTIO_MAX];
	struct user_sigevent     aiosigev = { };
	int                      result = 0;
	int                      lio_count = 0;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, uap->mode, 0, 0);

	if (!(uap->mode == LIO_NOWAIT || uap->mode == LIO_WAIT)) {
		result = EINVAL;
		goto ExitRoutine;
	}

	if (uap->nent < 1 || uap->nent > AIO_LISTIO_MAX) {
		result = EINVAL;
		goto ExitRoutine;
	}

	/*
	 * Use sigevent passed in to lio_listio for each of our calls, but
	 * only do completion notification after the last request completes.
	 */
	if (uap->sigp != USER_ADDR_NULL) {
		result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
		if (result) {
			goto ExitRoutine;
		}
		result = aio_sigev_validate(&aiosigev);
		if (result) {
			goto ExitRoutine;
		}
	}

	if (aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		result = EAGAIN;
		goto ExitRoutine;
	}

	/*
	 * allocate/parse all entries
	 */
	for (int i = 0; i < uap->nent; i++) {
		aio_workq_entry *entryp;

		/* NULL elements are legal so check for 'em */
		if (aiocbpp[i] == USER_ADDR_NULL) {
			continue;
		}

		entryp = aio_create_queue_entry(p, aiocbpp[i], AIO_LIO);
		if (entryp == NULL) {
			result = EAGAIN;
			goto ExitRoutine;
		}

		/*
		 * This refcount is cleaned up on exit if the entry
		 * isn't submitted
		 */
		entries[lio_count++] = entryp;
		if ((uap->mode == LIO_NOWAIT) &&
		    (entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)) {
			/* Set signal handler, if any */
			entryp->aiocb.aio_sigevent = aiosigev;
		}
	}

	if (lio_count == 0) {
		/* There's nothing to submit */
		goto ExitRoutine;
	}

	/*
	 * Past this point we're committed and will not bail out
	 *
	 * - keep a reference on the leader for LIO_WAIT
	 * - perform the submissions and optionally wait
	 */

	aio_workq_entry *leader = entries[0];
	if (uap->mode == LIO_WAIT) {
		aio_entry_ref(leader); /* consumed below */
	}

	aio_proc_lock(p);

	for (int i = 0; i < lio_count; i++) {
		if (aio_try_enqueue_work_locked(p, entries[i], leader)) {
			workq_aio_wakeup_thread(p); /* this may drop and reacquire the proc lock */
			/*
			 * For SIGEV_KEVENT, every AIO in the list would get its own kevent
			 * notification upon completion as opposed to SIGEV_SIGNAL for which a
			 * single notification is delivered when all AIOs have completed.
			 */
			if ((uap->mode == LIO_NOWAIT) &&
			    (entries[i]->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT)) {
				aio_register_kevent(p, entries[i]);
			}
			entries[i] = NULL; /* the entry was submitted */
		} else {
			result = EAGAIN;
		}
	}

	if (uap->mode == LIO_WAIT && result == 0) {
		/* sleep until the leader's pending count drains to zero */
		leader->flags |= AIO_LIO_WAIT;

		while (leader->lio_pending) {
			/* If we were interrupted, fail out (even if all finished) */
			if (msleep(leader, aio_proc_mutex(p),
			    PCATCH | PRIBIO | PSPIN, "lio_listio", 0) != 0) {
				result = EINTR;
				break;
			}
		}

		leader->flags &= ~AIO_LIO_WAIT;
	}

	aio_proc_unlock(p);

	if (uap->mode == LIO_WAIT) {
		aio_entry_unref(leader);
	}

ExitRoutine:
	/* Consume unsubmitted entries */
	for (int i = 0; i < lio_count; i++) {
		if (entries[i]) {
			aio_entry_unref(entries[i]);
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), result, 0, 0, 0);

	return result;
}
1698 
1699 
1700 /*
1701  * aio worker thread.  this is where all the real work gets done.
1702  * we get a wake up call on sleep channel &aio_anchor.aio_async_workq
1703  * after new work is queued up.
1704  */
__attribute__((noreturn))
static void
aio_work_thread(void *arg __unused, wait_result_t wr __unused)
{
	aio_workq_entry         *entryp;
	int                     error;
	vm_map_switch_context_t switch_ctx;
	struct uthread          *uthreadp = NULL;
	proc_t                  p = NULL;

	for (;;) {
		/*
		 * returns with the entry ref'ed.
		 * sleeps until work is available.
		 */
		entryp = aio_get_some_work();
		p = entryp->procp;

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_START,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->flags, 0, 0);

		/*
		 * Assume the target's address space identity for the duration
		 * of the IO.  Note: don't need to have the entryp locked,
		 * because the proc and map don't change until it's freed.
		 */
		uthreadp = (struct uthread *) current_uthread();
		assert(get_task_map(proc_task(current_proc())) != entryp->aio_map);
		assert(uthreadp->uu_aio_task == NULL);

		/*
		 * workq entries at this stage cause _aio_exec() and _aio_exit() to
		 * block until we hit `do_aio_completion_and_unlock()` below,
		 * which means that it is safe to dereference p->task without
		 * holding a lock or taking references.
		 */
		uthreadp->uu_aio_task = proc_task(p);
		switch_ctx = vm_map_switch_to(entryp->aio_map);

		/* dispatch on the operation type recorded in the entry flags */
		if ((entryp->flags & AIO_READ) != 0) {
			error = do_aio_read(entryp);
		} else if ((entryp->flags & AIO_WRITE) != 0) {
			error = do_aio_write(entryp);
		} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
			error = do_aio_fsync(entryp);
		} else {
			error = EINVAL;
		}

		/* Restore old map */
		vm_map_switch_back(switch_ctx);
		uthreadp->uu_aio_task = NULL;

		/* liberate unused map */
		vm_map_deallocate(entryp->aio_map);
		entryp->aio_map = VM_MAP_NULL;

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_END,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->errorval, entryp->returnval, 0);

		/* we're done with the IO request so pop it off the active queue and */
		/* push it on the done queue */
		aio_proc_lock(p);
		entryp->errorval = error;
		do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
	}
}
1774 
1775 
1776 /*
1777  * aio_get_some_work - get the next async IO request that is ready to be executed.
1778  * aio_fsync complicates matters a bit since we cannot do the fsync until all async
1779  * IO requests at the time the aio_fsync call came in have completed.
 * NOTE - the caller holds no locks; this routine takes the work queue lock itself.
1781  */
static aio_workq_entry *
aio_get_some_work(void)
{
	aio_workq_entry *entryp = NULL;
	aio_workq_t      queue = NULL;

	/* Just one queue for the moment.  In the future there will be many. */
	queue = &aio_anchor.aio_async_workqs[0];
	aio_workq_lock_spin(queue);

	/*
	 * Hold the queue lock.
	 *
	 * pop some work off the work queue and add to our active queue
	 * Always start with the queue lock held.
	 */
	while ((entryp = TAILQ_FIRST(&queue->aioq_entries))) {
		/*
		 * Pull off of the work queue.  Once it's off, it can't be cancelled,
		 * so we can take our ref once we drop the queue lock.
		 */

		aio_workq_remove_entry_locked(queue, entryp);

		aio_workq_unlock(queue);

		/*
		 * Check if it's an fsync that must be delayed.  No need to lock the entry;
		 * that flag would have been set at initialization.
		 */
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(entryp->procp);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed.  Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(entryp->procp),
				    VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

				aio_proc_unlock(entryp->procp);

				/* re-take the queue lock before re-queueing and looping */
				aio_workq_lock_spin(queue);
				aio_workq_add_entry_locked(queue, entryp);
				continue;
			}
			aio_proc_unlock(entryp->procp);
		}

		return entryp;
	}

	/* We will wake up when someone enqueues something */
	waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
	aio_workq_unlock(queue);
	/* blocks; restarts at the top of aio_work_thread() on wakeup */
	thread_block(aio_work_thread);

	__builtin_unreachable();
}
1843 
1844 /*
1845  * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed.
1846  * A big, simple hammer: only send it off if it's the most recently filed IO which has
1847  * not been completed.
1848  */
1849 static boolean_t
aio_delay_fsync_request(aio_workq_entry * entryp)1850 aio_delay_fsync_request(aio_workq_entry *entryp)
1851 {
1852 	if (proc_in_teardown(entryp->procp)) {
1853 		/*
1854 		 * we can't delay FSYNCS when in teardown as it will confuse _aio_exit,
1855 		 * if it was dequeued, then we must now commit to it
1856 		 */
1857 		return FALSE;
1858 	}
1859 
1860 	if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
1861 		return FALSE;
1862 	}
1863 
1864 	return TRUE;
1865 }
1866 
/*
 * aio_create_queue_entry - allocate and initialize a work queue entry for the
 * given user aiocb.  Copies the user aiocb in (munging 32-bit layouts to the
 * common user_aiocb form), validates it, and takes the references the async
 * path needs (thread, credential, and optionally the user VM map).
 *
 * Returns NULL on copyin or validation failure; on success the caller owns
 * the single reference initialized here (consumed in aio_return or _aio_exit).
 */
static aio_workq_entry *
aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;

	entryp = zalloc_flags(aio_workq_zonep, Z_WAITOK | Z_ZERO);
	entryp->procp = procp;
	entryp->uaiocbp = aiocbp;
	entryp->flags = flags;
	/* consumed in aio_return or _aio_exit */
	os_ref_init(&entryp->aio_refcount, &aio_refgrp);

	if (proc_is64bit(procp)) {
		struct user64_aiocb aiocb64;

		if (copyin(aiocbp, &aiocb64, sizeof(aiocb64)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
	} else {
		struct user32_aiocb aiocb32;

		if (copyin(aiocbp, &aiocb32, sizeof(aiocb32)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user32_to_user(&aiocb32, &entryp->aiocb);
	}

	/* do some more validation on the aiocb and embedded file descriptor */
	if (aio_validate(procp, entryp) != 0) {
		goto error_exit;
	}

	/* get a reference on the current_thread, which is passed in vfs_context. */
	entryp->context = *vfs_context_current();
	thread_reference(entryp->context.vc_thread);
	kauth_cred_ref(entryp->context.vc_ucred);

	if (bootarg_aio_new_workq) {
		/* new per-process workq mode: work runs in the proc's own map */
		entryp->aio_map = VM_MAP_NULL;
		workq_aio_prepare(procp);
	} else {
		/* get a reference to the user land map in order to keep it around */
		entryp->aio_map = get_task_map(proc_task(procp));
		vm_map_reference(entryp->aio_map);
	}
	return entryp;

error_exit:
	/* no references taken yet on these paths; the zeroed entry can just be freed */
	zfree(aio_workq_zonep, entryp);
	return NULL;
}
1919 
1920 
/*
 * aio_queue_async_request - queue up an async IO request on our work queue then
 * wake up one of our worker threads to do the actual work.  We get a reference
 * to our caller's user land map in order to keep it around while we are
 * processing the request.
 *
 * Returns 0 on success or EAGAIN if the entry could not be allocated,
 * enqueued, or (for SIGEV_KEVENT) registered with kevent.
 */
static int
aio_queue_async_request(proc_t procp, user_addr_t aiocbp,
    aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;
	int              result;

	entryp = aio_create_queue_entry(procp, aiocbp, flags);
	if (entryp == NULL) {
		result = EAGAIN;
		goto error_noalloc;
	}

	aio_proc_lock(procp);
	if (!aio_try_enqueue_work_locked(procp, entryp, NULL)) {
		result = EAGAIN;
		goto error_exit;
	}

	if ((entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) &&
	    (aio_register_kevent(procp, entryp) != 0)) {
		/*
		 * NOTE(review): on this path the entry WAS successfully enqueued
		 * above, yet error_exit frees it as if it were never queued —
		 * confirm aio_register_kevent failure unwinds the enqueue, or
		 * aio_free_request's "still enqueued" panic could fire.
		 */
		result = EAGAIN;
		goto error_exit;
	}
	workq_aio_wakeup_thread_and_unlock(procp);
	return 0;

error_exit:
	/*
	 * This entry has not been queued up so no worries about
	 * unlocked state and aio_map
	 */
	aio_proc_unlock(procp);
	aio_free_request(entryp);
error_noalloc:
	return result;
}
1964 
1965 
1966 /*
1967  * aio_free_request - remove our reference on the user land map and
1968  * free the work queue entry resources.  The entry is off all lists
1969  * and has zero refcount, so no one can have a pointer to it.
1970  */
1971 static void
aio_free_request(aio_workq_entry * entryp)1972 aio_free_request(aio_workq_entry *entryp)
1973 {
1974 	if (entryp->aio_proc_link.tqe_prev || entryp->aio_workq_link.tqe_prev) {
1975 		panic("aio_workq_entry %p being freed while still enqueued", entryp);
1976 	}
1977 
1978 	/* remove our reference to the user land map. */
1979 	if (VM_MAP_NULL != entryp->aio_map) {
1980 		vm_map_deallocate(entryp->aio_map);
1981 	}
1982 
1983 	/* remove our reference to thread which enqueued the request */
1984 	if (entryp->context.vc_thread) {
1985 		thread_deallocate(entryp->context.vc_thread);
1986 	}
1987 	kauth_cred_unref(&entryp->context.vc_ucred);
1988 
1989 	zfree(aio_workq_zonep, entryp);
1990 }
1991 
1992 
1993 /*
1994  * aio_validate
1995  *
1996  * validate the aiocb passed in by one of the aio syscalls.
1997  */
1998 static int
aio_validate(proc_t p,aio_workq_entry * entryp)1999 aio_validate(proc_t p, aio_workq_entry *entryp)
2000 {
2001 	struct fileproc *fp;
2002 	int              flag;
2003 	int              result;
2004 
2005 	result = 0;
2006 
2007 	if ((entryp->flags & AIO_LIO) != 0) {
2008 		if (entryp->aiocb.aio_lio_opcode == LIO_READ) {
2009 			entryp->flags |= AIO_READ;
2010 		} else if (entryp->aiocb.aio_lio_opcode == LIO_WRITE) {
2011 			entryp->flags |= AIO_WRITE;
2012 		} else if (entryp->aiocb.aio_lio_opcode == LIO_NOP) {
2013 			return 0;
2014 		} else {
2015 			return EINVAL;
2016 		}
2017 	}
2018 
2019 	flag = FREAD;
2020 	if ((entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0) {
2021 		flag = FWRITE;
2022 	}
2023 
2024 	if ((entryp->flags & (AIO_READ | AIO_WRITE)) != 0) {
2025 		if (entryp->aiocb.aio_nbytes > INT_MAX ||
2026 		    entryp->aiocb.aio_buf == USER_ADDR_NULL ||
2027 		    entryp->aiocb.aio_offset < 0) {
2028 			return EINVAL;
2029 		}
2030 	}
2031 
2032 	result = aio_sigev_validate(&entryp->aiocb.aio_sigevent);
2033 	if (result) {
2034 		return result;
2035 	}
2036 
2037 	/* validate the file descriptor and that the file was opened
2038 	 * for the appropriate read / write access.
2039 	 */
2040 	proc_fdlock(p);
2041 
2042 	fp = fp_get_noref_locked(p, entryp->aiocb.aio_fildes);
2043 	if (fp == NULL) {
2044 		result = EBADF;
2045 	} else if ((fp->fp_glob->fg_flag & flag) == 0) {
2046 		/* we don't have read or write access */
2047 		result = EBADF;
2048 	} else if (FILEGLOB_DTYPE(fp->fp_glob) != DTYPE_VNODE) {
2049 		/* this is not a file */
2050 		result = ESPIPE;
2051 	} else {
2052 		fp->fp_flags |= FP_AIOISSUED;
2053 	}
2054 
2055 	proc_fdunlock(p);
2056 
2057 	return result;
2058 }
2059 
/*
 * do_aio_completion_and_unlock.  Handle async IO completion.
 *
 * Called with the proc's aio lock held; the lock is dropped before the
 * notification work (signal / kevent / suspend wakeup) and is NOT held on
 * return.  'reason' (AIO_COMPLETED or AIO_CANCELLED) is OR-ed into the
 * entry's flags after the entry is moved to the done queue.  Drops the
 * enqueue reference on the entry (and the lio leader's, if any).
 */
static void
do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp,
    aio_entry_flags_t reason)
{
	aio_workq_entry *leader = entryp->lio_leader;
	int              lio_pending = 0;
	bool             do_signal, do_kevent;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	aio_proc_move_done_locked(p, entryp);
	entryp->flags |= reason;

	if (leader) {
		/* part of a lio_listio group: account completion against the leader */
		lio_pending = --leader->lio_pending;
		if (lio_pending < 0) {
			panic("lio_pending accounting mistake");
		}
		if (lio_pending == 0 && (leader->flags & AIO_LIO_WAIT)) {
			wakeup(leader);
		}
		entryp->lio_leader = NULL; /* no dangling pointers please */
	}

	/*
	 * need to handle case where a process is trying to exit, exec, or
	 * close and is currently waiting for active aio requests to complete.
	 * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any
	 * other requests in the active queue for this process.  If there are
	 * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.
	 * If there are some still active then do nothing - we only want to
	 * wakeup when all active aio requests for the process are complete.
	 */
	do_signal = do_kevent = false;
	if (__improbable(entryp->flags & AIO_EXIT_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_has_active_requests_for_process(p)) {
			/*
			 * no active aio requests for this process, continue exiting.  In this
			 * case, there should be no one else waiting on the proc in AIO...
			 */
			wakeup_one((caddr_t)&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		/*
		 * If this was the last request in the group, or not part of
		 * a group, and that a signal is desired, send one.
		 */
		do_signal = (lio_pending == 0);
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		/*
		 * For SIGEV_KEVENT, every AIO (even it is part of a group) would get
		 * a kevent notification.
		 */
		do_kevent = true;
	}

	if (__improbable(entryp->flags & AIO_CLOSE_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_proc_has_active_requests_for_file(p, entryp->aiocb.aio_fildes)) {
			/* Can't wakeup_one(); multiple closes might be in progress. */
			wakeup(&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	}

	/* all notification decisions are made; the lock can now be dropped */
	aio_proc_unlock(p);

	if (do_signal) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		psignal(p, entryp->aiocb.aio_sigevent.sigev_signo);
	} else if (do_kevent) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_kevent) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		/* We only support one event type for either completed/cancelled AIO. */
		lck_mtx_lock(&aio_klist_lock);
		KNOTE(&aio_klist, 1);
		lck_mtx_unlock(&aio_klist_lock);
	}

	/*
	 * A thread in aio_suspend() wants to known about completed IOs.  If it checked
	 * the done list before we moved our AIO there, then it already asserted its wait,
	 * and we can wake it up without holding the lock.  If it checked the list after
	 * we did our move, then it already has seen the AIO that we moved.  Herego, we
	 * can do our wakeup without holding the lock.
	 */
	wakeup(&p->AIO_SUSPEND_SLEEP_CHAN);
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

	aio_entry_unref(entryp); /* see aio_try_enqueue_work_locked */
	if (leader) {
		aio_entry_unref(leader); /* see lio_listio */
	}
}
2177 
2178 
2179 /*
2180  * do_aio_read
2181  */
2182 static int
do_aio_read(aio_workq_entry * entryp)2183 do_aio_read(aio_workq_entry *entryp)
2184 {
2185 	struct proc     *p = entryp->procp;
2186 	struct fileproc *fp;
2187 	int error;
2188 
2189 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2190 		return error;
2191 	}
2192 
2193 	if (fp->fp_glob->fg_flag & FREAD) {
2194 		error = dofileread(&entryp->context, fp,
2195 		    entryp->aiocb.aio_buf,
2196 		    entryp->aiocb.aio_nbytes,
2197 		    entryp->aiocb.aio_offset, FOF_OFFSET,
2198 		    &entryp->returnval);
2199 	} else {
2200 		error = EBADF;
2201 	}
2202 
2203 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2204 	return error;
2205 }
2206 
2207 
2208 /*
2209  * do_aio_write
2210  */
2211 static int
do_aio_write(aio_workq_entry * entryp)2212 do_aio_write(aio_workq_entry *entryp)
2213 {
2214 	struct proc     *p = entryp->procp;
2215 	struct fileproc *fp;
2216 	int error;
2217 
2218 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2219 		return error;
2220 	}
2221 
2222 	if (fp->fp_glob->fg_flag & FWRITE) {
2223 		int flags = 0;
2224 
2225 		if ((fp->fp_glob->fg_flag & O_APPEND) == 0) {
2226 			flags |= FOF_OFFSET;
2227 		}
2228 
2229 		/* NB: tell dofilewrite the offset, and to use the proc cred */
2230 		error = dofilewrite(&entryp->context,
2231 		    fp,
2232 		    entryp->aiocb.aio_buf,
2233 		    entryp->aiocb.aio_nbytes,
2234 		    entryp->aiocb.aio_offset,
2235 		    flags,
2236 		    &entryp->returnval);
2237 	} else {
2238 		error = EBADF;
2239 	}
2240 
2241 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2242 	return error;
2243 }
2244 
2245 
2246 /*
2247  * aio_has_active_requests_for_process - return whether the process has active
2248  * requests pending.
2249  */
2250 static bool
aio_has_active_requests_for_process(proc_t procp)2251 aio_has_active_requests_for_process(proc_t procp)
2252 {
2253 	return !TAILQ_EMPTY(&procp->p_aio_activeq);
2254 }
2255 
2256 /*
2257  * Called with the proc locked.
2258  */
2259 static bool
aio_proc_has_active_requests_for_file(proc_t procp,int fd)2260 aio_proc_has_active_requests_for_file(proc_t procp, int fd)
2261 {
2262 	aio_workq_entry *entryp;
2263 
2264 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2265 		if (entryp->aiocb.aio_fildes == fd) {
2266 			return true;
2267 		}
2268 	}
2269 
2270 	return false;
2271 }
2272 
2273 
/*
 * do_aio_fsync - flush the vnode behind the aiocb's file descriptor,
 * with MNT_WAIT (full fsync) or MNT_DWAIT (data-only) semantics.
 */
static int
do_aio_fsync(aio_workq_entry *entryp)
{
	struct proc            *p = entryp->procp;
	struct vnode           *vp;
	struct fileproc        *fp;
	int                     sync_flag;
	int                     error;

	/*
	 * We are never called unless either AIO_FSYNC or AIO_DSYNC are set.
	 *
	 * If AIO_DSYNC is set, we can tell the lower layers that it is OK
	 * to mark for update the metadata not strictly necessary for data
	 * retrieval, rather than forcing it to disk.
	 *
	 * If AIO_FSYNC is set, we have to also wait until metadata not really
	 * necessary for data retrieval is committed to stable storage (e.g.
	 * atime, mtime, ctime, etc.).
	 *
	 * Metadata necessary for data retrieval must be committed to stable
	 * storage in either case (file length, etc.).
	 */
	if (entryp->flags & AIO_FSYNC) {
		sync_flag = MNT_WAIT;
	} else {
		sync_flag = MNT_DWAIT;
	}

	/* the descriptor must refer to a vnode; anything else is ENOTSUP */
	error = fp_get_ftype(p, entryp->aiocb.aio_fildes, DTYPE_VNODE, ENOTSUP, &fp);
	if (error != 0) {
		entryp->returnval = -1;
		return error;
	}
	vp = fp_get_data(fp);

	if ((error = vnode_getwithref(vp)) == 0) {
		error = VNOP_FSYNC(vp, sync_flag, &entryp->context);

		(void)vnode_put(vp);
	} else {
		entryp->returnval = -1;
	}

	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
	return error;
}
2324 
2325 
2326 /*
2327  * is_already_queued - runs through our queues to see if the given
2328  * aiocbp / process is there.  Returns TRUE if there is a match
2329  * on any of our aio queues.
2330  *
2331  * Called with proc aio lock held (can be held spin)
2332  */
2333 static boolean_t
is_already_queued(proc_t procp,user_addr_t aiocbp)2334 is_already_queued(proc_t procp, user_addr_t aiocbp)
2335 {
2336 	aio_workq_entry *entryp;
2337 	boolean_t        result;
2338 
2339 	result = FALSE;
2340 
2341 	/* look for matches on our queue of async IO requests that have completed */
2342 	TAILQ_FOREACH(entryp, &procp->p_aio_doneq, aio_proc_link) {
2343 		if (aiocbp == entryp->uaiocbp) {
2344 			result = TRUE;
2345 			goto ExitThisRoutine;
2346 		}
2347 	}
2348 
2349 	/* look for matches on our queue of active async IO requests */
2350 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2351 		if (aiocbp == entryp->uaiocbp) {
2352 			result = TRUE;
2353 			goto ExitThisRoutine;
2354 		}
2355 	}
2356 
2357 ExitThisRoutine:
2358 	return result;
2359 }
2360 
2361 
2362 /*
2363  * aio initialization
2364  */
2365 __private_extern__ void
aio_init(void)2366 aio_init(void)
2367 {
2368 	for (int i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
2369 		aio_workq_init(&aio_anchor.aio_async_workqs[i]);
2370 	}
2371 
2372 	if (bootarg_aio_new_workq) {
2373 		printf("New aio workqueue implementation selected\n");
2374 	} else {
2375 		_aio_create_worker_threads(aio_worker_threads);
2376 	}
2377 
2378 	klist_init(&aio_klist);
2379 
2380 	clock_interval_to_absolutetime_interval(aio_wq_reduce_pool_window.usecs,
2381 	    NSEC_PER_USEC, &aio_wq_reduce_pool_window.abstime);
2382 }
2383 
2384 
2385 /*
2386  * aio worker threads created here.
2387  */
2388 __private_extern__ void
_aio_create_worker_threads(int num)2389 _aio_create_worker_threads(int num)
2390 {
2391 	int i;
2392 
2393 	/* create some worker threads to handle the async IO requests */
2394 	for (i = 0; i < num; i++) {
2395 		thread_t                myThread;
2396 
2397 		if (KERN_SUCCESS != kernel_thread_start(aio_work_thread, NULL, &myThread)) {
2398 			printf("%s - failed to create a work thread \n", __FUNCTION__);
2399 		} else {
2400 			thread_deallocate(myThread);
2401 		}
2402 	}
2403 }
2404 
2405 /*
2406  * Return the current activation utask
2407  */
2408 task_t
get_aiotask(void)2409 get_aiotask(void)
2410 {
2411 	return current_uthread()->uu_aio_task;
2412 }
2413 
2414 
/*
 * In the case of an aiocb from a
 * 32-bit process we need to expand some longs and pointers to the correct
 * sizes in order to let downstream code always work on the same type of
 * aiocb (in our case that is a user_aiocb)
 */
static void
do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = CAST_USER_ADDR_T(my_aiocbp->aio_buf);
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	/* special case here.  since we do not know if sigev_value is an */
	/* int or a ptr we do NOT cast the ptr to a user_addr_t.   This  */
	/* means if we send this info back to user space we need to remember */
	/* sigev_value was not expanded for the 32-bit case.  */
	/* NOTE - this does NOT affect us since we don't support sigev_value */
	/* yet in the aio context.  */
	//LP64
	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_function);
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
}
2447 
/* Similar for 64-bit user process, so that we don't need to satisfy
 * the alignment constraints of the original user64_aiocb.
 * On a 32-bit kernel a 64-bit process cannot exist, hence the panic
 * (and the __dead2 annotation below).
 */
#if !__LP64__
__dead2
#endif
static void
do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
#if __LP64__
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    my_aiocbp->aio_sigevent.sigev_notify_function;
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    my_aiocbp->aio_sigevent.sigev_notify_attributes;
#else
#pragma unused(my_aiocbp, the_user_aiocbp)
	panic("64bit process on 32bit kernel is not supported");
#endif
}
2478 
2479 
2480 static int
filt_aioattach(struct knote * kn,struct kevent_qos_s * kev)2481 filt_aioattach(struct knote *kn, struct kevent_qos_s *kev)
2482 {
2483 	aio_workq_entry *entryp = (aio_workq_entry *)kev->data;
2484 
2485 	/* Don't allow kevent registration from the user-space. */
2486 	if ((kev->flags & EV_KERNEL) == 0) {
2487 		return EPERM;
2488 	}
2489 
2490 	kev->flags &= ~EV_KERNEL;
2491 	/* Clear the 'kn_fflags' state afte the knote has been processed. */
2492 	kn->kn_flags |= EV_CLEAR;
2493 
2494 	/* Associate the knote with the AIO work. */
2495 	knote_kn_hook_set_raw(kn, (void *)entryp);
2496 
2497 	lck_mtx_lock(&aio_klist_lock);
2498 	KNOTE_ATTACH(&aio_klist, kn);
2499 	lck_mtx_unlock(&aio_klist_lock);
2500 
2501 	return 0;
2502 }
2503 
/* filt_aiodetach - remove the knote from the global AIO klist. */
static void
filt_aiodetach(struct knote *kn)
{
	lck_mtx_lock(&aio_klist_lock);
	KNOTE_DETACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);
}
2511 
2512 /*
2513  * The 'f_event' is called with 'aio_klist_lock' held when KNOTE() was called
2514  * in do_aio_completion_and_unlock().
2515  */
2516 static int
filt_aioevent(struct knote * kn,long hint)2517 filt_aioevent(struct knote *kn, long hint)
2518 {
2519 	aio_workq_entry *entryp;
2520 	int activate = 0;
2521 
2522 	/*
2523 	 * The 'f_event' and 'f_process' can run concurrently so it is possible
2524 	 * the aio_workq_entry has been detached from the knote when the
2525 	 * filt_aioprocess() was called earlier. In this case, we will skip
2526 	 * activating the event.
2527 	 */
2528 	entryp = knote_kn_hook_get_raw(kn);
2529 	if (__improbable(entryp == NULL)) {
2530 		goto out;
2531 	}
2532 
2533 	/* We can only activate the filter if the AIO work has completed. */
2534 	if (entryp->flags & AIO_COMPLETED) {
2535 		kn->kn_fflags |= hint;
2536 		activate = FILTER_ACTIVE;
2537 	}
2538 
2539 out:
2540 	return activate;
2541 }
2542 
/*
 * AIO knotes are kernel-registered only, so user space can never re-register
 * (touch) one; reaching this entry point is a bug.
 */
static int
filt_aiotouch(struct knote *kn, struct kevent_qos_s *kev)
{
	panic("%s: kn %p kev %p (NOT EXPECTED TO BE CALLED!!)", __func__, kn, kev);
}
2548 
/*
 * filt_aioprocess - deliver the completed AIO's error/return values to the
 * kevent, detach the entry from the knote, and drop the entry from the
 * proc's done queue along with its reference.  Runs under aio_klist_lock
 * to serialize against filt_aioevent().
 */
static int
filt_aioprocess(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp;
	proc_t p;
	int res = 0;

	entryp = knote_kn_hook_get_raw(kn);
	assert(entryp);
	p = entryp->procp;

	lck_mtx_lock(&aio_klist_lock);
	if (kn->kn_fflags) {
		/* Propagate the error status and return value back to the user. */
		kn->kn_ext[0] = entryp->errorval;
		kn->kn_ext[1] = entryp->returnval;
		knote_fill_kevent(kn, kev, 0);
		/* detach the entry so a concurrent f_event sees NULL and skips it */
		knote_kn_hook_set_raw(kn, NULL);

		aio_proc_lock(p);
		aio_proc_remove_done_locked(p, entryp);
		aio_proc_unlock(p);
		aio_entry_unref(entryp);

		res = FILTER_ACTIVE;
	}
	lck_mtx_unlock(&aio_klist_lock);

	return res;
}
2579 
/* knote filter operations for AIO completion events (kernel-attached only) */
SECURITY_READ_ONLY_EARLY(struct filterops) aio_filtops = {
	.f_isfd = 0, /* not tied to a file descriptor */
	.f_attach = filt_aioattach,
	.f_detach = filt_aiodetach,
	.f_event = filt_aioevent,
	.f_touch = filt_aiotouch,
	.f_process = filt_aioprocess,
};
2588 
2589 #pragma mark per process aio workqueue
2590 
/*
 * The per process workq threads call this function to handle the aio request. The threads
 * belong to the same process so we don't need to change the vm maps as we would for kernel
 * threads.
 *
 * Performs the IO described by 'entryp', handling delayed fsyncs by
 * re-queueing (or cancelling them if re-queueing fails), then completes
 * the request.  Always returns 0.
 */
static int
workq_aio_process_entry(aio_workq_entry *entryp)
{
	proc_t p = entryp->procp;
	int error;

	/* must run on a thread of the issuing process, but not the issuing thread */
	assert(current_proc() == p && current_thread() != vfs_context_thread(&entryp->context));

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, 0, 0);

	if ((entryp->flags & AIO_READ) != 0) {
		error = do_aio_read(entryp);
	} else if ((entryp->flags & AIO_WRITE) != 0) {
		error = do_aio_write(entryp);
	} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(p);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed.  Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    0, 0, 0);

				/* The references on this entry haven't been consumed */
				if (!workq_aio_entry_add_locked(p, entryp)) {
					/* could not be re-queued: treat the request as cancelled */
					entryp->errorval = ECANCELED;
					entryp->returnval = -1;

					/* Now it's officially cancelled.  Do the completion */
					KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
					    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
					    entryp->aiocb.aio_fildes, 0, 0);

					do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
				} else {
					workq_aio_wakeup_thread_and_unlock(p);
				}
				return 0;
			}
			aio_proc_unlock(entryp->procp);
		}
		error = do_aio_fsync(entryp);
	} else {
		error = EINVAL;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->errorval, entryp->returnval, 0);

	/* we're done with the IO request so pop it off the active queue and */
	/* push it on the done queue */
	aio_proc_lock(p);
	entryp->errorval = error;
	do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
	return 0;
}
2659 
2660 /*
2661  * The functions below implement a workqueue for aio which is taken from the
2662  * workqueue implementation for libdispatch/pthreads. They are stripped down versions
2663  * of the corresponding functions for libdispatch/pthreads.
2664  */
2665 
/*
 * sysctl handler for aio workq tunables expressed in microseconds: stores
 * the new usec value and refreshes the cached absolute-time interval.
 */
static int
aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct aio_workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	if (error || !req->newptr) {
		/* read-only access or error: nothing to recompute */
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}
2679 
2680 #pragma mark wq_flags
2681 
2682 #define AIO_WQ_DEAD 0x1000
2683 
/* _wa_flags - relaxed atomic snapshot of the workqueue flag word. */
static inline uint32_t
_wa_flags(workq_aio_t wq_aio)
{
	return os_atomic_load(&wq_aio->wa_flags, relaxed);
}
2689 
/* _wq_exiting - true once workq_aio_mark_exiting() has set WQ_EXITING. */
static inline bool
_wq_exiting(workq_aio_t wq_aio)
{
	return _wa_flags(wq_aio) & WQ_EXITING;
}
2695 
/* _wq_dead - true once the AIO_WQ_DEAD flag has been set on the workqueue. */
static inline bool
_wq_dead(workq_aio_t wq_aio)
{
	return _wa_flags(wq_aio) & AIO_WQ_DEAD;
}
2701 
2702 #define AIO_WQPTR_IS_INITING_VALUE ((workq_aio_t)~(uintptr_t)0)
2703 
/*
 * proc_get_aio_wqptr_fast - raw load of the proc's aio workqueue pointer;
 * may return the AIO_WQPTR_IS_INITING_VALUE sentinel.
 */
static workq_aio_t
proc_get_aio_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_aio_wqptr, relaxed);
}
2709 
2710 static workq_aio_t
proc_get_aio_wqptr(struct proc * p)2711 proc_get_aio_wqptr(struct proc *p)
2712 {
2713 	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);
2714 	return wq_aio == AIO_WQPTR_IS_INITING_VALUE ? NULL : wq_aio;
2715 }
2716 
/*
 * proc_set_aio_wqptr - publish the proc's aio workqueue pointer.  If the
 * previous value was the "initing" sentinel, wake up any threads blocked
 * in proc_init_aio_wqptr_or_wait() waiting for initialization to finish.
 */
static void
proc_set_aio_wqptr(struct proc *p, workq_aio_t wq_aio)
{
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, wq_aio, release);
	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_aio_wqptr);
		proc_unlock(p);
	}
}
2727 
/*
 * proc_init_aio_wqptr_or_wait - race to claim initialization of the proc's
 * aio workqueue.  Returns true if the caller won and must now create the
 * workqueue (the pointer holds the "initing" sentinel until it calls
 * proc_set_aio_wqptr()).  Returns false if another thread already created
 * it or was initializing it (in which case this thread waited for it).
 */
static bool
proc_init_aio_wqptr_or_wait(struct proc *p)
{
	workq_aio_t wq_aio;

	proc_lock(p);
	wq_aio = os_atomic_load(&p->p_aio_wqptr, relaxed);

	if (wq_aio == NULL) {
		/* we won the race: mark it as initializing and let the caller build it */
		os_atomic_store(&p->p_aio_wqptr, AIO_WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		/* another thread is initializing: block until it publishes the pointer */
		assert_wait(&p->p_aio_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
2751 
/* workq_aio_parked_wait_event - wait event a parked aio workq thread sleeps on. */
static inline event_t
workq_aio_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}
2757 
/*
 * Wake a specific parked AIO workqueue thread via a directed wakeup on
 * its per-thread park event.
 */
static inline void
workq_aio_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_aio_parked_wait_event(uth), get_machthread(uth));
}
2763 
2764 /*
2765  * Routine:	workq_aio_mark_exiting
2766  *
2767  * Function:	Mark the work queue such that new threads will not be added to the
2768  *		work queue after we return.
2769  *
2770  * Conditions:	Called against the current process.
2771  */
2772 static void
workq_aio_mark_exiting(proc_t p)2773 workq_aio_mark_exiting(proc_t p)
2774 {
2775 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
2776 	uint32_t wq_flags;
2777 
2778 	if (!wq_aio) {
2779 		return;
2780 	}
2781 
2782 	wq_flags = os_atomic_or_orig(&wq_aio->wa_flags, WQ_EXITING, relaxed);
2783 	if (__improbable(wq_flags & WQ_EXITING)) {
2784 		panic("workq_aio_mark_exiting_locked called twice");
2785 	}
2786 
2787 	/*
2788 	 * Opportunistically try to cancel thread calls that are likely in flight.
2789 	 * workq_aio_exit() will do the proper cleanup.
2790 	 */
2791 	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
2792 		thread_call_cancel(wq_aio->wa_death_call);
2793 	}
2794 }
2795 
/*
 * Tear down the per-process AIO workqueue at process exit.
 *
 * Detaches the workqueue pointer from the proc, cancels the idle-thread
 * reaper thread call, then waits for any remaining workqueue threads to
 * terminate before freeing the structure.
 *
 * Conditions: called against the current process; the request queue and
 * run list must already be empty (asserted below).
 */
static void
workq_aio_exit(proc_t p)
{
	workq_aio_t wq_aio;

	/* detach so nobody can reach the workqueue through the proc anymore */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, NULL, release);

	if (!wq_aio) {
		return;
	}

	/*
	 * Thread calls are always scheduled by the proc itself or under the
	 * workqueue spinlock if WQ_EXITING is not yet set.
	 *
	 * Either way, when this runs, the proc has no threads left beside
	 * the one running this very code, so we know no thread call can be
	 * dispatched anymore.
	 */

	thread_call_cancel_wait(wq_aio->wa_death_call);
	thread_call_free(wq_aio->wa_death_call);

	/*
	 * Clean up workqueue data structures for threads that exited and
	 * didn't get a chance to clean up after themselves.
	 *
	 * idle/new threads should have been interrupted and died on their own
	 */

	assert(TAILQ_EMPTY(&wq_aio->wa_aioq_entries));
	assert(TAILQ_EMPTY(&wq_aio->wa_thrunlist));

	if (wq_aio->wa_nthreads) {
		/* flag AIO_WQ_DEAD so dying threads wake us when wa_nthreads drops to 0 */
		os_atomic_or(&wq_aio->wa_flags, AIO_WQ_DEAD, relaxed);
		aio_proc_lock_spin(p);
		if (wq_aio->wa_nthreads) {
			struct uthread *uth;

			/* mark every idle thread dying and kick it awake */
			TAILQ_FOREACH(uth, &wq_aio->wa_thidlelist, uu_workq_entry) {
				if (uth->uu_workq_flags & UT_WORKQ_DYING) {
					workq_aio_thread_wakeup(uth);
					continue;
				}
				wq_aio->wa_thdying_count++;
				uth->uu_workq_flags |= UT_WORKQ_DYING;
				workq_aio_thread_wakeup(uth);
			}
			/* woken by workq_aio_unpark_for_death_and_unlock() of the last thread */
			while (wq_aio->wa_nthreads) {
				msleep(&wq_aio->wa_nthreads, aio_proc_mutex(p), PRIBIO | PSPIN, "aio_workq_exit", 0);
			}
		}
		aio_proc_unlock(p);
	}

	assertf(TAILQ_EMPTY(&wq_aio->wa_thidlelist),
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thidlecount == 0,
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thdying_count == 0,
	    "wa_thdying_count = %d", wq_aio->wa_thdying_count);

	kfree_type(workq_aio_s, wq_aio);
}
2862 
2863 static int
workq_aio_open(struct proc * p)2864 workq_aio_open(struct proc *p)
2865 {
2866 	workq_aio_t wq_aio;
2867 	int error = 0;
2868 
2869 	if (proc_get_aio_wqptr(p) == NULL) {
2870 		if (proc_init_aio_wqptr_or_wait(p) == FALSE) {
2871 			assert(proc_get_aio_wqptr(p) != NULL);
2872 			goto out;
2873 		}
2874 
2875 		wq_aio = kalloc_type(workq_aio_s, Z_WAITOK | Z_ZERO);
2876 
2877 		wq_aio->wa_proc = p;
2878 
2879 		TAILQ_INIT(&wq_aio->wa_thidlelist);
2880 		TAILQ_INIT(&wq_aio->wa_thrunlist);
2881 		TAILQ_INIT(&wq_aio->wa_aioq_entries);
2882 
2883 		wq_aio->wa_death_call = thread_call_allocate_with_options(
2884 			workq_aio_kill_old_threads_call, wq_aio,
2885 			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);
2886 
2887 		proc_set_aio_wqptr(p, wq_aio);
2888 	}
2889 out:
2890 	return error;
2891 }
2892 
2893 #pragma mark aio workqueue idle thread accounting
2894 
/*
 * The idle list is kept LIFO (newest parked thread at the head, see
 * workq_push_idle_aio_thread()), so the tail holds the thread that has
 * been idle the longest — the best candidate to reap.
 */
static inline struct uthread *
workq_oldest_killable_idle_aio_thread(workq_aio_t wq_aio)
{
	return TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);
}
2900 
2901 static inline uint64_t
workq_kill_delay_for_idle_aio_thread()2902 workq_kill_delay_for_idle_aio_thread()
2903 {
2904 	return aio_wq_reduce_pool_window.abstime;
2905 }
2906 
2907 static inline bool
workq_should_kill_idle_aio_thread(struct uthread * uth,uint64_t now)2908 workq_should_kill_idle_aio_thread(struct uthread *uth, uint64_t now)
2909 {
2910 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2911 	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
2912 }
2913 
/*
 * Arm the delayed thread call that reaps idle AIO workqueue threads.
 * No-op if the workqueue is exiting or a death call is already pending.
 *
 * NOTE(review): the flag check and the subsequent or are not one atomic
 * test-and-set; presumably callers serialize on the aio proc lock —
 * confirm.
 */
static void
workq_aio_death_call_schedule(workq_aio_t wq_aio, uint64_t deadline)
{
	uint32_t wa_flags = os_atomic_load(&wq_aio->wa_flags, relaxed);

	if (wa_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq_aio->wa_death_call, NULL, deadline,
	    aio_wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
2934 
2935 /*
2936  * `decrement` is set to the number of threads that are no longer dying:
2937  * - because they have been resuscitated just in time (workq_pop_idle_thread)
2938  * - or have been killed (workq_thread_terminate).
2939  */
2940 static void
workq_aio_death_policy_evaluate(workq_aio_t wq_aio,uint16_t decrement)2941 workq_aio_death_policy_evaluate(workq_aio_t wq_aio, uint16_t decrement)
2942 {
2943 	struct uthread *uth;
2944 
2945 	assert(wq_aio->wa_thdying_count >= decrement);
2946 #if 0
2947 	if (decrement) {
2948 		printf("VV_DEBUG_AIO : %s:%d : pid = %d, ctid = %d, after decrement thdying_count = %d\n",
2949 		    __FUNCTION__, __LINE__, proc_pid(current_proc()), thread_get_ctid(thr),
2950 		    wq_aio->wa_thdying_count - decrement);
2951 	}
2952 #endif
2953 
2954 	if ((wq_aio->wa_thdying_count -= decrement) > 0) {
2955 		return;
2956 	}
2957 
2958 	if (wq_aio->wa_thidlecount <= 1) {
2959 		return;
2960 	}
2961 
2962 	if (((uth = workq_oldest_killable_idle_aio_thread(wq_aio)) == NULL)) {
2963 		return;
2964 	}
2965 
2966 	uint64_t now = mach_absolute_time();
2967 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2968 
2969 	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
2970 		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
2971 			wq_aio->wa_thdying_count++;
2972 			uth->uu_workq_flags |= UT_WORKQ_DYING;
2973 		}
2974 		workq_aio_thread_wakeup(uth);
2975 		return;
2976 	}
2977 
2978 	workq_aio_death_call_schedule(wq_aio,
2979 	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
2980 }
2981 
2982 static void
workq_aio_kill_old_threads_call(void * param0,void * param1 __unused)2983 workq_aio_kill_old_threads_call(void *param0, void *param1 __unused)
2984 {
2985 	workq_aio_t wq_aio = param0;
2986 
2987 	aio_proc_lock_spin(wq_aio->wa_proc);
2988 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_START, wq_aio);
2989 	os_atomic_andnot(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
2990 	workq_aio_death_policy_evaluate(wq_aio, 0);
2991 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_END, wq_aio);
2992 	aio_proc_unlock(wq_aio->wa_proc);;
2993 }
2994 
/* death_flags: the dying thread was on the idle list and must be unlinked */
#define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
/* setup_flags value: no additional setup requested */
#define WQ_SETUP_NONE  0
2997 
/*
 * Final accounting and self-termination of an AIO workqueue thread.
 *
 * Unlinks the thread from the idle list (when death_flags has
 * WORKQ_UNPARK_FOR_DEATH_WAS_IDLE), drops the dying/total thread counts,
 * wakes workq_aio_exit() when the last thread of a dead workqueue goes
 * away, then terminates the calling thread.
 *
 * Consumes the aio proc lock and never returns.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_for_death_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t death_flags, __unused uint32_t setup_flags)
{
	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq_aio->wa_thidlecount--;
		TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		wq_aio->wa_thdying_count--;
	}
	assert(wq_aio->wa_nthreads > 0);
	wq_aio->wa_nthreads--;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_terminate | DBG_FUNC_NONE, wq_aio);

	/* workq_aio_exit() msleeps on wa_nthreads until the pool drains */
	if (_wq_dead(wq_aio) && (wq_aio->wa_nthreads == 0)) {
		wakeup(&wq_aio->wa_nthreads);
	}

	aio_proc_unlock(p);

	thread_t th = get_machthread(uth);
	assert(th == current_thread());

	thread_deallocate(th);
	thread_terminate(th);
	thread_exception_return();
	__builtin_unreachable();
}
3030 
/*
 * Move the calling thread from the run list to the idle list, or send it
 * straight to its death when the workqueue is exiting or the reap policy
 * applies.
 *
 * Called with the aio proc lock held.  May not return: the death path
 * consumes the lock and terminates the calling thread.
 */
static void
workq_push_idle_aio_thread(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING);
	TAILQ_REMOVE(&wq_aio->wa_thrunlist, uth, uu_workq_entry);

	/* remember when this thread went idle, for the reap policy */
	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_aio_thread(wq_aio);
	uint16_t cur_idle = wq_aio->wa_thidlecount;

	if (_wq_exiting(wq_aio) || (wq_aio->wa_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_aio_thread(oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
		}

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);

	/* park at the head: the idle list is LIFO (tail == oldest idler) */
	cur_idle += 1;
	wq_aio->wa_thidlecount = cur_idle;
	uth->uu_save.uus_workq_park_data.has_stack = false;
	TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, uth, uu_workq_entry);

	if (!tail) {
		/* first idler on the list: arm the delayed reaper */
		uint64_t delay = workq_kill_delay_for_idle_aio_thread();
		workq_aio_death_call_schedule(wq_aio, now + delay);
	}
}
3081 
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return: the thread either
 * blocks (resuming in workq_aio_unpark_continue()) or terminates.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_park_and_unlock(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);

	workq_push_idle_aio_thread(p, wq_aio, uth, setup_flags); // may not return

	/* NOTE(review): catches UT_WORKQ_DYING set before we parked — confirm */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_park | DBG_FUNC_NONE, wq_aio);

	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	/* XXX this should probably be THREAD_UNINT */
	assert_wait(workq_aio_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	aio_proc_unlock(p);
	thread_block(workq_aio_unpark_continue);
	__builtin_unreachable();
}
3112 
/* Build a uu_workq_policy whose requested QoS and QoS bucket are both `qos` */
#define WORKQ_POLICY_INIT(qos) \
	         (struct uu_workq_policy){ .qos_req = (qos), .qos_bucket = (qos) }
3115 
3116 /*
3117  * This function is always called with the workq lock.
3118  */
3119 static void
workq_aio_thread_reset_pri(struct uthread * uth,thread_t src_th)3120 workq_aio_thread_reset_pri(struct uthread *uth, thread_t src_th)
3121 {
3122 	thread_t th = get_machthread(uth);
3123 	thread_qos_t qos = (thread_qos_t)proc_get_effective_thread_policy(src_th, TASK_POLICY_QOS);
3124 	int priority = 31;
3125 	int policy = POLICY_TIMESHARE;
3126 
3127 	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
3128 	thread_set_workq_pri(th, qos, priority, policy);
3129 }
3130 
3131 static inline void
workq_aio_thread_set_type(struct uthread * uth,uint16_t flags)3132 workq_aio_thread_set_type(struct uthread *uth, uint16_t flags)
3133 {
3134 	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
3135 	uth->uu_workq_flags |= flags;
3136 }
3137 
/*
 * Service loop of an AIO workqueue thread: drain queued aio_workq_entry
 * requests, then park (or die).  Never returns.
 *
 * Entered with the aio proc lock held; the lock is dropped around the
 * processing of each entry and finally consumed by
 * workq_aio_park_and_unlock().
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_select_req_or_park_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t setup_flags)
{
	aio_workq_entry *entryp;
	thread_t last_thread = NULL;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_START, wq_aio);
	/* keep base pri stable while we mirror the requestors' QoS below */
	thread_freeze_base_pri(get_machthread(uth));
	workq_aio_thread_set_type(uth, 0);
	while ((entryp = TAILQ_FIRST(&wq_aio->wa_aioq_entries))) {
		if (__improbable(_wq_exiting(wq_aio))) {
			break;
		}

		TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
		entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

		aio_proc_unlock(p);

		/* adopt the QoS of the thread that issued this request */
		thread_t thr = vfs_context_thread(&entryp->context);
		if (last_thread != thr) {
			workq_aio_thread_reset_pri(uth, thr);
			last_thread = thr;
		}

		/* this frees references to workq entry */
		workq_aio_process_entry(entryp);

		aio_proc_lock_spin(p);
	}
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_END, wq_aio);
	thread_unfreeze_base_pri(get_machthread(uth));
	workq_aio_park_and_unlock(p, wq_aio, uth, setup_flags);
}
3174 
/*
 * Continuation a parked AIO workqueue thread resumes in after
 * thread_block() in workq_aio_park_and_unlock(), or on first run.
 *
 * If the thread was set running (workq_aio_wakeup_thread_internal()) it
 * re-enters the request service loop; otherwise it was woken — or
 * interrupted — for the purposes of dying.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);

	aio_proc_lock_spin(p);

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_aio_unpark_select_req_or_park_and_unlock(p, wq_aio, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
	}

	workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);

	__builtin_unreachable();
}
3221 
/*
 * Called by thread_create_aio_workq_waiting() during thread initialization,
 * before assert_wait, before the thread has been started.
 *
 * Initializes the uthread's workqueue state, tags the thread as an AIO
 * workqueue thread, and takes the aio proc lock (returned held to the
 * caller).  Returns the park event the new thread must wait on.
 */
event_t
aio_workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_AIO_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	aio_proc_lock(get_bsdtask_info(task));
	return workq_aio_parked_wait_event(uth);
}
3243 
/**
 * Try to add a new workqueue thread for aio.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 * - aio threads do not call into pthread functions to setup or destroy stacks.
 *
 * Returns KERN_SUCCESS when a new idle thread has been added, or the
 * thread_create_aio_workq_waiting() error otherwise.
 */
static kern_return_t
workq_aio_add_new_thread(proc_t p, workq_aio_t wq_aio)
{
	kern_return_t kret;
	thread_t th;

	/* reserve the slot before dropping the lock */
	wq_aio->wa_nthreads++;

	aio_proc_unlock(p);

	kret = thread_create_aio_workq_waiting(proc_task(p),
	    workq_aio_unpark_continue,
	    &th);

	if (kret != KERN_SUCCESS) {
		WQ_AIO_TRACE(AIO_WQ_aio_thread_create_failed | DBG_FUNC_NONE, wq_aio,
		    kret, 0, 0, 0);
		goto out;
	}

	/*
	 * thread_create_aio_workq_waiting() will return with the wq lock held
	 * on success, because it calls workq_thread_init_and_wq_lock().
	 */
	struct uthread *uth = get_bsdthread_info(th);
	TAILQ_INSERT_TAIL(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount++;
	uth->uu_workq_flags &= ~UT_WORKQ_NEW;
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_create | DBG_FUNC_NONE, wq_aio);
	return kret;

out:
	aio_proc_lock(p);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq_aio->wa_nthreads--;
	return kret;
}
3293 
/*
 * Pick (or create) an idle AIO workqueue thread, move it to the run list,
 * and wake it so it starts draining queued requests.
 *
 * Called with the aio proc lock held; drops it when `unlock` is true.
 * New threads are only created up to WORKQUEUE_AIO_MAXTHREADS, and never
 * from an AIO workqueue thread itself.
 */
static void
workq_aio_wakeup_thread_internal(proc_t p, bool unlock)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
	bool needs_wakeup = false;
	struct uthread *uth = NULL;

	if (!wq_aio) {
		goto out;
	}

	uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	while (!uth && (wq_aio->wa_nthreads < WORKQUEUE_AIO_MAXTHREADS) &&
	    !(thread_get_tag(current_thread()) & THREAD_TAG_AIO_WORKQUEUE)) {
		/* workq_aio_add_new_thread() drops and retakes the lock */
		if (workq_aio_add_new_thread(p, wq_aio) != KERN_SUCCESS) {
			break;
		}
		uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	}

	if (!uth) {
		goto out;
	}

	TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount--;

	TAILQ_INSERT_TAIL(&wq_aio->wa_thrunlist, uth, uu_workq_entry);
	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_wakeup | DBG_FUNC_NONE, wq_aio);

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* resuscitated just in time: clear DYING and re-evaluate reaping */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_aio_death_policy_evaluate(wq_aio, 1);
		needs_wakeup = false;
	} else {
		needs_wakeup = true;
	}
out:
	if (unlock) {
		aio_proc_unlock(p);
	}

	/* issue the wakeup outside the (spin) lock */
	if (uth && needs_wakeup) {
		workq_aio_thread_wakeup(uth);
	}
}
3343 
3344 static void
workq_aio_wakeup_thread_and_unlock(proc_t p)3345 workq_aio_wakeup_thread_and_unlock(proc_t p)
3346 {
3347 	return workq_aio_wakeup_thread_internal(p, true);
3348 }
3349 
3350 static void
workq_aio_wakeup_thread(proc_t p)3351 workq_aio_wakeup_thread(proc_t p)
3352 {
3353 	return workq_aio_wakeup_thread_internal(p, false);
3354 }
3355 
3356 void
workq_aio_prepare(struct proc * p)3357 workq_aio_prepare(struct proc *p)
3358 {
3359 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3360 
3361 	if (__improbable(!wq_aio && !proc_in_teardown(p))) {
3362 		workq_aio_open(p);
3363 	}
3364 }
3365 
3366 bool
workq_aio_entry_add_locked(struct proc * p,aio_workq_entry * entryp)3367 workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp)
3368 {
3369 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3370 	bool ret = false;
3371 
3372 	ASSERT_AIO_PROC_LOCK_OWNED(p);
3373 
3374 	if (!proc_in_teardown(p) && wq_aio && !_wq_exiting(wq_aio)) {
3375 		TAILQ_INSERT_TAIL(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3376 		ret = true;
3377 	}
3378 
3379 	return ret;
3380 }
3381 
3382 bool
workq_aio_entry_remove_locked(struct proc * p,aio_workq_entry * entryp)3383 workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp)
3384 {
3385 	workq_aio_t  wq_aio = proc_get_aio_wqptr(p);
3386 
3387 	ASSERT_AIO_PROC_LOCK_OWNED(p);
3388 
3389 	if (entryp->aio_workq_link.tqe_prev == NULL) {
3390 		panic("Trying to remove an entry from a work queue, but it is not on a queue");
3391 	}
3392 
3393 	TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3394 	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
3395 
3396 	return true;
3397 }
3398