xref: /xnu-12377.81.4/bsd/kern/kern_aio.c (revision 043036a2b3718f7f0be807e2870f8f47d3fa0796)
1 /*
2  * Copyright (c) 2003-2024 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 
29 /*
30  * This file contains support for the POSIX 1003.1B AIO/LIO facility.
31  */
32 
33 #include <sys/systm.h>
34 #include <sys/fcntl.h>
35 #include <sys/file_internal.h>
36 #include <sys/filedesc.h>
37 #include <sys/kdebug.h>
38 #include <sys/kernel.h>
39 #include <sys/vnode_internal.h>
40 #include <sys/kauth.h>
41 #include <sys/mount_internal.h>
42 #include <sys/param.h>
43 #include <sys/proc_internal.h>
44 #include <sys/sysctl.h>
45 #include <sys/unistd.h>
46 #include <sys/user.h>
47 
48 #include <sys/aio_kern.h>
49 #include <sys/sysproto.h>
50 
51 #include <machine/limits.h>
52 
53 #include <mach/mach_types.h>
54 #include <kern/kern_types.h>
55 #include <kern/waitq.h>
56 #include <kern/zalloc.h>
57 #include <kern/task.h>
58 #include <kern/sched_prim.h>
59 #include <kern/ast.h>
60 
61 #include <vm/vm_map_xnu.h>
62 
63 #include <os/refcnt.h>
64 
65 #include <kern/thread.h>
66 #include <kern/policy_internal.h>
67 #include <pthread/workqueue_internal.h>
68 
69 #if 0
70 #undef KERNEL_DEBUG
71 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
72 #endif
73 
74 #define AIO_work_queued                 1
75 #define AIO_worker_wake                 2
76 #define AIO_completion_sig              3
77 #define AIO_completion_kevent           4
78 #define AIO_completion_cleanup_wait     5
79 #define AIO_completion_cleanup_wake     6
80 #define AIO_completion_suspend_wake     7
81 #define AIO_cancel                      10
82 #define AIO_cancel_async_workq          11
83 #define AIO_cancel_sync_workq           12
84 #define AIO_cancel_activeq              13
85 #define AIO_cancel_doneq                14
86 #define AIO_fsync                       20
87 #define AIO_fsync_delay                 21
88 #define AIO_read                        30
89 #define AIO_write                       40
90 #define AIO_listio                      50
91 #define AIO_error                       60
92 #define AIO_error_val                   61
93 #define AIO_error_activeq               62
94 #define AIO_error_workq                 63
95 #define AIO_return                      70
96 #define AIO_return_val                  71
97 #define AIO_return_activeq              72
98 #define AIO_return_workq                73
99 #define AIO_exec                        80
100 #define AIO_exit                        90
101 #define AIO_exit_sleep                  91
102 #define AIO_close                       100
103 #define AIO_close_sleep                 101
104 #define AIO_suspend                     110
105 #define AIO_suspend_sleep               111
106 #define AIO_worker_thread               120
107 #define AIO_register_kevent             130
108 #define AIO_WQ_process_entry            140
109 #define AIO_WQ_aio_thread_create        141
110 #define AIO_WQ_aio_thread_terminate     142
111 #define AIO_WQ_aio_death_call           143
112 #define AIO_WQ_aio_thread_park          144
113 #define AIO_WQ_aio_select_req           145
114 #define AIO_WQ_aio_thread_create_failed 146
115 #define AIO_WQ_aio_thread_wakeup        147
116 
117 static TUNABLE(uint32_t, bootarg_aio_new_workq, "aio_new_workq", 1);
118 
/*
 * Flags describing the type and lifecycle state of an aio_workq_entry.
 * The low bits identify the kind of request; the 0x0100 group tracks
 * completion/cancellation state; the 0x4000 group marks entries that a
 * close/exit/exec path is waiting on.
 */
__options_decl(aio_entry_flags_t, uint32_t, {
	AIO_READ        = 0x00000001, /* a read */
	AIO_WRITE       = 0x00000002, /* a write */
	AIO_FSYNC       = 0x00000004, /* aio_fsync with op = O_SYNC */
	AIO_DSYNC       = 0x00000008, /* aio_fsync with op = O_DSYNC (not supported yet) */
	AIO_LIO         = 0x00000010, /* lio_listio generated IO */
	AIO_LIO_WAIT    = 0x00000020, /* lio_listio is waiting on the leader */

	AIO_COMPLETED   = 0x00000100, /* request has completed */
	AIO_CANCELLED   = 0x00000200, /* request has been cancelled */
	AIO_KEVENT_REGISTERED = 0x00000400, /* kevent has been registered */

	/*
	 * These flags mean that this entry is blocking either:
	 * - close (AIO_CLOSE_WAIT)
	 * - exit or exec (AIO_EXIT_WAIT)
	 *
	 * These flags are mutually exclusive, and the AIO_EXIT_WAIT variant
	 * will also neuter notifications in do_aio_completion_and_unlock().
	 */
	AIO_CLOSE_WAIT  = 0x00004000,
	AIO_EXIT_WAIT   = 0x00008000,
});
142 
143 /*! @struct aio_workq_entry
144  *
145  * @discussion
146  * This represents a piece of aio/lio work.
147  *
148  * The ownership rules go as follows:
149  *
150  * - the "proc" owns one refcount on the entry (from creation), while it is
151  *   enqueued on the aio_activeq and then the aio_doneq.
152  *
153  *   either aio_return() (user read the status) or _aio_exit() (the process
154  *   died) will dequeue the entry and consume this ref.
155  *
156  * - the async workqueue owns one refcount once the work is submitted,
157  *   which is consumed in do_aio_completion_and_unlock().
158  *
159  *   This ref protects the entry through the end of
160  *   do_aio_completion_and_unlock() (when signal delivery happens).
161  *
162  * - lio_listio() for batches picks one of the entries to be the "leader"
163  *   of the batch. Each work item will have a refcount on its leader
164  *   so that the accounting of the batch completion can be done on the leader
165  *   (to be able to decrement lio_pending).
166  *
167  *   This ref is consumed in do_aio_completion_and_unlock() as well.
168  *
169  * - lastly, in lio_listio() when the LIO_WAIT behavior is requested,
170  *   an extra ref is taken in this syscall as it needs to keep accessing
171  *   the leader "lio_pending" field until it hits 0.
172  */
struct aio_workq_entry {
	/* queue lock */
	TAILQ_ENTRY(aio_workq_entry)    aio_workq_link; /* linkage on the async workq; tqe_prev == NULL means "not queued" */

	/* Proc lock */
	TAILQ_ENTRY(aio_workq_entry)    aio_proc_link;  /* p_aio_activeq or p_aio_doneq */
	user_ssize_t                    returnval;      /* return value from read / write request */
	errno_t                         errorval;       /* error value from read / write request */
	os_refcnt_t                     aio_refcount;   /* see ownership rules in the comment above */
	aio_entry_flags_t               flags;

	int                             lio_pending;    /* pending I/Os in lio group, only on leader */
	struct aio_workq_entry         *lio_leader;     /* pointer to the lio leader, can be self */

	/* Initialized and never changed, safe to access */
	struct proc                    *procp;          /* user proc that queued this request */
	user_addr_t                     uaiocbp;        /* pointer passed in from user land */
	struct user_aiocb               aiocb;          /* copy of aiocb from user land */
	struct vfs_context              context;        /* context which enqueued the request */

	/* Initialized, and possibly freed by aio_work_thread() or at free if cancelled */
	vm_map_t                        aio_map;        /* user land map we have a reference to */
};
196 
197 /*
198  * aio requests queue up on the aio_async_workq or lio_sync_workq (for
199  * lio_listio LIO_WAIT).  Requests then move to the per process aio_activeq
200  * (proc.aio_activeq) when one of our worker threads start the IO.
201  * And finally, requests move to the per process aio_doneq (proc.aio_doneq)
202  * when the IO request completes.  The request remains on aio_doneq until
203  * user process calls aio_return or the process exits, either way that is our
204  * trigger to release aio resources.
205  */
typedef struct aio_workq   {
	TAILQ_HEAD(, aio_workq_entry)   aioq_entries;   /* pending requests, FIFO order */
	lck_spin_t                      aioq_lock;      /* protects aioq_entries */
	struct waitq                    aioq_waitq;     /* worker threads wait here for work */
} *aio_workq_t;
211 
#define AIO_NUM_WORK_QUEUES 1
/*
 * Global anchor for the legacy (non per-process) aio work queues and the
 * system-wide request accounting used to enforce aio_max_requests.
 */
struct aio_anchor_cb {
	os_atomic(int)          aio_total_count;        /* total extant entries */

	/* Hash table of queues here */
	int                     aio_num_workqs;
	struct aio_workq        aio_async_workqs[AIO_NUM_WORK_QUEUES];
};
typedef struct aio_anchor_cb aio_anchor_cb;
221 
222 
223 /* New per process workqueue */
224 #define WORKQUEUE_AIO_MAXTHREADS            16
225 
TAILQ_HEAD(workq_aio_uthread_head, uthread);

/*
 * Per-process aio workqueue used when bootarg_aio_new_workq is set.
 * Threads are created on demand up to WORKQUEUE_AIO_MAXTHREADS.
 */
typedef struct workq_aio_s {
	thread_call_t   wa_death_call;                  /* presumably drives workq_aio_kill_old_threads_call — confirm */
	struct workq_aio_uthread_head wa_thrunlist;     /* running worker uthreads */
	struct workq_aio_uthread_head wa_thidlelist;    /* idle (parked) worker uthreads */
	TAILQ_HEAD(, aio_workq_entry) wa_aioq_entries;  /* pending aio work for this process */
	proc_t wa_proc;                                 /* owning process */
	workq_state_flags_t _Atomic wa_flags;
	uint16_t wa_nthreads;                           /* total worker thread count */
	uint16_t wa_thidlecount;                        /* number of idle threads */
	uint16_t wa_thdying_count;                      /* number of threads being torn down */
} workq_aio_s, *workq_aio_t;
239 
/* Pairs a sysctl-tunable microsecond value with a cached abstime form. */
struct aio_workq_usec_var {
	uint32_t usecs;         /* value exposed via sysctl */
	uint64_t abstime;       /* same interval converted to absolute time units */
};
244 
245 static int aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;
246 
247 #define AIO_WORKQ_SYSCTL_USECS(var, init) \
248 	        static struct aio_workq_usec_var var = { .usecs = (init) }; \
249 	        SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
250 	                        CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &(var), 0, \
251 	                        aio_workq_sysctl_handle_usecs, "I", "")
252 
253 AIO_WORKQ_SYSCTL_USECS(aio_wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);
254 
255 #define WQ_AIO_TRACE(x, wq, a, b, c, d) \
256 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
257 	        proc_getpid((wq)->wa_proc), (a), (b), (c), (d)); })
258 
259 #define WQ_AIO_TRACE_WQ(x, wq) \
260 	        ({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
261 	        proc_getpid((wq)->wa_proc),\
262 	        (uintptr_t)thread_tid(current_thread()),\
263 	        (wq)->wa_nthreads, (wq)->wa_thidlecount, (wq)->wa_thdying_count); })
264 
265 /*
266  * Notes on aio sleep / wake channels.
267  * We currently pick a couple fields within the proc structure that will allow
268  * us to use sleep channels that currently do not collide with any other kernel routines.
269  * At this time, for binary compatibility reasons, we cannot create new proc fields.
270  */
271 #define AIO_SUSPEND_SLEEP_CHAN  p_aio_activeq
272 #define AIO_CLEANUP_SLEEP_CHAN  p_aio_total_count
273 
274 #define ASSERT_AIO_FROM_PROC(aiop, theproc)     \
275 	if ((aiop)->procp != (theproc)) {       \
276 	        panic("AIO on a proc list that does not belong to that proc."); \
277 	}
278 
279 extern kern_return_t thread_terminate(thread_t);
280 
281 /*
282  *  LOCAL PROTOTYPES
283  */
284 static void             aio_proc_lock(proc_t procp);
285 static void             aio_proc_lock_spin(proc_t procp);
286 static void             aio_proc_unlock(proc_t procp);
287 static lck_mtx_t       *aio_proc_mutex(proc_t procp);
288 static bool             aio_has_active_requests_for_process(proc_t procp);
289 static bool             aio_proc_has_active_requests_for_file(proc_t procp, int fd);
290 static boolean_t        is_already_queued(proc_t procp, user_addr_t aiocbp);
291 
292 static aio_workq_t      aio_entry_workq(aio_workq_entry *entryp);
293 static void             aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
294 static void             aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
295 static void             aio_entry_ref(aio_workq_entry *entryp);
296 static void             aio_entry_unref(aio_workq_entry *entryp);
297 static bool             aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp);
298 static boolean_t        aio_delay_fsync_request(aio_workq_entry *entryp);
299 static void             aio_free_request(aio_workq_entry *entryp);
300 
301 static void             aio_workq_init(aio_workq_t wq);
302 static void             aio_workq_lock_spin(aio_workq_t wq);
303 static void             aio_workq_unlock(aio_workq_t wq);
304 static lck_spin_t      *aio_workq_lock(aio_workq_t wq);
305 
306 static void             aio_work_thread(void *arg, wait_result_t wr);
307 static aio_workq_entry *aio_get_some_work(void);
308 
309 static int              aio_queue_async_request(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
310 static int              aio_validate(proc_t, aio_workq_entry *entryp);
311 
312 static int              do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, aio_entry_flags_t);
313 static void             do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp, aio_entry_flags_t reason);
314 static int              do_aio_fsync(aio_workq_entry *entryp);
315 static int              do_aio_read(aio_workq_entry *entryp);
316 static int              do_aio_write(aio_workq_entry *entryp);
317 static void             do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
318 static void             do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
319 static aio_workq_entry *aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
320 static int              aio_copy_in_list(proc_t, user_addr_t, user_addr_t *, int);
321 
322 static void             workq_aio_prepare(struct proc *p);
323 static bool             workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp);
324 static void             workq_aio_wakeup_thread(proc_t p);
325 static void             workq_aio_wakeup_thread_and_unlock(proc_t p);
326 static int              workq_aio_process_entry(aio_workq_entry *entryp);
327 static bool             workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp);
328 
329 static void             workq_aio_kill_old_threads_call(void *param0, void *param1 __unused);
330 static void             workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr);
331 
332 static void             workq_aio_mark_exiting(proc_t p);
333 static void             workq_aio_exit(proc_t p);
334 
335 #define ASSERT_AIO_PROC_LOCK_OWNED(p)   LCK_MTX_ASSERT(aio_proc_mutex(p), LCK_MTX_ASSERT_OWNED)
336 #define ASSERT_AIO_WORKQ_LOCK_OWNED(q)  LCK_SPIN_ASSERT(aio_workq_lock(q), LCK_ASSERT_OWNED)
337 
338 /*
339  *  EXTERNAL PROTOTYPES
340  */
341 
342 /* in ...bsd/kern/sys_generic.c */
343 extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
344     user_addr_t bufp, user_size_t nbyte,
345     off_t offset, int flags, user_ssize_t *retval);
346 extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
347     user_addr_t bufp, user_size_t nbyte, off_t offset,
348     int flags, user_ssize_t *retval);
349 
350 /*
351  * aio external global variables.
352  */
353 extern int aio_max_requests;                    /* AIO_MAX - configurable */
354 extern int aio_max_requests_per_process;        /* AIO_PROCESS_MAX - configurable */
355 extern int aio_worker_threads;                  /* AIO_THREAD_COUNT - configurable */
356 
357 
358 /*
359  * aio static variables.
360  */
361 static aio_anchor_cb aio_anchor = {
362 	.aio_num_workqs = AIO_NUM_WORK_QUEUES,
363 };
364 os_refgrp_decl(static, aio_refgrp, "aio", NULL);
365 static LCK_GRP_DECLARE(aio_proc_lock_grp, "aio_proc");
366 static LCK_GRP_DECLARE(aio_queue_lock_grp, "aio_queue");
367 static LCK_MTX_DECLARE(aio_proc_mtx, &aio_proc_lock_grp);
368 
369 static struct klist aio_klist;
370 static LCK_GRP_DECLARE(aio_klist_lck_grp, "aio_klist");
371 static LCK_MTX_DECLARE(aio_klist_lock, &aio_klist_lck_grp);
372 
373 static KALLOC_TYPE_DEFINE(aio_workq_zonep, aio_workq_entry, KT_DEFAULT);
374 
375 /* Hash */
376 static aio_workq_t
aio_entry_workq(__unused aio_workq_entry * entryp)377 aio_entry_workq(__unused aio_workq_entry *entryp)
378 {
379 	return &aio_anchor.aio_async_workqs[0];
380 }
381 
/* One-time initialization of a work queue: entry list, spin lock, waitq. */
static void
aio_workq_init(aio_workq_t wq)
{
	TAILQ_INIT(&wq->aioq_entries);
	lck_spin_init(&wq->aioq_lock, &aio_queue_lock_grp, LCK_ATTR_NULL);
	/* FIFO wakeup policy so worker threads are woken in wait order */
	waitq_init(&wq->aioq_waitq, WQT_QUEUE, SYNC_POLICY_FIFO);
}
389 
390 
391 /*
392  * Can be passed a queue which is locked spin.
393  */
394 static void
aio_workq_remove_entry_locked(aio_workq_t queue,aio_workq_entry * entryp)395 aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
396 {
397 	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
398 
399 	if (entryp->aio_workq_link.tqe_prev == NULL) {
400 		panic("Trying to remove an entry from a work queue, but it is not on a queue");
401 	}
402 
403 	TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
404 	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
405 }
406 
407 static void
aio_workq_add_entry_locked(aio_workq_t queue,aio_workq_entry * entryp)408 aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
409 {
410 	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);
411 
412 	if (bootarg_aio_new_workq) {
413 		panic("old workq implementation selected with bootarg set");
414 	}
415 
416 	TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
417 }
418 
419 static void
aio_proc_lock(proc_t procp)420 aio_proc_lock(proc_t procp)
421 {
422 	lck_mtx_lock(aio_proc_mutex(procp));
423 }
424 
425 static void
aio_proc_lock_spin(proc_t procp)426 aio_proc_lock_spin(proc_t procp)
427 {
428 	lck_mtx_lock_spin(aio_proc_mutex(procp));
429 }
430 
431 static bool
aio_has_any_work(void)432 aio_has_any_work(void)
433 {
434 	return os_atomic_load(&aio_anchor.aio_total_count, relaxed) != 0;
435 }
436 
437 static bool
aio_try_proc_insert_active_locked(proc_t procp,aio_workq_entry * entryp)438 aio_try_proc_insert_active_locked(proc_t procp, aio_workq_entry *entryp)
439 {
440 	int old, new;
441 
442 	ASSERT_AIO_PROC_LOCK_OWNED(procp);
443 
444 	if (procp->p_aio_total_count >= aio_max_requests_per_process) {
445 		return false;
446 	}
447 
448 	if (is_already_queued(procp, entryp->uaiocbp)) {
449 		return false;
450 	}
451 
452 	os_atomic_rmw_loop(&aio_anchor.aio_total_count, old, new, relaxed, {
453 		if (old >= aio_max_requests) {
454 		        os_atomic_rmw_loop_give_up(return false);
455 		}
456 		new = old + 1;
457 	});
458 
459 	TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link);
460 	procp->p_aio_total_count++;
461 	return true;
462 }
463 
464 static void
aio_proc_move_done_locked(proc_t procp,aio_workq_entry * entryp)465 aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
466 {
467 	TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link);
468 	TAILQ_INSERT_TAIL(&procp->p_aio_doneq, entryp, aio_proc_link);
469 }
470 
471 static void
aio_proc_remove_done_locked(proc_t procp,aio_workq_entry * entryp)472 aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
473 {
474 	TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
475 	entryp->aio_proc_link.tqe_prev = NULL;
476 	if (os_atomic_dec_orig(&aio_anchor.aio_total_count, relaxed) <= 0) {
477 		panic("Negative total AIO count!");
478 	}
479 	if (procp->p_aio_total_count-- <= 0) {
480 		panic("proc %p: p_aio_total_count accounting mismatch", procp);
481 	}
482 }
483 
484 static void
aio_proc_unlock(proc_t procp)485 aio_proc_unlock(proc_t procp)
486 {
487 	lck_mtx_unlock(aio_proc_mutex(procp));
488 }
489 
490 static lck_mtx_t*
aio_proc_mutex(proc_t procp)491 aio_proc_mutex(proc_t procp)
492 {
493 	return &procp->p_mlock;
494 }
495 
496 static void
aio_entry_ref(aio_workq_entry * entryp)497 aio_entry_ref(aio_workq_entry *entryp)
498 {
499 	os_ref_retain(&entryp->aio_refcount);
500 }
501 
502 static void
aio_entry_unref(aio_workq_entry * entryp)503 aio_entry_unref(aio_workq_entry *entryp)
504 {
505 	if (os_ref_release(&entryp->aio_refcount) == 0) {
506 		aio_free_request(entryp);
507 	}
508 }
509 
510 static bool
aio_entry_try_workq_remove(proc_t p,aio_workq_entry * entryp)511 aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp)
512 {
513 	/* Can only be cancelled if it's still on a work queue */
514 	if (entryp->aio_workq_link.tqe_prev != NULL) {
515 		aio_workq_t queue;
516 		if (bootarg_aio_new_workq) {
517 			return workq_aio_entry_remove_locked(p, entryp);
518 		}
519 
520 		/* Will have to check again under the lock */
521 		queue = aio_entry_workq(entryp);
522 		aio_workq_lock_spin(queue);
523 		if (entryp->aio_workq_link.tqe_prev != NULL) {
524 			aio_workq_remove_entry_locked(queue, entryp);
525 			aio_workq_unlock(queue);
526 			return true;
527 		} else {
528 			aio_workq_unlock(queue);
529 		}
530 	}
531 
532 	return false;
533 }
534 
535 static void
aio_workq_lock_spin(aio_workq_t wq)536 aio_workq_lock_spin(aio_workq_t wq)
537 {
538 	lck_spin_lock(aio_workq_lock(wq));
539 }
540 
541 static void
aio_workq_unlock(aio_workq_t wq)542 aio_workq_unlock(aio_workq_t wq)
543 {
544 	lck_spin_unlock(aio_workq_lock(wq));
545 }
546 
547 static lck_spin_t*
aio_workq_lock(aio_workq_t wq)548 aio_workq_lock(aio_workq_t wq)
549 {
550 	return &wq->aioq_lock;
551 }
552 
553 /*
554  * aio_cancel - attempt to cancel one or more async IO requests currently
555  * outstanding against file descriptor uap->fd.  If uap->aiocbp is not
556  * NULL then only one specific IO is cancelled (if possible).  If uap->aiocbp
557  * is NULL then all outstanding async IO request for the given file
558  * descriptor are cancelled (if possible).
559  */
560 int
aio_cancel(proc_t p,struct aio_cancel_args * uap,int * retval)561 aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval)
562 {
563 	struct user_aiocb my_aiocb;
564 	int               result;
565 
566 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_START,
567 	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, 0, 0);
568 
569 	if (uap->fd) {
570 		vnode_t vp = NULLVP;
571 		const char *vname = NULL;
572 
573 		result = vnode_getfromfd(vfs_context_current(), uap->fd, &vp);
574 		if (result != 0) {
575 			result = EBADF;
576 			goto ExitRoutine;
577 		}
578 
579 		vname = vnode_getname(vp);
580 		/*
581 		 * The aio_cancel() system call will always	return AIO_NOTCANCELED for
582 		 * file	descriptor associated with raw disk device.
583 		 */
584 		if (vnode_ischr(vp) && vname && !strncmp(vname, "rdisk", 5)) {
585 			result = 0;
586 			*retval = AIO_NOTCANCELED;
587 		}
588 
589 		if (vname) {
590 			vnode_putname(vname);
591 		}
592 		vnode_put(vp);
593 
594 		if (result == 0 && *retval == AIO_NOTCANCELED) {
595 			goto ExitRoutine;
596 		}
597 	}
598 
599 	/* quick check to see if there are any async IO requests queued up */
600 	if (!aio_has_any_work()) {
601 		result = 0;
602 		*retval = AIO_ALLDONE;
603 		goto ExitRoutine;
604 	}
605 
606 	*retval = -1;
607 	if (uap->aiocbp != USER_ADDR_NULL) {
608 		if (proc_is64bit(p)) {
609 			struct user64_aiocb aiocb64;
610 
611 			result = copyin(uap->aiocbp, &aiocb64, sizeof(aiocb64));
612 			if (result == 0) {
613 				do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
614 			}
615 		} else {
616 			struct user32_aiocb aiocb32;
617 
618 			result = copyin(uap->aiocbp, &aiocb32, sizeof(aiocb32));
619 			if (result == 0) {
620 				do_munge_aiocb_user32_to_user(&aiocb32, &my_aiocb);
621 			}
622 		}
623 
624 		if (result != 0) {
625 			result = EAGAIN;
626 			goto ExitRoutine;
627 		}
628 
629 		/* NOTE - POSIX standard says a mismatch between the file */
630 		/* descriptor passed in and the file descriptor embedded in */
631 		/* the aiocb causes unspecified results.  We return EBADF in */
632 		/* that situation.  */
633 		if (uap->fd != my_aiocb.aio_fildes) {
634 			result = EBADF;
635 			goto ExitRoutine;
636 		}
637 	}
638 
639 	aio_proc_lock(p);
640 	result = do_aio_cancel_locked(p, uap->fd, uap->aiocbp, 0);
641 	ASSERT_AIO_PROC_LOCK_OWNED(p);
642 	aio_proc_unlock(p);
643 
644 	if (result != -1) {
645 		*retval = result;
646 		result = 0;
647 		goto ExitRoutine;
648 	}
649 
650 	result = EBADF;
651 
652 ExitRoutine:
653 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_END,
654 	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, result, 0);
655 
656 	return result;
657 }
658 
659 
660 /*
661  * _aio_close - internal function used to clean up async IO requests for
662  * a file descriptor that is closing.
663  * THIS MAY BLOCK.
664  */
665 __private_extern__ void
_aio_close(proc_t p,int fd)666 _aio_close(proc_t p, int fd)
667 {
668 	int error;
669 
670 	/* quick check to see if there are any async IO requests queued up */
671 	if (!aio_has_any_work()) {
672 		return;
673 	}
674 
675 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_START,
676 	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
677 
678 	/* cancel all async IO requests on our todo queues for this file descriptor */
679 	aio_proc_lock(p);
680 	error = do_aio_cancel_locked(p, fd, USER_ADDR_NULL, AIO_CLOSE_WAIT);
681 	ASSERT_AIO_PROC_LOCK_OWNED(p);
682 	if (error == AIO_NOTCANCELED) {
683 		/*
684 		 * AIO_NOTCANCELED is returned when we find an aio request for this process
685 		 * and file descriptor on the active async IO queue.  Active requests cannot
686 		 * be cancelled so we must wait for them to complete.  We will get a special
687 		 * wake up call on our channel used to sleep for ALL active requests to
688 		 * complete.  This sleep channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used
689 		 * when we must wait for all active aio requests.
690 		 */
691 
692 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep) | DBG_FUNC_NONE,
693 		    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
694 
695 		while (aio_proc_has_active_requests_for_file(p, fd)) {
696 			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0);
697 		}
698 	}
699 
700 	aio_proc_unlock(p);
701 
702 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_END,
703 	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
704 }
705 
706 
707 /*
708  * aio_error - return the error status associated with the async IO
709  * request referred to by uap->aiocbp.  The error status is the errno
710  * value that would be set by the corresponding IO request (read, write,
711  * fdatasync, or sync).
712  */
713 int
aio_error(proc_t p,struct aio_error_args * uap,int * retval)714 aio_error(proc_t p, struct aio_error_args *uap, int *retval)
715 {
716 	aio_workq_entry *entryp;
717 	int              error;
718 
719 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_START,
720 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
721 
722 	/* see if there are any aios to check */
723 	if (!aio_has_any_work()) {
724 		return EINVAL;
725 	}
726 
727 	aio_proc_lock(p);
728 
729 	/* look for a match on our queue of async IO requests that have completed */
730 	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
731 		if (entryp->uaiocbp == uap->aiocbp) {
732 			ASSERT_AIO_FROM_PROC(entryp, p);
733 
734 			*retval = entryp->errorval;
735 			error = 0;
736 
737 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val) | DBG_FUNC_NONE,
738 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
739 			goto ExitRoutine;
740 		}
741 	}
742 
743 	/* look for a match on our queue of active async IO requests */
744 	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
745 		if (entryp->uaiocbp == uap->aiocbp) {
746 			ASSERT_AIO_FROM_PROC(entryp, p);
747 			*retval = EINPROGRESS;
748 			error = 0;
749 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq) | DBG_FUNC_NONE,
750 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
751 			goto ExitRoutine;
752 		}
753 	}
754 
755 	error = EINVAL;
756 
757 ExitRoutine:
758 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_END,
759 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
760 	aio_proc_unlock(p);
761 
762 	return error;
763 }
764 
765 
766 /*
767  * aio_fsync - asynchronously force all IO operations associated
768  * with the file indicated by the file descriptor (uap->aiocbp->aio_fildes) and
769  * queued at the time of the call to the synchronized completion state.
770  * NOTE - we do not support op O_DSYNC at this point since we do not support the
771  * fdatasync() call.
772  */
773 int
aio_fsync(proc_t p,struct aio_fsync_args * uap,int * retval)774 aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval)
775 {
776 	aio_entry_flags_t fsync_kind;
777 	int error;
778 
779 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_START,
780 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, uap->op, 0, 0);
781 
782 	*retval = 0;
783 	/* 0 := O_SYNC for binary backward compatibility with Panther */
784 	if (uap->op == O_SYNC || uap->op == 0) {
785 		fsync_kind = AIO_FSYNC;
786 	} else if (uap->op == O_DSYNC) {
787 		fsync_kind = AIO_DSYNC;
788 	} else {
789 		*retval = -1;
790 		error = EINVAL;
791 		goto ExitRoutine;
792 	}
793 
794 	error = aio_queue_async_request(p, uap->aiocbp, fsync_kind);
795 	if (error != 0) {
796 		*retval = -1;
797 	}
798 
799 ExitRoutine:
800 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_END,
801 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
802 
803 	return error;
804 }
805 
806 
807 /* aio_read - asynchronously read uap->aiocbp->aio_nbytes bytes from the
808  * file descriptor (uap->aiocbp->aio_fildes) into the buffer
809  * (uap->aiocbp->aio_buf).
810  */
811 int
aio_read(proc_t p,struct aio_read_args * uap,int * retval)812 aio_read(proc_t p, struct aio_read_args *uap, int *retval)
813 {
814 	int error;
815 
816 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_START,
817 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
818 
819 	*retval = 0;
820 
821 	error = aio_queue_async_request(p, uap->aiocbp, AIO_READ);
822 	if (error != 0) {
823 		*retval = -1;
824 	}
825 
826 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_END,
827 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
828 
829 	return error;
830 }
831 
832 
833 /*
834  * aio_return - return the return status associated with the async IO
835  * request referred to by uap->aiocbp.  The return status is the value
836  * that would be returned by corresponding IO request (read, write,
837  * fdatasync, or sync).  This is where we release kernel resources
838  * held for async IO call associated with the given aiocb pointer.
839  */
840 int
aio_return(proc_t p,struct aio_return_args * uap,user_ssize_t * retval)841 aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval)
842 {
843 	aio_workq_entry *entryp;
844 	int              error = EINVAL;
845 
846 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_START,
847 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
848 
849 	/* See if there are any entries to check */
850 	if (!aio_has_any_work()) {
851 		goto ExitRoutine;
852 	}
853 
854 	aio_proc_lock(p);
855 	*retval = 0;
856 
857 	/* look for a match on our queue of async IO requests that have completed */
858 	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
859 		ASSERT_AIO_FROM_PROC(entryp, p);
860 		/*
861 		 * With kevent notification, the completion will be done when the event
862 		 * is processed.
863 		 */
864 		if ((entryp->uaiocbp == uap->aiocbp) &&
865 		    (entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)) {
866 			/* Done and valid for aio_return(), pull it off the list */
867 			aio_proc_remove_done_locked(p, entryp);
868 
869 			*retval = entryp->returnval;
870 			error = 0;
871 			aio_proc_unlock(p);
872 
873 			aio_entry_unref(entryp);
874 
875 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val) | DBG_FUNC_NONE,
876 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
877 			goto ExitRoutine;
878 		}
879 	}
880 
881 	/* look for a match on our queue of active async IO requests */
882 	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
883 		ASSERT_AIO_FROM_PROC(entryp, p);
884 		if (entryp->uaiocbp == uap->aiocbp) {
885 			error = EINPROGRESS;
886 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq) | DBG_FUNC_NONE,
887 			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
888 			break;
889 		}
890 	}
891 
892 	aio_proc_unlock(p);
893 
894 ExitRoutine:
895 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_END,
896 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
897 
898 	return error;
899 }
900 
901 
902 /*
903  * _aio_exec - internal function used to clean up async IO requests for
904  * a process that is going away due to exec().  We cancel any async IOs
905  * we can and wait for those already active.  We also disable signaling
906  * for cancelled or active aio requests that complete.
907  * This routine MAY block!
908  */
909 __private_extern__ void
_aio_exec(proc_t p)910 _aio_exec(proc_t p)
911 {
912 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_START,
913 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
914 
915 	_aio_exit(p);
916 
917 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_END,
918 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
919 }
920 
921 
922 /*
923  * _aio_exit - internal function used to clean up async IO requests for
924  * a process that is terminating (via exit() or exec()).  We cancel any async IOs
925  * we can and wait for those already active.  We also disable signaling
926  * for cancelled or active aio requests that complete.  This routine MAY block!
927  */
928 __private_extern__ void
_aio_exit(proc_t p)929 _aio_exit(proc_t p)
930 {
931 	TAILQ_HEAD(, aio_workq_entry) tofree = TAILQ_HEAD_INITIALIZER(tofree);
932 	aio_workq_entry *entryp, *tmp;
933 	int              error;
934 
935 	/* quick check to see if there are any async IO requests queued up */
936 	if (!aio_has_any_work()) {
937 		workq_aio_mark_exiting(p);
938 		workq_aio_exit(p);
939 		return;
940 	}
941 
942 	workq_aio_mark_exiting(p);
943 
944 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_START,
945 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
946 
947 	aio_proc_lock(p);
948 
949 	/*
950 	 * cancel async IO requests on the todo work queue and wait for those
951 	 * already active to complete.
952 	 */
953 	error = do_aio_cancel_locked(p, -1, USER_ADDR_NULL, AIO_EXIT_WAIT);
954 	ASSERT_AIO_PROC_LOCK_OWNED(p);
955 	if (error == AIO_NOTCANCELED) {
956 		/*
957 		 * AIO_NOTCANCELED is returned when we find an aio request for this process
958 		 * on the active async IO queue.  Active requests cannot be cancelled so we
959 		 * must wait for them to complete.  We will get a special wake up call on
960 		 * our channel used to sleep for ALL active requests to complete.  This sleep
961 		 * channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used when we must wait for all
962 		 * active aio requests.
963 		 */
964 
965 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep) | DBG_FUNC_NONE,
966 		    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
967 
968 		while (aio_has_active_requests_for_process(p)) {
969 			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0);
970 		}
971 	}
972 
973 	assert(!aio_has_active_requests_for_process(p));
974 
975 	/* release all aio resources used by this process */
976 	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_doneq, aio_proc_link, tmp) {
977 		ASSERT_AIO_FROM_PROC(entryp, p);
978 
979 		aio_proc_remove_done_locked(p, entryp);
980 		TAILQ_INSERT_TAIL(&tofree, entryp, aio_proc_link);
981 	}
982 
983 	aio_proc_unlock(p);
984 
985 	workq_aio_exit(p);
986 
987 	/* free all the entries outside of the aio_proc_lock() */
988 	TAILQ_FOREACH_SAFE(entryp, &tofree, aio_proc_link, tmp) {
989 		entryp->aio_proc_link.tqe_prev = NULL;
990 		aio_entry_unref(entryp);
991 	}
992 
993 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_END,
994 	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
995 }
996 
997 
998 static bool
should_cancel(aio_workq_entry * entryp,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)999 should_cancel(aio_workq_entry *entryp, int fd, user_addr_t aiocbp,
1000     aio_entry_flags_t reason)
1001 {
1002 	if (reason & AIO_EXIT_WAIT) {
1003 		/* caller is _aio_exit() */
1004 		return true;
1005 	}
1006 	if (fd != entryp->aiocb.aio_fildes) {
1007 		/* not the file we're looking for */
1008 		return false;
1009 	}
1010 	/*
1011 	 * aio_cancel() or _aio_close() cancel
1012 	 * everything for a given fd when aiocbp is NULL
1013 	 */
1014 	return aiocbp == USER_ADDR_NULL || entryp->uaiocbp == aiocbp;
1015 }
1016 
1017 /*
1018  * do_aio_cancel_locked - cancel async IO requests (if possible).  We get called by
1019  * aio_cancel, close, and at exit.
1020  * There are three modes of operation: 1) cancel all async IOs for a process -
1021  * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd
1022  * is > 0 and aiocbp is NULL 3) cancel one async IO associated with the given
1023  * aiocbp.
1024  * Returns -1 if no matches were found, AIO_CANCELED when we cancelled all
1025  * target async IO requests, AIO_NOTCANCELED if we could not cancel all
1026  * target async IO requests, and AIO_ALLDONE if all target async IO requests
1027  * were already complete.
1028  * WARNING - do not deference aiocbp in this routine, it may point to user
1029  * land data that has not been copied in (when called from aio_cancel())
1030  *
1031  * Called with proc locked, and returns the same way.
1032  */
1033 static int
do_aio_cancel_locked(proc_t p,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)1034 do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp,
1035     aio_entry_flags_t reason)
1036 {
1037 	bool multiple_matches = (aiocbp == USER_ADDR_NULL);
1038 	aio_workq_entry *entryp, *tmp;
1039 	int result;
1040 
1041 	ASSERT_AIO_PROC_LOCK_OWNED(p);
1042 
1043 	/* look for a match on our queue of async todo work. */
1044 again:
1045 	result = -1;
1046 	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_activeq, aio_proc_link, tmp) {
1047 		ASSERT_AIO_FROM_PROC(entryp, p);
1048 
1049 		if (!should_cancel(entryp, fd, aiocbp, reason)) {
1050 			continue;
1051 		}
1052 
1053 		if (reason) {
1054 			/* mark the entry as blocking close or exit/exec */
1055 			entryp->flags |= reason;
1056 			if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
1057 				panic("Close and exit flags set at the same time");
1058 			}
1059 		}
1060 
1061 		/* Can only be cancelled if it's still on a work queue */
1062 		if (aio_entry_try_workq_remove(p, entryp)) {
1063 			entryp->errorval = ECANCELED;
1064 			entryp->returnval = -1;
1065 
1066 			/* Now it's officially cancelled.  Do the completion */
1067 			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
1068 			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1069 			    fd, 0, 0);
1070 			do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
1071 
1072 			aio_proc_lock(p);
1073 
1074 			if (multiple_matches) {
1075 				/*
1076 				 * Restart from the head of the proc active queue since it
1077 				 * may have been changed while we were away doing completion
1078 				 * processing.
1079 				 *
1080 				 * Note that if we found an uncancellable AIO before, we will
1081 				 * either find it again or discover that it's been completed,
1082 				 * so resetting the result will not cause us to return success
1083 				 * despite outstanding AIOs.
1084 				 */
1085 				goto again;
1086 			}
1087 
1088 			return AIO_CANCELED;
1089 		}
1090 
1091 		/*
1092 		 * It's been taken off the active queue already, i.e. is in flight.
1093 		 * All we can do is ask for notification.
1094 		 */
1095 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq) | DBG_FUNC_NONE,
1096 		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1097 		    fd, 0, 0);
1098 
1099 		result = AIO_NOTCANCELED;
1100 		if (!multiple_matches) {
1101 			return result;
1102 		}
1103 	}
1104 
1105 	/*
1106 	 * if we didn't find any matches on the todo or active queues then look for a
1107 	 * match on our queue of async IO requests that have completed and if found
1108 	 * return AIO_ALLDONE result.
1109 	 *
1110 	 * Proc AIO lock is still held.
1111 	 */
1112 	if (result == -1) {
1113 		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
1114 			ASSERT_AIO_FROM_PROC(entryp, p);
1115 			if (should_cancel(entryp, fd, aiocbp, reason)) {
1116 				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq) | DBG_FUNC_NONE,
1117 				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1118 				    fd, 0, 0);
1119 
1120 				result = AIO_ALLDONE;
1121 				if (!multiple_matches) {
1122 					return result;
1123 				}
1124 			}
1125 		}
1126 	}
1127 
1128 	return result;
1129 }
1130 
1131 
1132 /*
1133  * aio_suspend - suspend the calling thread until at least one of the async
1134  * IO operations referenced by uap->aiocblist has completed, until a signal
1135  * interrupts the function, or uap->timeoutp time interval (optional) has
1136  * passed.
1137  * Returns 0 if one or more async IOs have completed else -1 and errno is
1138  * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt
1139  * woke us up.
1140  */
1141 int
aio_suspend(proc_t p,struct aio_suspend_args * uap,int * retval)1142 aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval)
1143 {
1144 	__pthread_testcancel(1);
1145 	return aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval);
1146 }
1147 
1148 
/*
 * aio_suspend_nocancel - non-cancellation-point body of aio_suspend().
 * Copies in the aiocb pointer list and optional timeout, then sleeps on
 * AIO_SUSPEND_SLEEP_CHAN until one of the listed requests appears on the
 * done queue, the timeout expires (EAGAIN), or a signal arrives (EINTR).
 */
int
aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval)
{
	int                     error;
	int                     i;
	uint64_t                abstime;
	struct user_timespec    ts;
	aio_workq_entry        *entryp;
	user_addr_t            *aiocbpp;
	size_t                  aiocbpp_size;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	*retval = -1;
	abstime = 0;  /* 0 means "sleep with no deadline" */
	aiocbpp = NULL;

	/* nothing is queued anywhere, so nothing could ever complete */
	if (!aio_has_any_work()) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	/* bound nent and compute the list allocation size overflow-safely */
	if (uap->nent < 1 || uap->nent > aio_max_requests_per_process ||
	    os_mul_overflow(sizeof(user_addr_t), uap->nent, &aiocbpp_size)) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	if (uap->timeoutp != USER_ADDR_NULL) {
		/* copy in the timeout in the caller's natural layout */
		if (proc_is64bit(p)) {
			struct user64_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = (user_time_t)temp.tv_sec;
				ts.tv_nsec = (user_long_t)temp.tv_nsec;
			}
		} else {
			struct user32_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = temp.tv_sec;
				ts.tv_nsec = temp.tv_nsec;
			}
		}
		if (error != 0) {
			error = EAGAIN;
			goto ExitThisRoutine;
		}

		/* reject malformed timespecs */
		if (ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000) {
			error = EINVAL;
			goto ExitThisRoutine;
		}

		/* convert the relative interval into an absolute deadline */
		nanoseconds_to_absolutetime((uint64_t)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec,
		    &abstime);
		clock_absolutetime_interval_to_deadline(abstime, &abstime);
	}

	aiocbpp = (user_addr_t *)kalloc_data(aiocbpp_size, Z_WAITOK);
	if (aiocbpp == NULL || aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		error = EAGAIN;
		goto ExitThisRoutine;
	}

	/* check list of aio requests to see if any have completed */
check_for_our_aiocbp:
	aio_proc_lock_spin(p);
	for (i = 0; i < uap->nent; i++) {
		user_addr_t     aiocbp;

		/* NULL elements are legal so check for 'em */
		aiocbp = *(aiocbpp + i);
		if (aiocbp == USER_ADDR_NULL) {
			continue;
		}

		/* return immediately if any aio request in the list is done */
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (entryp->uaiocbp == aiocbp) {
				aio_proc_unlock(p);
				*retval = 0;
				error = 0;
				goto ExitThisRoutine;
			}
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend_sleep) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	/*
	 * wait for an async IO to complete or a signal fires or timeout expires.
	 * we return EAGAIN (35) for timeout expiration and EINTR (4) when a signal
	 * interrupts us.  If an async IO completes before a signal fires or our
	 * timeout expires, we get a wakeup call from aio_work_thread().
	 * PDROP means the proc lock is released when msleep1() returns.
	 */

	error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p),
	    PCATCH | PWAIT | PDROP, "aio_suspend", abstime);
	if (error == 0) {
		/*
		 * got our wakeup call from aio_work_thread().
		 * Since we can get a wakeup on this channel from another thread in the
		 * same process we head back up to make sure this is for the correct aiocbp.
		 * If it is the correct aiocbp we will return from where we do the check
		 * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label)
		 * else we will fall out and just sleep again.
		 */
		goto check_for_our_aiocbp;
	} else if (error == EWOULDBLOCK) {
		/* our timeout expired */
		error = EAGAIN;
	} else {
		/* we were interrupted */
		error = EINTR;
	}

ExitThisRoutine:
	if (aiocbpp != NULL) {
		kfree_data(aiocbpp, aiocbpp_size);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->nent, error, 0, 0);

	return error;
}
1279 
1280 
1281 /* aio_write - asynchronously write uap->aiocbp->aio_nbytes bytes to the
1282  * file descriptor (uap->aiocbp->aio_fildes) from the buffer
1283  * (uap->aiocbp->aio_buf).
1284  */
1285 
1286 int
aio_write(proc_t p,struct aio_write_args * uap,int * retval __unused)1287 aio_write(proc_t p, struct aio_write_args *uap, int *retval __unused)
1288 {
1289 	int error;
1290 
1291 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_START,
1292 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
1293 
1294 	error = aio_queue_async_request(p, uap->aiocbp, AIO_WRITE);
1295 
1296 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_END,
1297 	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
1298 
1299 	return error;
1300 }
1301 
1302 
1303 static int
aio_copy_in_list(proc_t procp,user_addr_t aiocblist,user_addr_t * aiocbpp,int nent)1304 aio_copy_in_list(proc_t procp, user_addr_t aiocblist, user_addr_t *aiocbpp,
1305     int nent)
1306 {
1307 	int result;
1308 
1309 	/* copyin our aiocb pointers from list */
1310 	result = copyin(aiocblist, aiocbpp,
1311 	    proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
1312 	    : (nent * sizeof(user32_addr_t)));
1313 	if (result) {
1314 		return result;
1315 	}
1316 
1317 	/*
1318 	 * We depend on a list of user_addr_t's so we need to
1319 	 * munge and expand when these pointers came from a
1320 	 * 32-bit process
1321 	 */
1322 	if (!proc_is64bit(procp)) {
1323 		/* copy from last to first to deal with overlap */
1324 		user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
1325 		user_addr_t *my_addrp = aiocbpp + (nent - 1);
1326 
1327 		for (int i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
1328 			*my_addrp = (user_addr_t) (*my_ptrp);
1329 		}
1330 	}
1331 
1332 	return 0;
1333 }
1334 
1335 
1336 static int
aio_copy_in_sigev(proc_t procp,user_addr_t sigp,struct user_sigevent * sigev)1337 aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
1338 {
1339 	int     result = 0;
1340 
1341 	if (sigp == USER_ADDR_NULL) {
1342 		goto out;
1343 	}
1344 
1345 	/*
1346 	 * We need to munge aio_sigevent since it contains pointers.
1347 	 * Since we do not know if sigev_value is an int or a ptr we do
1348 	 * NOT cast the ptr to a user_addr_t.   This means if we send
1349 	 * this info back to user space we need to remember sigev_value
1350 	 * was not expanded for the 32-bit case.
1351 	 *
1352 	 * Notes:	 This does NOT affect us since we don't support
1353 	 *		sigev_value yet in the aio context.
1354 	 */
1355 	if (proc_is64bit(procp)) {
1356 #if __LP64__
1357 		struct user64_sigevent sigevent64;
1358 
1359 		result = copyin(sigp, &sigevent64, sizeof(sigevent64));
1360 		if (result == 0) {
1361 			sigev->sigev_notify = sigevent64.sigev_notify;
1362 			sigev->sigev_signo = sigevent64.sigev_signo;
1363 			sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
1364 			sigev->sigev_notify_function = sigevent64.sigev_notify_function;
1365 			sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
1366 		}
1367 #else
1368 		panic("64bit process on 32bit kernel is not supported");
1369 #endif
1370 	} else {
1371 		struct user32_sigevent sigevent32;
1372 
1373 		result = copyin(sigp, &sigevent32, sizeof(sigevent32));
1374 		if (result == 0) {
1375 			sigev->sigev_notify = sigevent32.sigev_notify;
1376 			sigev->sigev_signo = sigevent32.sigev_signo;
1377 			sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
1378 			sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
1379 			sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
1380 		}
1381 	}
1382 
1383 	if (result != 0) {
1384 		result = EAGAIN;
1385 	}
1386 
1387 out:
1388 	return result;
1389 }
1390 
1391 /*
1392  * validate user_sigevent.  at this point we only support
1393  * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE.  this means
1394  * sigev_value, sigev_notify_function, and sigev_notify_attributes
1395  * are ignored, since SIGEV_THREAD is unsupported.  This is consistent
1396  * with no [RTS] (RalTime Signal) option group support.
1397  */
1398 static int
aio_sigev_validate(const struct user_sigevent * sigev)1399 aio_sigev_validate(const struct user_sigevent *sigev)
1400 {
1401 	switch (sigev->sigev_notify) {
1402 	case SIGEV_SIGNAL:
1403 	{
1404 		int signum;
1405 
1406 		/* make sure we have a valid signal number */
1407 		signum = sigev->sigev_signo;
1408 		if (signum <= 0 || signum >= NSIG ||
1409 		    signum == SIGKILL || signum == SIGSTOP) {
1410 			return EINVAL;
1411 		}
1412 	}
1413 	break;
1414 
1415 	case SIGEV_NONE:
1416 		break;
1417 
1418 	case SIGEV_KEVENT:
1419 		/*
1420 		 * The sigev_signo should contain the descriptor of the kqueue.
1421 		 * Validate that it contains some sane value.
1422 		 */
1423 		if (sigev->sigev_signo <= 0 || sigev->sigev_signo > maxfilesperproc) {
1424 			return EINVAL;
1425 		}
1426 		break;
1427 
1428 	case SIGEV_THREAD:
1429 	/* Unsupported [RTS] */
1430 
1431 	default:
1432 		return EINVAL;
1433 	}
1434 
1435 	return 0;
1436 }
1437 
1438 
1439 /*
1440  * aio_try_enqueue_work_locked
1441  *
1442  * Queue up the entry on the aio asynchronous work queue in priority order
1443  * based on the relative priority of the request.  We calculate the relative
1444  * priority using the nice value of the caller and the value
1445  *
1446  * Parameters:	procp			Process queueing the I/O
1447  *		entryp			The work queue entry being queued
1448  *		leader			The work leader if any
1449  *
1450  * Returns:	Whether the enqueue was successful
1451  *
1452  * Notes:	This function is used for both lio_listio and aio
1453  *
1454  * XXX:		At some point, we may have to consider thread priority
1455  *		rather than process priority, but we don't maintain the
1456  *		adjusted priority for threads the POSIX way.
1457  *
1458  * Called with proc locked.
1459  */
1460 static bool
aio_try_enqueue_work_locked(proc_t procp,aio_workq_entry * entryp,aio_workq_entry * leader)1461 aio_try_enqueue_work_locked(proc_t procp, aio_workq_entry *entryp,
1462     aio_workq_entry *leader)
1463 {
1464 	ASSERT_AIO_PROC_LOCK_OWNED(procp);
1465 
1466 	/* Onto proc queue */
1467 	if (!aio_try_proc_insert_active_locked(procp, entryp)) {
1468 		return false;
1469 	}
1470 
1471 	if (leader) {
1472 		aio_entry_ref(leader); /* consumed in do_aio_completion_and_unlock */
1473 		leader->lio_pending++;
1474 		entryp->lio_leader = leader;
1475 	}
1476 
1477 	/* And work queue */
1478 	aio_entry_ref(entryp); /* consumed in do_aio_completion_and_unlock */
1479 	if (bootarg_aio_new_workq) {
1480 		if (!workq_aio_entry_add_locked(procp, entryp)) {
1481 			(void)os_ref_release(&entryp->aio_refcount);
1482 			return false;
1483 		}
1484 	} else {
1485 		aio_workq_t queue = aio_entry_workq(entryp);
1486 		aio_workq_lock_spin(queue);
1487 		aio_workq_add_entry_locked(queue, entryp);
1488 		waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
1489 		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
1490 		aio_workq_unlock(queue);
1491 	}
1492 
1493 	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_START,
1494 	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1495 	    entryp->flags, entryp->aiocb.aio_fildes, 0);
1496 	KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_END,
1497 	    entryp->aiocb.aio_offset, 0, entryp->aiocb.aio_nbytes, 0, 0);
1498 	return true;
1499 }
1500 
1501 /*
1502  * EV_FLAG0/1 are filter specific flags.
1503  * Repurpose EV_FLAG0 to indicate the kevent is registered from kernel.
1504  */
1505 #define EV_KERNEL    EV_FLAG0
1506 
1507 /* Internal function to register/unregister a AIO kevent. */
1508 static int
aio_register_kevent_internal(proc_t procp,aio_workq_entry * entryp,struct user_sigevent * sigp,uintptr_t ident,int16_t filter,uint16_t flags)1509 aio_register_kevent_internal(proc_t procp, aio_workq_entry *entryp,
1510     struct user_sigevent *sigp, uintptr_t ident, int16_t filter, uint16_t flags)
1511 {
1512 	struct kevent_qos_s kev;
1513 	struct fileproc *fp = NULL;
1514 	kqueue_t kqu;
1515 	int kqfd = sigp->sigev_signo;
1516 	int error;
1517 
1518 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_START,
1519 	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp),
1520 	    VM_KERNEL_ADDRPERM(entryp->uaiocbp), kqfd, 0);
1521 
1522 	error = fp_get_ftype(procp, kqfd, DTYPE_KQUEUE, EBADF, &fp);
1523 	if (error) {
1524 		goto exit;
1525 	}
1526 
1527 	kqu.kq = (struct kqueue *)fp_get_data(fp);
1528 
1529 	memset(&kev, 0, sizeof(kev));
1530 	kev.ident = ident;
1531 	kev.filter = filter;
1532 	kev.flags = flags;
1533 	kev.udata = sigp->sigev_value.sival_ptr;
1534 	kev.data = (intptr_t)entryp;
1535 
1536 	error = kevent_register(kqu.kq, &kev, NULL);
1537 	assert((error & FILTER_REGISTER_WAIT) == 0);
1538 
1539 	if (kev.flags & EV_ERROR) {
1540 		error = (int)kev.data;
1541 	}
1542 
1543 exit:
1544 	if (fp) {
1545 		fp_drop(procp, kqfd, fp, 0);
1546 	}
1547 
1548 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_END,
1549 	    VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp), error, 0, 0);
1550 
1551 	return error;
1552 }
1553 
1554 static int
aio_register_kevent(proc_t procp,aio_workq_entry * entryp,struct user_sigevent * sigp,uintptr_t ident,int16_t filter)1555 aio_register_kevent(proc_t procp, aio_workq_entry *entryp,
1556     struct user_sigevent *sigp, uintptr_t ident, int16_t filter)
1557 {
1558 	/*
1559 	 * Set the EV_FLAG0 to indicate the event is registered from the kernel.
1560 	 * This flag is later checked in filt_aioattach() to determine if the
1561 	 * kevent is registered from kernel or user-space.
1562 	 */
1563 	uint16_t flags = EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT | EV_KERNEL;
1564 	int error;
1565 
1566 	error = aio_register_kevent_internal(procp, entryp, sigp, ident, filter, flags);
1567 	if (!error) {
1568 		entryp->flags |= AIO_KEVENT_REGISTERED;
1569 	}
1570 
1571 	return error;
1572 }
1573 
1574 static int
aio_unregister_kevent(proc_t procp,aio_workq_entry * entryp,struct user_sigevent * sigp,uintptr_t ident,int16_t filter)1575 aio_unregister_kevent(proc_t procp, aio_workq_entry *entryp,
1576     struct user_sigevent *sigp, uintptr_t ident, int16_t filter)
1577 {
1578 	/*
1579 	 * Set the EV_KERNEL to indicate the event is unregistered from the kernel.
1580 	 */
1581 	uint16_t flags = EV_DELETE | EV_KERNEL;
1582 	int error;
1583 
1584 	error = aio_register_kevent_internal(procp, entryp, sigp, ident, filter, flags);
1585 	if (!error) {
1586 		entryp->flags &= ~AIO_KEVENT_REGISTERED;
1587 	}
1588 
1589 	return error;
1590 }
1591 
1592 /*
1593  * lio_listio - initiate a list of IO requests.  We process the list of
1594  * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously
1595  * (mode == LIO_NOWAIT).
1596  *
1597  * The caller gets error and return status for each aiocb in the list
1598  * via aio_error and aio_return.  We must keep completed requests until
1599  * released by the aio_return call.
1600  */
1601 int
lio_listio(proc_t p,struct lio_listio_args * uap,int * retval __unused)1602 lio_listio(proc_t p, struct lio_listio_args *uap, int *retval __unused)
1603 {
1604 	aio_workq_entry         *entries[AIO_LISTIO_MAX] = { };
1605 	user_addr_t              aiocbpp[AIO_LISTIO_MAX];
1606 	struct user_sigevent     aiosigev = { };
1607 	int                      result = 0;
1608 	int                      lio_count = 0;
1609 
1610 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_START,
1611 	    VM_KERNEL_ADDRPERM(p), uap->nent, uap->mode, 0, 0);
1612 
1613 	if (!(uap->mode == LIO_NOWAIT || uap->mode == LIO_WAIT)) {
1614 		result = EINVAL;
1615 		goto ExitRoutine;
1616 	}
1617 
1618 	if (uap->nent < 1 || uap->nent > AIO_LISTIO_MAX) {
1619 		result = EINVAL;
1620 		goto ExitRoutine;
1621 	}
1622 
1623 	/*
1624 	 * Use sigevent passed in to lio_listio for each of our calls, but
1625 	 * only do completion notification after the last request completes.
1626 	 */
1627 	if (uap->sigp != USER_ADDR_NULL) {
1628 		result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
1629 		if (result) {
1630 			goto ExitRoutine;
1631 		}
1632 		result = aio_sigev_validate(&aiosigev);
1633 		if (result) {
1634 			goto ExitRoutine;
1635 		}
1636 	}
1637 
1638 	if (aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
1639 		result = EAGAIN;
1640 		goto ExitRoutine;
1641 	}
1642 
1643 	/*
1644 	 * allocate/parse all entries
1645 	 */
1646 	for (int i = 0; i < uap->nent; i++) {
1647 		aio_workq_entry *entryp;
1648 
1649 		/* NULL elements are legal so check for 'em */
1650 		if (aiocbpp[i] == USER_ADDR_NULL) {
1651 			continue;
1652 		}
1653 
1654 		entryp = aio_create_queue_entry(p, aiocbpp[i], AIO_LIO);
1655 		if (entryp == NULL) {
1656 			result = EAGAIN;
1657 			goto ExitRoutine;
1658 		}
1659 
1660 		/*
1661 		 * This refcount is cleaned up on exit if the entry
1662 		 * isn't submitted
1663 		 */
1664 		entries[lio_count++] = entryp;
1665 		if (uap->mode == LIO_WAIT) {
1666 			continue;
1667 		}
1668 
1669 		if (entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT) {
1670 			/* Set signal hander, if any */
1671 			entryp->aiocb.aio_sigevent = aiosigev;
1672 		} else {
1673 			/*
1674 			 * For SIGEV_KEVENT, every AIO in the list would get its own kevent
1675 			 * notification upon completion as opposed to SIGEV_SIGNAL which a
1676 			 * single notification is deliverd when all AIOs have completed.
1677 			 */
1678 			result = aio_register_kevent(p, entryp, &entryp->aiocb.aio_sigevent,
1679 			    (uintptr_t)entryp->uaiocbp, EVFILT_AIO);
1680 			if (result) {
1681 				goto ExitRoutine;
1682 			}
1683 		}
1684 	}
1685 
1686 	if (lio_count == 0) {
1687 		/* There's nothing to submit */
1688 		goto ExitRoutine;
1689 	}
1690 
1691 	/*
1692 	 * Past this point we're commited and will not bail out
1693 	 *
1694 	 * - keep a reference on the leader for LIO_WAIT
1695 	 * - perform the submissions and optionally wait
1696 	 */
1697 
1698 	aio_workq_entry *leader = entries[0];
1699 	if (uap->mode == LIO_WAIT) {
1700 		aio_entry_ref(leader); /* consumed below */
1701 	}
1702 
1703 	aio_proc_lock(p);
1704 
1705 	for (int i = 0; i < lio_count; i++) {
1706 		if (aio_try_enqueue_work_locked(p, entries[i], leader)) {
1707 			workq_aio_wakeup_thread(p); /* this may drop and reacquire the proc lock */
1708 			entries[i] = NULL; /* the entry was submitted */
1709 		} else {
1710 			result = EAGAIN;
1711 		}
1712 	}
1713 
1714 	if (uap->mode == LIO_WAIT && result == 0) {
1715 		leader->flags |= AIO_LIO_WAIT;
1716 
1717 		while (leader->lio_pending) {
1718 			/* If we were interrupted, fail out (even if all finished) */
1719 			if (msleep(leader, aio_proc_mutex(p),
1720 			    PCATCH | PRIBIO | PSPIN, "lio_listio", 0) != 0) {
1721 				result = EINTR;
1722 				break;
1723 			}
1724 		}
1725 
1726 		leader->flags &= ~AIO_LIO_WAIT;
1727 	}
1728 
1729 	aio_proc_unlock(p);
1730 
1731 	if (uap->mode == LIO_WAIT) {
1732 		aio_entry_unref(leader);
1733 	}
1734 
1735 ExitRoutine:
1736 	/* Consume unsubmitted entries */
1737 	for (int i = 0; i < lio_count; i++) {
1738 		aio_workq_entry *entryp = entries[i];
1739 
1740 		if (entryp) {
1741 			if (entryp->flags & AIO_KEVENT_REGISTERED) {
1742 				(void)aio_unregister_kevent(p, entryp,
1743 				    &entryp->aiocb.aio_sigevent, (uintptr_t)entryp->uaiocbp,
1744 				    EVFILT_AIO);
1745 			}
1746 			aio_entry_unref(entryp);
1747 		}
1748 	}
1749 
1750 	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_END,
1751 	    VM_KERNEL_ADDRPERM(p), result, 0, 0, 0);
1752 
1753 	return result;
1754 }
1755 
1756 
1757 /*
1758  * aio worker thread.  this is where all the real work gets done.
1759  * we get a wake up call on sleep channel &aio_anchor.aio_async_workq
1760  * after new work is queued up.
1761  */
1762 __attribute__((noreturn))
1763 static void
aio_work_thread(void * arg __unused,wait_result_t wr __unused)1764 aio_work_thread(void *arg __unused, wait_result_t wr __unused)
1765 {
1766 	aio_workq_entry         *entryp;
1767 	int                     error;
1768 	vm_map_switch_context_t switch_ctx;
1769 	struct uthread          *uthreadp = NULL;
1770 	proc_t                  p = NULL;
1771 
1772 	for (;;) {
1773 		/*
1774 		 * returns with the entry ref'ed.
1775 		 * sleeps until work is available.
1776 		 */
1777 		entryp = aio_get_some_work();
1778 		p = entryp->procp;
1779 
1780 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_START,
1781 		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1782 		    entryp->flags, 0, 0);
1783 
1784 		/*
1785 		 * Assume the target's address space identity for the duration
1786 		 * of the IO.  Note: don't need to have the entryp locked,
1787 		 * because the proc and map don't change until it's freed.
1788 		 */
1789 		uthreadp = (struct uthread *) current_uthread();
1790 		assert(get_task_map(proc_task(current_proc())) != entryp->aio_map);
1791 		assert(uthreadp->uu_aio_task == NULL);
1792 
1793 		/*
1794 		 * workq entries at this stage cause _aio_exec() and _aio_exit() to
1795 		 * block until we hit `do_aio_completion_and_unlock()` below,
1796 		 * which means that it is safe to dereference p->task without
1797 		 * holding a lock or taking references.
1798 		 */
1799 		uthreadp->uu_aio_task = proc_task(p);
1800 		switch_ctx = vm_map_switch_to(entryp->aio_map);
1801 
1802 		if ((entryp->flags & AIO_READ) != 0) {
1803 			error = do_aio_read(entryp);
1804 		} else if ((entryp->flags & AIO_WRITE) != 0) {
1805 			error = do_aio_write(entryp);
1806 		} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
1807 			error = do_aio_fsync(entryp);
1808 		} else {
1809 			error = EINVAL;
1810 		}
1811 
1812 		/* Restore old map */
1813 		vm_map_switch_back(switch_ctx);
1814 		uthreadp->uu_aio_task = NULL;
1815 
1816 		/* liberate unused map */
1817 		vm_map_deallocate(entryp->aio_map);
1818 		entryp->aio_map = VM_MAP_NULL;
1819 
1820 		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_END,
1821 		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1822 		    entryp->errorval, entryp->returnval, 0);
1823 
1824 		/* we're done with the IO request so pop it off the active queue and */
1825 		/* push it on the done queue */
1826 		aio_proc_lock(p);
1827 		entryp->errorval = error;
1828 		do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
1829 	}
1830 }
1831 
1832 
/*
 * aio_get_some_work - get the next async IO request that is ready to be executed.
 * aio_fsync complicates matters a bit since we cannot do the fsync until all async
 * IO requests at the time the aio_fsync call came in have completed.
 * NOTE - called with no locks held; this routine takes and drops the workq
 * lock itself, and only returns once it has a runnable entry in hand.
 */
static aio_workq_entry *
aio_get_some_work(void)
{
	aio_workq_entry *entryp = NULL;
	aio_workq_t      queue = NULL;

	/* Just one queue for the moment.  In the future there will be many. */
	queue = &aio_anchor.aio_async_workqs[0];
	aio_workq_lock_spin(queue);

	/*
	 * Hold the queue lock.
	 *
	 * pop some work off the work queue and add to our active queue
	 * Always start with the queue lock held.
	 */
	while ((entryp = TAILQ_FIRST(&queue->aioq_entries))) {
		/*
		 * Pull of of work queue.  Once it's off, it can't be cancelled,
		 * so we can take our ref once we drop the queue lock.
		 */

		aio_workq_remove_entry_locked(queue, entryp);

		aio_workq_unlock(queue);

		/*
		 * Check if it's an fsync that must be delayed.  No need to lock the entry;
		 * that flag would have been set at initialization.
		 */
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(entryp->procp);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed.  Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(entryp->procp),
				    VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

				aio_proc_unlock(entryp->procp);

				aio_workq_lock_spin(queue);
				aio_workq_add_entry_locked(queue, entryp);
				continue;
			}
			aio_proc_unlock(entryp->procp);
		}

		return entryp;
	}

	/*
	 * We will wake up when someone enqueues something.  thread_block()
	 * restarts the thread in aio_work_thread() as a continuation, so
	 * control never reaches past it.
	 */
	waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
	aio_workq_unlock(queue);
	thread_block(aio_work_thread);

	__builtin_unreachable();
}
1900 
1901 /*
1902  * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed.
1903  * A big, simple hammer: only send it off if it's the most recently filed IO which has
1904  * not been completed.
1905  */
1906 static boolean_t
aio_delay_fsync_request(aio_workq_entry * entryp)1907 aio_delay_fsync_request(aio_workq_entry *entryp)
1908 {
1909 	if (proc_in_teardown(entryp->procp)) {
1910 		/*
1911 		 * we can't delay FSYNCS when in teardown as it will confuse _aio_exit,
1912 		 * if it was dequeued, then we must now commit to it
1913 		 */
1914 		return FALSE;
1915 	}
1916 
1917 	if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
1918 		return FALSE;
1919 	}
1920 
1921 	return TRUE;
1922 }
1923 
/*
 * aio_create_queue_entry - allocate and initialize an aio_workq_entry for the
 * user aiocb at `aiocbp` belonging to `procp`.  Copies the aiocb in using the
 * 32- or 64-bit layout as appropriate, validates it, and takes references on
 * the issuing thread and its credential (and, on the legacy worker-thread
 * path, on the user address space).  Returns NULL on copyin or validation
 * failure; on success the entry carries one refcount, consumed in aio_return
 * or _aio_exit.
 */
static aio_workq_entry *
aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;

	entryp = zalloc_flags(aio_workq_zonep, Z_WAITOK | Z_ZERO);
	entryp->procp = procp;
	entryp->uaiocbp = aiocbp;
	entryp->flags = flags;
	/* consumed in aio_return or _aio_exit */
	os_ref_init(&entryp->aio_refcount, &aio_refgrp);

	/* copy the user aiocb in, normalizing to the kernel's user_aiocb form */
	if (proc_is64bit(procp)) {
		struct user64_aiocb aiocb64;

		if (copyin(aiocbp, &aiocb64, sizeof(aiocb64)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
	} else {
		struct user32_aiocb aiocb32;

		if (copyin(aiocbp, &aiocb32, sizeof(aiocb32)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user32_to_user(&aiocb32, &entryp->aiocb);
	}

	/* do some more validation on the aiocb and embedded file descriptor */
	if (aio_validate(procp, entryp) != 0) {
		goto error_exit;
	}

	/* get a reference on the current_thread, which is passed in vfs_context. */
	entryp->context = *vfs_context_current();
	thread_reference(entryp->context.vc_thread);
	kauth_cred_ref(entryp->context.vc_ucred);

	if (bootarg_aio_new_workq) {
		/* per-process workq path: IO runs in the caller's own map */
		entryp->aio_map = VM_MAP_NULL;
		workq_aio_prepare(procp);
	} else {
		/* get a reference to the user land map in order to keep it around */
		entryp->aio_map = get_task_map(proc_task(procp));
		vm_map_reference(entryp->aio_map);
	}
	return entryp;

error_exit:
	/* no thread/cred/map references taken yet on this path; just free */
	zfree(aio_workq_zonep, entryp);
	return NULL;
}
1976 
1977 
/*
 * aio_queue_async_request - queue up an async IO request on our work queue then
 * wake up one of our worker threads to do the actual work.  We get a reference
 * to our caller's user land map in order to keep it around while we are
 * processing the request.
 * Returns 0 on success, EAGAIN if the entry could not be created or enqueued,
 * or the error from kevent registration.
 */
static int
aio_queue_async_request(proc_t procp, user_addr_t aiocbp,
    aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;
	int              result;

	entryp = aio_create_queue_entry(procp, aiocbp, flags);
	if (entryp == NULL) {
		result = EAGAIN;
		goto error_noalloc;
	}

	aio_proc_lock(procp);
	/* register for kevent-based completion delivery if requested */
	if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		result = aio_register_kevent(procp, entryp, &entryp->aiocb.aio_sigevent,
		    (uintptr_t)entryp->uaiocbp, EVFILT_AIO);
		if (result) {
			goto error_exit;
		}
	}

	if (!aio_try_enqueue_work_locked(procp, entryp, NULL)) {
		/* enqueue failed: undo the kevent registration before bailing */
		(void)aio_unregister_kevent(procp, entryp, &entryp->aiocb.aio_sigevent,
		    (uintptr_t)entryp->uaiocbp, EVFILT_AIO);
		result = EAGAIN;
		goto error_exit;
	}
	/* drops the proc lock */
	workq_aio_wakeup_thread_and_unlock(procp);
	return 0;

error_exit:
	/*
	 * This entry has not been queued up so no worries about
	 * unlocked state and aio_map
	 */
	aio_proc_unlock(procp);
	aio_free_request(entryp);
error_noalloc:
	return result;
}
2025 
2026 
2027 /*
2028  * aio_free_request - remove our reference on the user land map and
2029  * free the work queue entry resources.  The entry is off all lists
2030  * and has zero refcount, so no one can have a pointer to it.
2031  */
2032 static void
aio_free_request(aio_workq_entry * entryp)2033 aio_free_request(aio_workq_entry *entryp)
2034 {
2035 	if (entryp->aio_proc_link.tqe_prev || entryp->aio_workq_link.tqe_prev) {
2036 		panic("aio_workq_entry %p being freed while still enqueued", entryp);
2037 	}
2038 
2039 	/* remove our reference to the user land map. */
2040 	if (VM_MAP_NULL != entryp->aio_map) {
2041 		vm_map_deallocate(entryp->aio_map);
2042 	}
2043 
2044 	/* remove our reference to thread which enqueued the request */
2045 	if (entryp->context.vc_thread) {
2046 		thread_deallocate(entryp->context.vc_thread);
2047 	}
2048 	kauth_cred_unref(&entryp->context.vc_ucred);
2049 
2050 	zfree(aio_workq_zonep, entryp);
2051 }
2052 
2053 
2054 /*
2055  * aio_validate
2056  *
2057  * validate the aiocb passed in by one of the aio syscalls.
2058  */
2059 static int
aio_validate(proc_t p,aio_workq_entry * entryp)2060 aio_validate(proc_t p, aio_workq_entry *entryp)
2061 {
2062 	struct fileproc *fp;
2063 	int              flag;
2064 	int              result;
2065 
2066 	result = 0;
2067 
2068 	if ((entryp->flags & AIO_LIO) != 0) {
2069 		if (entryp->aiocb.aio_lio_opcode == LIO_READ) {
2070 			entryp->flags |= AIO_READ;
2071 		} else if (entryp->aiocb.aio_lio_opcode == LIO_WRITE) {
2072 			entryp->flags |= AIO_WRITE;
2073 		} else if (entryp->aiocb.aio_lio_opcode == LIO_NOP) {
2074 			return 0;
2075 		} else {
2076 			return EINVAL;
2077 		}
2078 	}
2079 
2080 	flag = FREAD;
2081 	if ((entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0) {
2082 		flag = FWRITE;
2083 	}
2084 
2085 	if ((entryp->flags & (AIO_READ | AIO_WRITE)) != 0) {
2086 		if (entryp->aiocb.aio_nbytes > INT_MAX ||
2087 		    entryp->aiocb.aio_buf == USER_ADDR_NULL ||
2088 		    entryp->aiocb.aio_offset < 0) {
2089 			return EINVAL;
2090 		}
2091 	}
2092 
2093 	result = aio_sigev_validate(&entryp->aiocb.aio_sigevent);
2094 	if (result) {
2095 		return result;
2096 	}
2097 
2098 	/* validate the file descriptor and that the file was opened
2099 	 * for the appropriate read / write access.
2100 	 */
2101 	proc_fdlock(p);
2102 
2103 	fp = fp_get_noref_locked(p, entryp->aiocb.aio_fildes);
2104 	if (fp == NULL) {
2105 		result = EBADF;
2106 	} else if ((fp->fp_glob->fg_flag & flag) == 0) {
2107 		/* we don't have read or write access */
2108 		result = EBADF;
2109 	} else if (FILEGLOB_DTYPE(fp->fp_glob) != DTYPE_VNODE) {
2110 		/* this is not a file */
2111 		result = ESPIPE;
2112 	} else {
2113 		fp->fp_flags |= FP_AIOISSUED;
2114 	}
2115 
2116 	proc_fdunlock(p);
2117 
2118 	return result;
2119 }
2120 
/*
 * do_aio_completion_and_unlock.  Handle async IO completion.
 * Moves the entry to the done queue, performs lio group accounting,
 * delivers the requested notification (signal or kevent), wakes any
 * exit/close waiters, and drops the proc lock plus the entry reference.
 * Called with the proc AIO lock held; returns with it released.
 */
static void
do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp,
    aio_entry_flags_t reason)
{
	aio_workq_entry *leader = entryp->lio_leader;
	int              lio_pending = 0;
	bool             do_signal, do_kevent;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	aio_proc_move_done_locked(p, entryp);
	/* record whether we completed or were cancelled */
	entryp->flags |= reason;

	if (leader) {
		/* lio group accounting: wake a blocked lio_listio(LIO_WAIT) caller */
		lio_pending = --leader->lio_pending;
		if (lio_pending < 0) {
			panic("lio_pending accounting mistake");
		}
		if (lio_pending == 0 && (leader->flags & AIO_LIO_WAIT)) {
			wakeup(leader);
		}
		entryp->lio_leader = NULL; /* no dangling pointers please */
	}

	/*
	 * need to handle case where a process is trying to exit, exec, or
	 * close and is currently waiting for active aio requests to complete.
	 * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any
	 * other requests in the active queue for this process.  If there are
	 * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.
	 * If there are some still active then do nothing - we only want to
	 * wakeup when all active aio requests for the process are complete.
	 */
	do_signal = do_kevent = false;
	if (__improbable(entryp->flags & AIO_EXIT_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_has_active_requests_for_process(p)) {
			/*
			 * no active aio requests for this process, continue exiting.  In this
			 * case, there should be no one else waiting on the proc in AIO...
			 */
			wakeup_one((caddr_t)&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		/*
		 * If this was the last request in the group, or not part of
		 * a group, and that a signal is desired, send one.
		 */
		do_signal = (lio_pending == 0);
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		/*
		 * For SIGEV_KEVENT, every AIO (even it is part of a group) would get
		 * a kevent notification.
		 */
		do_kevent = true;
	}

	if (__improbable(entryp->flags & AIO_CLOSE_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_proc_has_active_requests_for_file(p, entryp->aiocb.aio_fildes)) {
			/* Can't wakeup_one(); multiple closes might be in progress. */
			wakeup(&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	}

	/* notifications are delivered after dropping the proc lock */
	aio_proc_unlock(p);

	if (do_signal) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		psignal(p, entryp->aiocb.aio_sigevent.sigev_signo);
	} else if (do_kevent) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_kevent) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		/* We only support one event type for either completed/cancelled AIO. */
		lck_mtx_lock(&aio_klist_lock);
		KNOTE(&aio_klist, 1);
		lck_mtx_unlock(&aio_klist_lock);
	}

	/*
	 * A thread in aio_suspend() wants to known about completed IOs.  If it checked
	 * the done list before we moved our AIO there, then it already asserted its wait,
	 * and we can wake it up without holding the lock.  If it checked the list after
	 * we did our move, then it already has seen the AIO that we moved.  Herego, we
	 * can do our wakeup without holding the lock.
	 */
	wakeup(&p->AIO_SUSPEND_SLEEP_CHAN);
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

	aio_entry_unref(entryp); /* see aio_try_enqueue_work_locked */
	if (leader) {
		aio_entry_unref(leader); /* see lio_listio */
	}
}
2238 
2239 
2240 /*
2241  * do_aio_read
2242  */
2243 static int
do_aio_read(aio_workq_entry * entryp)2244 do_aio_read(aio_workq_entry *entryp)
2245 {
2246 	struct proc     *p = entryp->procp;
2247 	struct fileproc *fp;
2248 	int error;
2249 
2250 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2251 		return error;
2252 	}
2253 
2254 	if (fp->fp_glob->fg_flag & FREAD) {
2255 		error = dofileread(&entryp->context, fp,
2256 		    entryp->aiocb.aio_buf,
2257 		    entryp->aiocb.aio_nbytes,
2258 		    entryp->aiocb.aio_offset, FOF_OFFSET,
2259 		    &entryp->returnval);
2260 	} else {
2261 		error = EBADF;
2262 	}
2263 
2264 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2265 	return error;
2266 }
2267 
2268 
2269 /*
2270  * do_aio_write
2271  */
2272 static int
do_aio_write(aio_workq_entry * entryp)2273 do_aio_write(aio_workq_entry *entryp)
2274 {
2275 	struct proc     *p = entryp->procp;
2276 	struct fileproc *fp;
2277 	int error;
2278 
2279 	if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2280 		return error;
2281 	}
2282 
2283 	if (fp->fp_glob->fg_flag & FWRITE) {
2284 		int flags = 0;
2285 
2286 		if ((fp->fp_glob->fg_flag & O_APPEND) == 0) {
2287 			flags |= FOF_OFFSET;
2288 		}
2289 
2290 		/* NB: tell dofilewrite the offset, and to use the proc cred */
2291 		error = dofilewrite(&entryp->context,
2292 		    fp,
2293 		    entryp->aiocb.aio_buf,
2294 		    entryp->aiocb.aio_nbytes,
2295 		    entryp->aiocb.aio_offset,
2296 		    flags,
2297 		    &entryp->returnval);
2298 	} else {
2299 		error = EBADF;
2300 	}
2301 
2302 	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2303 	return error;
2304 }
2305 
2306 
2307 /*
2308  * aio_has_active_requests_for_process - return whether the process has active
2309  * requests pending.
2310  */
2311 static bool
aio_has_active_requests_for_process(proc_t procp)2312 aio_has_active_requests_for_process(proc_t procp)
2313 {
2314 	return !TAILQ_EMPTY(&procp->p_aio_activeq);
2315 }
2316 
2317 /*
2318  * Called with the proc locked.
2319  */
2320 static bool
aio_proc_has_active_requests_for_file(proc_t procp,int fd)2321 aio_proc_has_active_requests_for_file(proc_t procp, int fd)
2322 {
2323 	aio_workq_entry *entryp;
2324 
2325 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2326 		if (entryp->aiocb.aio_fildes == fd) {
2327 			return true;
2328 		}
2329 	}
2330 
2331 	return false;
2332 }
2333 
2334 
/*
 * do_aio_fsync - perform the sync for an AIO_FSYNC or AIO_DSYNC entry by
 * issuing VNOP_FSYNC on the descriptor's vnode.  Returns an errno value;
 * on failure entryp->returnval is set to -1.
 */
static int
do_aio_fsync(aio_workq_entry *entryp)
{
	struct proc            *p = entryp->procp;
	struct vnode           *vp;
	struct fileproc        *fp;
	int                     sync_flag;
	int                     error;

	/*
	 * We are never called unless either AIO_FSYNC or AIO_DSYNC are set.
	 *
	 * If AIO_DSYNC is set, we can tell the lower layers that it is OK
	 * to mark for update the metadata not strictly necessary for data
	 * retrieval, rather than forcing it to disk.
	 *
	 * If AIO_FSYNC is set, we have to also wait for metadata not really
	 * necessary for data retrieval to be committed to stable storage
	 * (e.g. atime, mtime, ctime, etc.).
	 *
	 * Metadata necessary for data retrieval must be committed to stable
	 * storage in either case (file length, etc.).
	 */
	if (entryp->flags & AIO_FSYNC) {
		sync_flag = MNT_WAIT;
	} else {
		sync_flag = MNT_DWAIT;
	}

	/* the descriptor must refer to a vnode-backed file */
	error = fp_get_ftype(p, entryp->aiocb.aio_fildes, DTYPE_VNODE, ENOTSUP, &fp);
	if (error != 0) {
		entryp->returnval = -1;
		return error;
	}
	vp = fp_get_data(fp);

	/* take an iocount on the vnode for the duration of the sync */
	if ((error = vnode_getwithref(vp)) == 0) {
		error = VNOP_FSYNC(vp, sync_flag, &entryp->context);

		(void)vnode_put(vp);
	} else {
		entryp->returnval = -1;
	}

	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
	return error;
}
2385 
2386 
2387 /*
2388  * is_already_queued - runs through our queues to see if the given
2389  * aiocbp / process is there.  Returns TRUE if there is a match
2390  * on any of our aio queues.
2391  *
2392  * Called with proc aio lock held (can be held spin)
2393  */
2394 static boolean_t
is_already_queued(proc_t procp,user_addr_t aiocbp)2395 is_already_queued(proc_t procp, user_addr_t aiocbp)
2396 {
2397 	aio_workq_entry *entryp;
2398 	boolean_t        result;
2399 
2400 	result = FALSE;
2401 
2402 	/* look for matches on our queue of async IO requests that have completed */
2403 	TAILQ_FOREACH(entryp, &procp->p_aio_doneq, aio_proc_link) {
2404 		if (aiocbp == entryp->uaiocbp) {
2405 			result = TRUE;
2406 			goto ExitThisRoutine;
2407 		}
2408 	}
2409 
2410 	/* look for matches on our queue of active async IO requests */
2411 	TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2412 		if (aiocbp == entryp->uaiocbp) {
2413 			result = TRUE;
2414 			goto ExitThisRoutine;
2415 		}
2416 	}
2417 
2418 ExitThisRoutine:
2419 	return result;
2420 }
2421 
2422 
/*
 * aio initialization - set up the shared work queues, the EVFILT_AIO klist,
 * and (on the legacy path) the pool of kernel worker threads.  Called once
 * at boot.
 */
__private_extern__ void
aio_init(void)
{
	for (int i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
		aio_workq_init(&aio_anchor.aio_async_workqs[i]);
	}

	/* the new per-process workq path creates its threads on demand */
	if (bootarg_aio_new_workq) {
		printf("New aio workqueue implementation selected\n");
	} else {
		_aio_create_worker_threads(aio_worker_threads);
	}

	klist_init(&aio_klist);

	/* precompute the pool-reduction window in absolute time units */
	clock_interval_to_absolutetime_interval(aio_wq_reduce_pool_window.usecs,
	    NSEC_PER_USEC, &aio_wq_reduce_pool_window.abstime);
}
2444 
2445 
2446 /*
2447  * aio worker threads created here.
2448  */
2449 __private_extern__ void
_aio_create_worker_threads(int num)2450 _aio_create_worker_threads(int num)
2451 {
2452 	int i;
2453 
2454 	/* create some worker threads to handle the async IO requests */
2455 	for (i = 0; i < num; i++) {
2456 		thread_t                myThread;
2457 
2458 		if (KERN_SUCCESS != kernel_thread_start(aio_work_thread, NULL, &myThread)) {
2459 			printf("%s - failed to create a work thread \n", __FUNCTION__);
2460 		} else {
2461 			thread_deallocate(myThread);
2462 		}
2463 	}
2464 }
2465 
2466 /*
2467  * Return the current activation utask
2468  */
2469 task_t
get_aiotask(void)2470 get_aiotask(void)
2471 {
2472 	return current_uthread()->uu_aio_task;
2473 }
2474 
2475 
/*
 * In the case of an aiocb from a
 * 32-bit process we need to expand some longs and pointers to the correct
 * sizes in order to let downstream code always work on the same type of
 * aiocb (in our case that is a user_aiocb)
 */
static void
do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = CAST_USER_ADDR_T(my_aiocbp->aio_buf);
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	/* special case here.  since we do not know if sigev_value is an */
	/* int or a ptr we do NOT cast the ptr to a user_addr_t.   This  */
	/* means if we send this info back to user space we need to remember */
	/* sigev_value was not expanded for the 32-bit case.  */
	/* NOTE - this does NOT affect us since we don't support sigev_value */
	/* yet in the aio context.  */
	//LP64
	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_function);
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
}
2508 
/* Similar for 64-bit user process, so that we don't need to satisfy
 * the alignment constraints of the original user64_aiocb.
 * On a 32-bit kernel a 64-bit process cannot exist, so that branch panics.
 */
#if !__LP64__
__dead2
#endif
static void
do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
#if __LP64__
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    my_aiocbp->aio_sigevent.sigev_notify_function;
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    my_aiocbp->aio_sigevent.sigev_notify_attributes;
#else
#pragma unused(my_aiocbp, the_user_aiocbp)
	panic("64bit process on 32bit kernel is not supported");
#endif
}
2539 
2540 
/*
 * filt_aioattach - EVFILT_AIO attach.  Kernel-originated registrations only
 * (kev->data carries the aio_workq_entry pointer); user-space attempts fail
 * with EPERM.  Takes a reference on the entry and joins the shared aio klist.
 */
static int
filt_aioattach(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp = (aio_workq_entry *)kev->data;

	/* Don't allow kevent registration from the user-space. */
	if ((kev->flags & EV_KERNEL) == 0) {
		knote_set_error(kn, EPERM);
		return 0;
	}

	kev->flags &= ~EV_KERNEL;
	/* Clear the 'kn_fflags' state after the knote has been processed. */
	kn->kn_flags |= EV_CLEAR;

	/* Associate the knote with the AIO work; ref dropped in filt_aiodetach. */
	knote_kn_hook_set_raw(kn, (void *)entryp);
	aio_entry_ref(entryp);

	lck_mtx_lock(&aio_klist_lock);
	KNOTE_ATTACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);

	return 0;
}
2566 
/*
 * filt_aiodetach - EVFILT_AIO detach.  Leaves the shared aio klist under the
 * klist lock, then drops the entry reference taken in filt_aioattach (the
 * hook may already be NULL if the entry was consumed by filt_aioprocess).
 */
static void
filt_aiodetach(struct knote *kn)
{
	aio_workq_entry *entryp = knote_kn_hook_get_raw(kn);

	lck_mtx_lock(&aio_klist_lock);
	KNOTE_DETACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);

	/* drop the attach-time reference after we're off the klist */
	if (entryp) {
		aio_entry_unref(entryp);
	}
}
2580 
2581 /*
2582  * The 'f_event' is called with 'aio_klist_lock' held when KNOTE() was called
2583  * in do_aio_completion_and_unlock().
2584  */
2585 static int
filt_aioevent(struct knote * kn,long hint)2586 filt_aioevent(struct knote *kn, long hint)
2587 {
2588 	aio_workq_entry *entryp;
2589 	int activate = 0;
2590 
2591 	/*
2592 	 * The 'f_event' and 'f_process' can run concurrently so it is possible
2593 	 * the aio_workq_entry has been detached from the knote when the
2594 	 * filt_aioprocess() was called earlier. In this case, we will skip
2595 	 * activating the event.
2596 	 */
2597 	entryp = knote_kn_hook_get_raw(kn);
2598 	if (__improbable(entryp == NULL)) {
2599 		goto out;
2600 	}
2601 
2602 	/* We can only activate the filter if the AIO work has completed. */
2603 	if (entryp->flags & AIO_COMPLETED) {
2604 		kn->kn_fflags |= hint;
2605 		activate = FILTER_ACTIVE;
2606 	}
2607 
2608 out:
2609 	return activate;
2610 }
2611 
/*
 * filt_aiotouch - EVFILT_AIO touch.  Re-arming or modifying an AIO kevent
 * is not supported, so any update attempt is reported back as an error.
 */
static int
filt_aiotouch(struct knote *kn __unused, struct kevent_qos_s *kev)
{
	/* We treat any kevent update from the kernel or user-space as an error. */
	kev->flags |= EV_ERROR;
	kev->data = ENOTSUP;

	return 0;
}
2621 
/*
 * filt_aioprocess - EVFILT_AIO process.  When the event has fired, copy the
 * AIO's error and return values out through kn_ext, remove the entry from
 * the proc's done queue, and drop the knote's entry reference.  The klist
 * lock serializes us against filt_aioevent; the proc AIO lock is taken
 * inside it for the done-queue removal.
 */
static int
filt_aioprocess(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp;
	proc_t p;
	int res = 0;

	entryp = knote_kn_hook_get_raw(kn);
	assert(entryp);
	p = entryp->procp;

	lck_mtx_lock(&aio_klist_lock);
	if (kn->kn_fflags) {
		/* Propagate the error status and return value back to the user. */
		kn->kn_ext[0] = entryp->errorval;
		kn->kn_ext[1] = entryp->returnval;
		knote_fill_kevent(kn, kev, 0);

		/* the event has been consumed: retire the entry */
		aio_proc_lock(p);
		aio_proc_remove_done_locked(p, entryp);
		aio_proc_unlock(p);
		aio_entry_unref(entryp);

		res = FILTER_ACTIVE;
	}
	lck_mtx_unlock(&aio_klist_lock);

	return res;
}
2651 
/* Filter operations backing EVFILT_AIO knotes (kernel-registered only). */
SECURITY_READ_ONLY_EARLY(struct filterops) aio_filtops = {
	.f_isfd = 0,
	.f_attach = filt_aioattach,
	.f_detach = filt_aiodetach,
	.f_event = filt_aioevent,
	.f_touch = filt_aiotouch,
	.f_process = filt_aioprocess,
};
2660 
2661 #pragma mark per process aio workqueue
2662 
/*
 * The per process workq threads call this function to handle the aio request. The threads
 * belong to the same process so we don't need to change the vm maps as we would for kernel
 * threads.  Always consumes the entry: either completes it here or re-queues
 * a delayed fsync.  Returns 0.
 */
static int
workq_aio_process_entry(aio_workq_entry *entryp)
{
	proc_t p = entryp->procp;
	int error;

	/* must run on a thread of the target proc, but not the issuing thread */
	assert(current_proc() == p && current_thread() != vfs_context_thread(&entryp->context));

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, 0, 0);

	if ((entryp->flags & AIO_READ) != 0) {
		error = do_aio_read(entryp);
	} else if ((entryp->flags & AIO_WRITE) != 0) {
		error = do_aio_write(entryp);
	} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(p);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed.  Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    0, 0, 0);

				/* The references on this entry haven't been consumed */
				if (!workq_aio_entry_add_locked(p, entryp)) {
					/* re-queue failed (workq going away): cancel instead */
					entryp->errorval = ECANCELED;
					entryp->returnval = -1;

					/* Now it's officially cancelled.  Do the completion */
					KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
					    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
					    entryp->aiocb.aio_fildes, 0, 0);

					/* consumes the proc lock */
					do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
				} else {
					/* consumes the proc lock */
					workq_aio_wakeup_thread_and_unlock(p);
				}
				return 0;
			}
			aio_proc_unlock(entryp->procp);
		}
		error = do_aio_fsync(entryp);
	} else {
		error = EINVAL;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->errorval, entryp->returnval, 0);

	/* we're done with the IO request so pop it off the active queue and */
	/* push it on the done queue */
	aio_proc_lock(p);
	entryp->errorval = error;
	do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
	return 0;
}
2731 
2732 /*
2733  * The functions below implement a workqueue for aio which is taken from the
2734  * workqueue implementation for libdispatch/pthreads. They are stripped down versions
2735  * of the corresponding functions for libdispatch/pthreads.
2736  */
2737 
/*
 * aio_workq_sysctl_handle_usecs - sysctl handler for aio workq timing knobs.
 * Reads/writes the microsecond value and, on a successful write, recomputes
 * the cached absolute-time interval so both stay in sync.
 */
static int
aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct aio_workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	/* read-only access or error: nothing to recompute */
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}
2751 
2752 #pragma mark wq_flags
2753 
2754 #define AIO_WQ_DEAD 0x1000
2755 
2756 static inline uint32_t
_wa_flags(workq_aio_t wq_aio)2757 _wa_flags(workq_aio_t wq_aio)
2758 {
2759 	return os_atomic_load(&wq_aio->wa_flags, relaxed);
2760 }
2761 
2762 static inline bool
_wq_exiting(workq_aio_t wq_aio)2763 _wq_exiting(workq_aio_t wq_aio)
2764 {
2765 	return _wa_flags(wq_aio) & WQ_EXITING;
2766 }
2767 
2768 static inline bool
_wq_dead(workq_aio_t wq_aio)2769 _wq_dead(workq_aio_t wq_aio)
2770 {
2771 	return _wa_flags(wq_aio) & AIO_WQ_DEAD;
2772 }
2773 
2774 #define AIO_WQPTR_IS_INITING_VALUE ((workq_aio_t)~(uintptr_t)0)
2775 
2776 static workq_aio_t
proc_get_aio_wqptr_fast(struct proc * p)2777 proc_get_aio_wqptr_fast(struct proc *p)
2778 {
2779 	return os_atomic_load(&p->p_aio_wqptr, relaxed);
2780 }
2781 
2782 static workq_aio_t
proc_get_aio_wqptr(struct proc * p)2783 proc_get_aio_wqptr(struct proc *p)
2784 {
2785 	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);
2786 	return wq_aio == AIO_WQPTR_IS_INITING_VALUE ? NULL : wq_aio;
2787 }
2788 
/*
 * Publish the aio workqueue pointer for the process.  The release ordering
 * on the exchange makes the fully initialized workq_aio_s visible before
 * the pointer itself.  If the previous value was the initing sentinel,
 * other threads may be parked in proc_init_aio_wqptr_or_wait(); wake them.
 */
static void
proc_set_aio_wqptr(struct proc *p, workq_aio_t wq_aio)
{
	/* wq_aio now holds the previous pointer value */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, wq_aio, release);
	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_aio_wqptr);
		proc_unlock(p);
	}
}
2799 
/*
 * Try to claim the right to initialize p->p_aio_wqptr.
 *
 * Returns true when the caller won the race and must allocate the
 * workqueue and publish it with proc_set_aio_wqptr().  Returns false when
 * another thread already initialized it, or is initializing it right now
 * (in which case we block until it is published).
 */
static bool
proc_init_aio_wqptr_or_wait(struct proc *p)
{
	workq_aio_t wq_aio;

	proc_lock(p);
	wq_aio = os_atomic_load(&p->p_aio_wqptr, relaxed);

	if (wq_aio == NULL) {
		/* We won: mark initialization in progress with the sentinel. */
		os_atomic_store(&p->p_aio_wqptr, AIO_WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		/* Another thread is initializing: wait for its wakeup. */
		assert_wait(&p->p_aio_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
2823 
2824 static inline event_t
workq_aio_parked_wait_event(struct uthread * uth)2825 workq_aio_parked_wait_event(struct uthread *uth)
2826 {
2827 	return (event_t)&uth->uu_workq_stackaddr;
2828 }
2829 
2830 static inline void
workq_aio_thread_wakeup(struct uthread * uth)2831 workq_aio_thread_wakeup(struct uthread *uth)
2832 {
2833 	thread_wakeup_thread(workq_aio_parked_wait_event(uth), get_machthread(uth));
2834 }
2835 
2836 /*
2837  * Routine:	workq_aio_mark_exiting
2838  *
2839  * Function:	Mark the work queue such that new threads will not be added to the
2840  *		work queue after we return.
2841  *
2842  * Conditions:	Called against the current process.
2843  */
2844 static void
workq_aio_mark_exiting(proc_t p)2845 workq_aio_mark_exiting(proc_t p)
2846 {
2847 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
2848 	uint32_t wq_flags;
2849 
2850 	if (!wq_aio) {
2851 		return;
2852 	}
2853 
2854 	wq_flags = os_atomic_or_orig(&wq_aio->wa_flags, WQ_EXITING, relaxed);
2855 	if (__improbable(wq_flags & WQ_EXITING)) {
2856 		panic("workq_aio_mark_exiting_locked called twice");
2857 	}
2858 
2859 	/*
2860 	 * Opportunistically try to cancel thread calls that are likely in flight.
2861 	 * workq_aio_exit() will do the proper cleanup.
2862 	 */
2863 	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
2864 		thread_call_cancel(wq_aio->wa_death_call);
2865 	}
2866 }
2867 
/*
 * Tear down the process's aio workqueue at process exit.
 *
 * Detaches p_aio_wqptr, reaps the death thread call, then waits for any
 * remaining workqueue threads to terminate before freeing the structure.
 * Runs after workq_aio_mark_exiting(); by now the proc has no live threads
 * other than the one executing this code.
 */
static void
workq_aio_exit(proc_t p)
{
	workq_aio_t wq_aio;

	/* Detach the pointer so nobody can find the workqueue anymore. */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, NULL, release);

	if (!wq_aio) {
		return;
	}

	/*
	 * Thread calls are always scheduled by the proc itself or under the
	 * workqueue spinlock if WQ_EXITING is not yet set.
	 *
	 * Either way, when this runs, the proc has no threads left beside
	 * the one running this very code, so we know no thread call can be
	 * dispatched anymore.
	 */

	thread_call_cancel_wait(wq_aio->wa_death_call);
	thread_call_free(wq_aio->wa_death_call);

	/*
	 * Clean up workqueue data structures for threads that exited and
	 * didn't get a chance to clean up after themselves.
	 *
	 * idle/new threads should have been interrupted and died on their own
	 */

	assert(TAILQ_EMPTY(&wq_aio->wa_aioq_entries));
	assert(TAILQ_EMPTY(&wq_aio->wa_thrunlist));

	if (wq_aio->wa_nthreads) {
		/*
		 * AIO_WQ_DEAD tells dying threads to wake us via wa_nthreads
		 * (see workq_aio_unpark_for_death_and_unlock()).
		 */
		os_atomic_or(&wq_aio->wa_flags, AIO_WQ_DEAD, relaxed);
		aio_proc_lock_spin(p);
		if (wq_aio->wa_nthreads) {
			struct uthread *uth;

			/* Mark every idle thread as dying and kick it awake. */
			TAILQ_FOREACH(uth, &wq_aio->wa_thidlelist, uu_workq_entry) {
				if (uth->uu_workq_flags & UT_WORKQ_DYING) {
					workq_aio_thread_wakeup(uth);
					continue;
				}
				wq_aio->wa_thdying_count++;
				uth->uu_workq_flags |= UT_WORKQ_DYING;
				workq_aio_thread_wakeup(uth);
			}
			/* Sleep until the last thread drops wa_nthreads to 0. */
			while (wq_aio->wa_nthreads) {
				msleep(&wq_aio->wa_nthreads, aio_proc_mutex(p), PRIBIO | PSPIN, "aio_workq_exit", 0);
			}
		}
		aio_proc_unlock(p);
	}

	assertf(TAILQ_EMPTY(&wq_aio->wa_thidlelist),
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thidlecount == 0,
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thdying_count == 0,
	    "wa_thdying_count = %d", wq_aio->wa_thdying_count);

	kfree_type(workq_aio_s, wq_aio);
}
2934 
/*
 * Lazily create the per-process aio workqueue if it does not exist yet.
 * Exactly one thread performs the allocation; racing threads wait inside
 * proc_init_aio_wqptr_or_wait() until the pointer is published.
 * Always returns 0 in the current implementation.
 */
static int
workq_aio_open(struct proc *p)
{
	workq_aio_t wq_aio;
	int error = 0;

	if (proc_get_aio_wqptr(p) == NULL) {
		if (proc_init_aio_wqptr_or_wait(p) == FALSE) {
			/* Someone else initialized it while we waited. */
			assert(proc_get_aio_wqptr(p) != NULL);
			goto out;
		}

		/*
		 * NOTE(review): neither kalloc_type() nor
		 * thread_call_allocate_with_options() results are checked;
		 * presumably Z_WAITOK allocations of this size cannot fail
		 * here -- confirm.
		 */
		wq_aio = kalloc_type(workq_aio_s, Z_WAITOK | Z_ZERO);

		wq_aio->wa_proc = p;

		TAILQ_INIT(&wq_aio->wa_thidlelist);
		TAILQ_INIT(&wq_aio->wa_thrunlist);
		TAILQ_INIT(&wq_aio->wa_aioq_entries);

		wq_aio->wa_death_call = thread_call_allocate_with_options(
			workq_aio_kill_old_threads_call, wq_aio,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		/* Publish; wakes any waiters parked on the initing sentinel. */
		proc_set_aio_wqptr(p, wq_aio);
	}
out:
	return error;
}
2964 
2965 #pragma mark aio workqueue idle thread accounting
2966 
2967 static inline struct uthread *
workq_oldest_killable_idle_aio_thread(workq_aio_t wq_aio)2968 workq_oldest_killable_idle_aio_thread(workq_aio_t wq_aio)
2969 {
2970 	return TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);
2971 }
2972 
2973 static inline uint64_t
workq_kill_delay_for_idle_aio_thread()2974 workq_kill_delay_for_idle_aio_thread()
2975 {
2976 	return aio_wq_reduce_pool_window.abstime;
2977 }
2978 
2979 static inline bool
workq_should_kill_idle_aio_thread(struct uthread * uth,uint64_t now)2980 workq_should_kill_idle_aio_thread(struct uthread *uth, uint64_t now)
2981 {
2982 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2983 	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
2984 }
2985 
/*
 * Arm the death thread call to fire at `deadline` so overstaying idle
 * threads get culled.  No-op when the workqueue is exiting or a call is
 * already scheduled.
 *
 * NOTE(review): the flag test and set are two separate relaxed atomics;
 * presumably callers serialize on the aio proc lock so this cannot
 * double-schedule -- confirm.
 */
static void
workq_aio_death_call_schedule(workq_aio_t wq_aio, uint64_t deadline)
{
	uint32_t wa_flags = os_atomic_load(&wq_aio->wa_flags, relaxed);

	if (wa_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	os_atomic_or(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq_aio->wa_death_call, NULL, deadline,
	    aio_wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
3006 
3007 /*
3008  * `decrement` is set to the number of threads that are no longer dying:
3009  * - because they have been resuscitated just in time (workq_pop_idle_thread)
3010  * - or have been killed (workq_thread_terminate).
3011  */
3012 static void
workq_aio_death_policy_evaluate(workq_aio_t wq_aio,uint16_t decrement)3013 workq_aio_death_policy_evaluate(workq_aio_t wq_aio, uint16_t decrement)
3014 {
3015 	struct uthread *uth;
3016 
3017 	assert(wq_aio->wa_thdying_count >= decrement);
3018 #if 0
3019 	if (decrement) {
3020 		printf("VV_DEBUG_AIO : %s:%d : pid = %d, ctid = %d, after decrement thdying_count = %d\n",
3021 		    __FUNCTION__, __LINE__, proc_pid(current_proc()), thread_get_ctid(thr),
3022 		    wq_aio->wa_thdying_count - decrement);
3023 	}
3024 #endif
3025 
3026 	if ((wq_aio->wa_thdying_count -= decrement) > 0) {
3027 		return;
3028 	}
3029 
3030 	if (wq_aio->wa_thidlecount <= 1) {
3031 		return;
3032 	}
3033 
3034 	if (((uth = workq_oldest_killable_idle_aio_thread(wq_aio)) == NULL)) {
3035 		return;
3036 	}
3037 
3038 	uint64_t now = mach_absolute_time();
3039 	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
3040 
3041 	if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
3042 		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
3043 			wq_aio->wa_thdying_count++;
3044 			uth->uu_workq_flags |= UT_WORKQ_DYING;
3045 		}
3046 		workq_aio_thread_wakeup(uth);
3047 		return;
3048 	}
3049 
3050 	workq_aio_death_call_schedule(wq_aio,
3051 	    uth->uu_save.uus_workq_park_data.idle_stamp + delay);
3052 }
3053 
3054 static void
workq_aio_kill_old_threads_call(void * param0,void * param1 __unused)3055 workq_aio_kill_old_threads_call(void *param0, void *param1 __unused)
3056 {
3057 	workq_aio_t wq_aio = param0;
3058 
3059 	aio_proc_lock_spin(wq_aio->wa_proc);
3060 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_START, wq_aio);
3061 	os_atomic_andnot(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
3062 	workq_aio_death_policy_evaluate(wq_aio, 0);
3063 	WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_END, wq_aio);
3064 	aio_proc_unlock(wq_aio->wa_proc);;
3065 }
3066 
3067 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
3068 #define WQ_SETUP_NONE  0
3069 
/*
 * Final accounting for a workqueue thread about to die, followed by the
 * thread's own termination.  Consumes the aio proc lock; never returns.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_for_death_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t death_flags, __unused uint32_t setup_flags)
{
	/* If we were parked on the idle list, take ourselves off it. */
	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq_aio->wa_thidlecount--;
		TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	}

	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		wq_aio->wa_thdying_count--;
	}
	assert(wq_aio->wa_nthreads > 0);
	wq_aio->wa_nthreads--;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_terminate | DBG_FUNC_NONE, wq_aio);

	/* workq_aio_exit() sleeps on wa_nthreads; wake it if we're the last. */
	if (_wq_dead(wq_aio) && (wq_aio->wa_nthreads == 0)) {
		wakeup(&wq_aio->wa_nthreads);
	}

	aio_proc_unlock(p);

	thread_t th = get_machthread(uth);
	assert(th == current_thread());

	thread_deallocate(th);
	thread_terminate(th);
	thread_exception_return();
	__builtin_unreachable();
}
3102 
/*
 * Move the current thread off the run list and park it at the head of the
 * idle list (head == most recently idled).  May instead terminate the
 * thread outright — and not return — when the workqueue is exiting or idle
 * threads have overstayed their grace period.  Called with the aio proc
 * lock held.
 */
static void
workq_push_idle_aio_thread(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING);
	TAILQ_REMOVE(&wq_aio->wa_thrunlist, uth, uu_workq_entry);

	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_aio_thread(wq_aio);
	uint16_t cur_idle = wq_aio->wa_thidlecount;

	if (_wq_exiting(wq_aio) || (wq_aio->wa_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_aio_thread(oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
		}

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);

	cur_idle += 1;
	wq_aio->wa_thidlecount = cur_idle;
	uth->uu_save.uus_workq_park_data.has_stack = false;
	TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, uth, uu_workq_entry);

	/* First idle thread: arm the death call to cull it later. */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_aio_thread();
		workq_aio_death_call_schedule(wq_aio, now + delay);
	}
}
3153 
3154 /*
3155  * We have no work to do, park ourselves on the idle list.
3156  *
3157  * Consumes the workqueue lock and does not return.
3158  */
__attribute__((noreturn, noinline))
static void
workq_aio_park_and_unlock(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);

	workq_push_idle_aio_thread(p, wq_aio, uth, setup_flags); // may not return

	/* The push may have marked us dying (workqueue exiting, etc.). */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_park | DBG_FUNC_NONE, wq_aio);

	/* Block until workq_aio_thread_wakeup(); resumes in the continuation. */
	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	/* XXX this should probably be THREAD_UNINT */
	assert_wait(workq_aio_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	aio_proc_unlock(p);
	thread_block(workq_aio_unpark_continue);
	__builtin_unreachable();
}
3184 
3185 #define WORKQ_POLICY_INIT(qos) \
3186 	         (struct uu_workq_policy){ .qos_req = (qos), .qos_bucket = (qos) }
3187 
3188 /*
3189  * This function is always called with the workq lock.
3190  */
3191 static void
workq_aio_thread_reset_pri(struct uthread * uth,thread_t src_th)3192 workq_aio_thread_reset_pri(struct uthread *uth, thread_t src_th)
3193 {
3194 	thread_t th = get_machthread(uth);
3195 	thread_qos_t qos = (thread_qos_t)proc_get_effective_thread_policy(src_th, TASK_POLICY_QOS);
3196 	int priority = 31;
3197 	int policy = POLICY_TIMESHARE;
3198 
3199 	uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
3200 	thread_set_workq_pri(th, qos, priority, policy);
3201 }
3202 
3203 static inline void
workq_aio_thread_set_type(struct uthread * uth,uint16_t flags)3204 workq_aio_thread_set_type(struct uthread *uth, uint16_t flags)
3205 {
3206 	uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
3207 	uth->uu_workq_flags |= flags;
3208 }
3209 
/*
 * Worker main loop: repeatedly pop the oldest pending aio request and
 * process it, then park (never returns).  Entered with the aio proc lock
 * held; the lock is dropped around each request's processing.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_select_req_or_park_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t setup_flags)
{
	aio_workq_entry *entryp;
	thread_t last_thread = NULL;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_START, wq_aio);
	thread_freeze_base_pri(get_machthread(uth));
	workq_aio_thread_set_type(uth, 0);
	while ((entryp = TAILQ_FIRST(&wq_aio->wa_aioq_entries))) {
		if (__improbable(_wq_exiting(wq_aio))) {
			/* Workqueue is going away: stop draining and park. */
			break;
		}

		TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
		entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

		aio_proc_unlock(p);

		/* Adopt the submitter's priority, only when it changed. */
		thread_t thr = vfs_context_thread(&entryp->context);
		if (last_thread != thr) {
			workq_aio_thread_reset_pri(uth, thr);
			last_thread = thr;
		}

		/* this frees references to workq entry */
		workq_aio_process_entry(entryp);

		ast_check_async_thread();

		aio_proc_lock_spin(p);
	}
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_END, wq_aio);
	thread_unfreeze_base_pri(get_machthread(uth));
	workq_aio_park_and_unlock(p, wq_aio, uth, setup_flags);
}
3248 
3249 /*
3250  * parked idle thread wakes up
3251  */
/*
 * Continuation run when a parked idle aio workqueue thread wakes up.
 * Either resumes processing work (when UT_WORKQ_RUNNING was set by
 * workq_aio_wakeup_thread_internal()) or terminates the thread.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);

	aio_proc_lock_spin(p);

	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		/* Woken to do work: drain the queue, then re-park. */
		workq_aio_unpark_select_req_or_park_and_unlock(p, wq_aio, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
	}

	/* Not running and not awakened for work: account and die. */
	workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);

	__builtin_unreachable();
}
3295 
3296 /*
3297  * Called by thread_create_workq_aio_waiting() during thread initialization, before
3298  * assert_wait, before the thread has been started.
3299  */
/*
 * Initialize the uthread state of a freshly created aio workqueue thread
 * and take the aio proc lock on behalf of the caller.  Returns the event
 * the new thread should wait on while parked.
 */
event_t
aio_workq_thread_init_and_wq_lock(task_t task, thread_t th)
{
	struct uthread *uth = get_bsdthread_info(th);

	uth->uu_workq_flags = UT_WORKQ_NEW;
	uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
	uth->uu_workq_thport = MACH_PORT_NULL;
	uth->uu_workq_stackaddr = 0;
	uth->uu_workq_pthread_kill_allowed = 0;

	thread_set_tag(th, THREAD_TAG_AIO_WORKQUEUE);
	thread_reset_workq_qos(th, THREAD_QOS_LEGACY);

	/* Returned to thread creation with the aio proc lock held. */
	aio_proc_lock(get_bsdtask_info(task));
	return workq_aio_parked_wait_event(uth);
}
3317 
3318 /**
3319  * Try to add a new workqueue thread for aio.
3320  *
3321  * - called with workq lock held
3322  * - dropped and retaken around thread creation
3323  * - return with workq lock held
3324  * - aio threads do not call into pthread functions to setup or destroy stacks.
3325  */
/**
 * Try to add a new workqueue thread for aio.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 * - aio threads do not call into pthread functions to setup or destroy stacks.
 */
static kern_return_t
workq_aio_add_new_thread(proc_t p, workq_aio_t wq_aio)
{
	kern_return_t kret;
	thread_t th;

	/* Reserve the slot before dropping the lock; undone on failure. */
	wq_aio->wa_nthreads++;

	aio_proc_unlock(p);

	kret = thread_create_aio_workq_waiting(proc_task(p),
	    workq_aio_unpark_continue,
	    &th);

	if (kret != KERN_SUCCESS) {
		WQ_AIO_TRACE(AIO_WQ_aio_thread_create_failed | DBG_FUNC_NONE, wq_aio,
		    kret, 0, 0);
		goto out;
	}

	/*
	 * thread_create_aio_workq_waiting() will return with the wq lock held
	 * on success, because it calls aio_workq_thread_init_and_wq_lock().
	 */
	struct uthread *uth = get_bsdthread_info(th);
	TAILQ_INSERT_TAIL(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount++;
	uth->uu_workq_flags &= ~UT_WORKQ_NEW;
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_create | DBG_FUNC_NONE, wq_aio);
	return kret;

out:
	aio_proc_lock(p);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq_aio->wa_nthreads--;
	return kret;
}
3367 
/*
 * Pick an idle aio workqueue thread — creating one if the pool is below
 * WORKQUEUE_AIO_MAXTHREADS and the caller is not itself an aio workqueue
 * thread — move it to the run list, and wake it.  Called with the aio
 * proc lock held; drops it when `unlock` is true.
 */
static void
workq_aio_wakeup_thread_internal(proc_t p, bool unlock)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
	bool needs_wakeup = false;
	struct uthread *uth = NULL;

	if (!wq_aio) {
		goto out;
	}

	/* No idle thread available: try to grow the pool. */
	uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	while (!uth && (wq_aio->wa_nthreads < WORKQUEUE_AIO_MAXTHREADS) &&
	    !(thread_get_tag(current_thread()) & THREAD_TAG_AIO_WORKQUEUE)) {
		if (workq_aio_add_new_thread(p, wq_aio) != KERN_SUCCESS) {
			break;
		}
		uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	}

	if (!uth) {
		goto out;
	}

	/* Move the chosen thread from the idle list to the run list. */
	TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount--;

	TAILQ_INSERT_TAIL(&wq_aio->wa_thrunlist, uth, uu_workq_entry);
	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_wakeup | DBG_FUNC_NONE, wq_aio);

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* Resuscitated just in time: it is no longer dying. */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_aio_death_policy_evaluate(wq_aio, 1);
		needs_wakeup = false;
	} else {
		needs_wakeup = true;
	}
out:
	if (unlock) {
		aio_proc_unlock(p);
	}

	/* Wake outside the lock. */
	if (uth && needs_wakeup) {
		workq_aio_thread_wakeup(uth);
	}
}
3417 
3418 static void
workq_aio_wakeup_thread_and_unlock(proc_t p)3419 workq_aio_wakeup_thread_and_unlock(proc_t p)
3420 {
3421 	return workq_aio_wakeup_thread_internal(p, true);
3422 }
3423 
3424 static void
workq_aio_wakeup_thread(proc_t p)3425 workq_aio_wakeup_thread(proc_t p)
3426 {
3427 	return workq_aio_wakeup_thread_internal(p, false);
3428 }
3429 
3430 void
workq_aio_prepare(struct proc * p)3431 workq_aio_prepare(struct proc *p)
3432 {
3433 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3434 
3435 	if (__improbable(!wq_aio && !proc_in_teardown(p))) {
3436 		workq_aio_open(p);
3437 	}
3438 }
3439 
3440 bool
workq_aio_entry_add_locked(struct proc * p,aio_workq_entry * entryp)3441 workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp)
3442 {
3443 	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3444 	bool ret = false;
3445 
3446 	ASSERT_AIO_PROC_LOCK_OWNED(p);
3447 
3448 	if (!proc_in_teardown(p) && wq_aio && !_wq_exiting(wq_aio)) {
3449 		TAILQ_INSERT_TAIL(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3450 		ret = true;
3451 	}
3452 
3453 	return ret;
3454 }
3455 
3456 bool
workq_aio_entry_remove_locked(struct proc * p,aio_workq_entry * entryp)3457 workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp)
3458 {
3459 	workq_aio_t  wq_aio = proc_get_aio_wqptr(p);
3460 
3461 	ASSERT_AIO_PROC_LOCK_OWNED(p);
3462 
3463 	if (entryp->aio_workq_link.tqe_prev == NULL) {
3464 		panic("Trying to remove an entry from a work queue, but it is not on a queue");
3465 	}
3466 
3467 	TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3468 	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
3469 
3470 	return true;
3471 }
3472