1 /*
2 * Copyright (c) 2003-2024 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28
29 /*
30 * This file contains support for the POSIX 1003.1B AIO/LIO facility.
31 */
32
33 #include <sys/systm.h>
34 #include <sys/fcntl.h>
35 #include <sys/file_internal.h>
36 #include <sys/filedesc.h>
37 #include <sys/kdebug.h>
38 #include <sys/kernel.h>
39 #include <sys/vnode_internal.h>
40 #include <sys/kauth.h>
41 #include <sys/mount_internal.h>
42 #include <sys/param.h>
43 #include <sys/proc_internal.h>
44 #include <sys/sysctl.h>
45 #include <sys/unistd.h>
46 #include <sys/user.h>
47
48 #include <sys/aio_kern.h>
49 #include <sys/sysproto.h>
50
51 #include <machine/limits.h>
52
53 #include <mach/mach_types.h>
54 #include <kern/kern_types.h>
55 #include <kern/waitq.h>
56 #include <kern/zalloc.h>
57 #include <kern/task.h>
58 #include <kern/sched_prim.h>
59
60 #include <vm/vm_map_xnu.h>
61
62 #include <os/refcnt.h>
63
64 #include <kern/thread.h>
65 #include <kern/policy_internal.h>
66 #include <pthread/workqueue_internal.h>
67
68 #if 0
69 #undef KERNEL_DEBUG
70 #define KERNEL_DEBUG KERNEL_DEBUG_CONSTANT
71 #endif
72
73 #define AIO_work_queued 1
74 #define AIO_worker_wake 2
75 #define AIO_completion_sig 3
76 #define AIO_completion_kevent 4
77 #define AIO_completion_cleanup_wait 5
78 #define AIO_completion_cleanup_wake 6
79 #define AIO_completion_suspend_wake 7
80 #define AIO_cancel 10
81 #define AIO_cancel_async_workq 11
82 #define AIO_cancel_sync_workq 12
83 #define AIO_cancel_activeq 13
84 #define AIO_cancel_doneq 14
85 #define AIO_fsync 20
86 #define AIO_fsync_delay 21
87 #define AIO_read 30
88 #define AIO_write 40
89 #define AIO_listio 50
90 #define AIO_error 60
91 #define AIO_error_val 61
92 #define AIO_error_activeq 62
93 #define AIO_error_workq 63
94 #define AIO_return 70
95 #define AIO_return_val 71
96 #define AIO_return_activeq 72
97 #define AIO_return_workq 73
98 #define AIO_exec 80
99 #define AIO_exit 90
100 #define AIO_exit_sleep 91
101 #define AIO_close 100
102 #define AIO_close_sleep 101
103 #define AIO_suspend 110
104 #define AIO_suspend_sleep 111
105 #define AIO_worker_thread 120
106 #define AIO_register_kevent 130
107 #define AIO_WQ_process_entry 140
108 #define AIO_WQ_aio_thread_create 141
109 #define AIO_WQ_aio_thread_terminate 142
110 #define AIO_WQ_aio_death_call 143
111 #define AIO_WQ_aio_thread_park 144
112 #define AIO_WQ_aio_select_req 145
113 #define AIO_WQ_aio_thread_create_failed 146
114 #define AIO_WQ_aio_thread_wakeup 147
115
116 static TUNABLE(uint32_t, bootarg_aio_new_workq, "aio_new_workq", 1);
117
/*
 * Per-entry state and kind flags for an aio_workq_entry.
 * The low nibble identifies the operation, the 0x0100/0x0200 bits track
 * completion/cancellation, and the wait bits mark who is blocked on us.
 */
__options_decl(aio_entry_flags_t, uint32_t, {
	AIO_READ        = 0x00000001, /* a read */
	AIO_WRITE       = 0x00000002, /* a write */
	AIO_FSYNC       = 0x00000004, /* aio_fsync with op = O_SYNC */
	AIO_DSYNC       = 0x00000008, /* aio_fsync with op = O_DSYNC (not supported yet) */
	AIO_LIO         = 0x00000010, /* lio_listio generated IO */
	AIO_LIO_WAIT    = 0x00000020, /* lio_listio is waiting on the leader */

	AIO_COMPLETED   = 0x00000100, /* request has completed */
	AIO_CANCELLED   = 0x00000200, /* request has been cancelled */

	/*
	 * These flags mean that this entry is blocking either:
	 * - close (AIO_CLOSE_WAIT)
	 * - exit or exec (AIO_EXIT_WAIT)
	 *
	 * These flags are mutually exclusive, and the AIO_EXIT_WAIT variant
	 * will also neuter notifications in do_aio_completion_and_unlock().
	 */
	AIO_CLOSE_WAIT  = 0x00004000,
	AIO_EXIT_WAIT   = 0x00008000,
});
140
/*! @struct aio_workq_entry
 *
 * @discussion
 * This represents a piece of aio/lio work.
 *
 * The ownership rules go as follows:
 *
 * - the "proc" owns one refcount on the entry (from creation), while it is
 *   enqueued on the aio_activeq and then the aio_doneq.
 *
 *   either aio_return() (user read the status) or _aio_exit() (the process
 *   died) will dequeue the entry and consume this ref.
 *
 * - the async workqueue owns one refcount once the work is submitted,
 *   which is consumed in do_aio_completion_and_unlock().
 *
 *   This ref protects the entry for the end of
 *   do_aio_completion_and_unlock() (when signal delivery happens).
 *
 * - lio_listio() for batches picks one of the entries to be the "leader"
 *   of the batch. Each work item will have a refcount on its leader
 *   so that the accounting of the batch completion can be done on the leader
 *   (to be able to decrement lio_pending).
 *
 *   This ref is consumed in do_aio_completion_and_unlock() as well.
 *
 * - lastly, in lio_listio() when the LIO_WAIT behavior is requested,
 *   an extra ref is taken in this syscall as it needs to keep accessing
 *   the leader "lio_pending" field until it hits 0.
 */
struct aio_workq_entry {
	/* queue lock */
	TAILQ_ENTRY(aio_workq_entry) aio_workq_link;

	/* Proc lock */
	TAILQ_ENTRY(aio_workq_entry) aio_proc_link;  /* p_aio_activeq or p_aio_doneq */
	user_ssize_t                 returnval;      /* return value from read / write request */
	errno_t                      errorval;       /* error value from read / write request */
	os_refcnt_t                  aio_refcount;
	aio_entry_flags_t            flags;

	int                          lio_pending;    /* pending I/Os in lio group, only on leader */
	struct aio_workq_entry      *lio_leader;     /* pointer to the lio leader, can be self */

	/* Initialized and never changed, safe to access */
	struct proc                 *procp;          /* user proc that queued this request */
	user_addr_t                  uaiocbp;        /* pointer passed in from user land */
	struct user_aiocb            aiocb;          /* copy of aiocb from user land */
	struct vfs_context           context;        /* context which enqueued the request */

	/* Initialized, and possibly freed by aio_work_thread() or at free if cancelled */
	vm_map_t                     aio_map;        /* user land map we have a reference to */
};
194
/*
 * aio requests queue up on the aio_async_workq or lio_sync_workq (for
 * lio_listio LIO_WAIT). Requests then move to the per process aio_activeq
 * (proc.aio_activeq) when one of our worker threads start the IO.
 * And finally, requests move to the per process aio_doneq (proc.aio_doneq)
 * when the IO request completes. The request remains on aio_doneq until
 * user process calls aio_return or the process exits, either way that is our
 * trigger to release aio resources.
 */
typedef struct aio_workq {
	TAILQ_HEAD(, aio_workq_entry) aioq_entries; /* pending requests, protected by aioq_lock */
	lck_spin_t                    aioq_lock;    /* spin lock guarding this queue */
	struct waitq                  aioq_waitq;   /* FIFO waitq paired with the queue (see aio_workq_init) */
} *aio_workq_t;

#define AIO_NUM_WORK_QUEUES 1
struct aio_anchor_cb {
	os_atomic(int)   aio_total_count;  /* total extant entries */

	/* Hash table of queues here */
	int              aio_num_workqs;
	struct aio_workq aio_async_workqs[AIO_NUM_WORK_QUEUES];
};
typedef struct aio_anchor_cb aio_anchor_cb;
219
220
/* New per process workqueue */
#define WORKQUEUE_AIO_MAXTHREADS 16

TAILQ_HEAD(workq_aio_uthread_head, uthread);

/*
 * Per-process aio workqueue state (used when bootarg_aio_new_workq is set).
 * NOTE(review): field roles below are inferred from names/usage in the
 * trace macros — confirm against workq_aio_* implementations.
 */
typedef struct workq_aio_s {
	thread_call_t                  wa_death_call;   /* thread call, presumably drives workq_aio_kill_old_threads_call() */
	struct workq_aio_uthread_head  wa_thrunlist;    /* running worker uthreads (by name) */
	struct workq_aio_uthread_head  wa_thidlelist;   /* idle/parked worker uthreads (by name) */
	TAILQ_HEAD(, aio_workq_entry)  wa_aioq_entries; /* pending aio requests for this proc */
	proc_t                         wa_proc;         /* owning process (used for pid in WQ_AIO_TRACE) */
	workq_state_flags_t _Atomic    wa_flags;
	uint16_t                       wa_nthreads;     /* total worker thread count */
	uint16_t                       wa_thidlecount;  /* idle worker thread count */
	uint16_t                       wa_thdying_count; /* workers currently being torn down */
} workq_aio_s, *workq_aio_t;

/* Pairing of a sysctl-visible microsecond value with its absolute-time form. */
struct aio_workq_usec_var {
	uint32_t usecs;    /* value exposed through sysctl, in microseconds */
	uint64_t abstime;  /* presumably the same interval in mach absolute time — see aio_workq_sysctl_handle_usecs */
};
242
static int aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS;

/*
 * Declare a microsecond tunable plus the kern.<var>_usecs sysctl that
 * updates it through aio_workq_sysctl_handle_usecs().
 */
#define AIO_WORKQ_SYSCTL_USECS(var, init) \
	static struct aio_workq_usec_var var = { .usecs = (init) }; \
	SYSCTL_OID(_kern, OID_AUTO, var##_usecs, \
	    CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_LOCKED, &(var), 0, \
	    aio_workq_sysctl_handle_usecs, "I", "")

AIO_WORKQ_SYSCTL_USECS(aio_wq_reduce_pool_window, WQ_REDUCE_POOL_WINDOW_USECS);

/* Emit a BSD AIO kdebug event tagged with the workqueue owner's pid. */
#define WQ_AIO_TRACE(x, wq, a, b, c, d) \
	({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
	proc_getpid((wq)->wa_proc), (a), (b), (c), (d)); })

/* Same, with the standard thread-pool payload (tid + thread counters). */
#define WQ_AIO_TRACE_WQ(x, wq) \
	({ KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, (x)),\
	proc_getpid((wq)->wa_proc),\
	(uintptr_t)thread_tid(current_thread()),\
	(wq)->wa_nthreads, (wq)->wa_thidlecount, (wq)->wa_thdying_count); })
262
263 /*
264 * Notes on aio sleep / wake channels.
265 * We currently pick a couple fields within the proc structure that will allow
266 * us sleep channels that currently do not collide with any other kernel routines.
267 * At this time, for binary compatibility reasons, we cannot create new proc fields.
268 */
269 #define AIO_SUSPEND_SLEEP_CHAN p_aio_activeq
270 #define AIO_CLEANUP_SLEEP_CHAN p_aio_total_count
271
/*
 * Assert that an aio entry belongs to the given proc; panics on mismatch.
 * Wrapped in do { } while (0) so the macro expands to a single statement
 * and cannot break unbraced if/else constructs at call sites (all existing
 * call sites invoke it with a trailing semicolon, so this is compatible).
 */
#define ASSERT_AIO_FROM_PROC(aiop, theproc) \
	do { \
		if ((aiop)->procp != (theproc)) { \
			panic("AIO on a proc list that does not belong to that proc."); \
		} \
	} while (0)
276
277 extern kern_return_t thread_terminate(thread_t);
278
279 /*
280 * LOCAL PROTOTYPES
281 */
282 static void aio_proc_lock(proc_t procp);
283 static void aio_proc_lock_spin(proc_t procp);
284 static void aio_proc_unlock(proc_t procp);
285 static lck_mtx_t *aio_proc_mutex(proc_t procp);
286 static bool aio_has_active_requests_for_process(proc_t procp);
287 static bool aio_proc_has_active_requests_for_file(proc_t procp, int fd);
288 static boolean_t is_already_queued(proc_t procp, user_addr_t aiocbp);
289
290 static aio_workq_t aio_entry_workq(aio_workq_entry *entryp);
291 static void aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
292 static void aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp);
293 static void aio_entry_ref(aio_workq_entry *entryp);
294 static void aio_entry_unref(aio_workq_entry *entryp);
295 static bool aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp);
296 static boolean_t aio_delay_fsync_request(aio_workq_entry *entryp);
297 static void aio_free_request(aio_workq_entry *entryp);
298
299 static void aio_workq_init(aio_workq_t wq);
300 static void aio_workq_lock_spin(aio_workq_t wq);
301 static void aio_workq_unlock(aio_workq_t wq);
302 static lck_spin_t *aio_workq_lock(aio_workq_t wq);
303
304 static void aio_work_thread(void *arg, wait_result_t wr);
305 static aio_workq_entry *aio_get_some_work(void);
306
307 static int aio_queue_async_request(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
308 static int aio_validate(proc_t, aio_workq_entry *entryp);
309
310 static int do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp, aio_entry_flags_t);
311 static void do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp, aio_entry_flags_t reason);
312 static int do_aio_fsync(aio_workq_entry *entryp);
313 static int do_aio_read(aio_workq_entry *entryp);
314 static int do_aio_write(aio_workq_entry *entryp);
315 static void do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
316 static void do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp);
317 static aio_workq_entry *aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t);
318 static int aio_copy_in_list(proc_t, user_addr_t, user_addr_t *, int);
319
320 static void workq_aio_prepare(struct proc *p);
321 static bool workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp);
322 static void workq_aio_wakeup_thread(proc_t p);
323 static void workq_aio_wakeup_thread_and_unlock(proc_t p);
324 static int workq_aio_process_entry(aio_workq_entry *entryp);
325 static bool workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp);
326
327 static void workq_aio_kill_old_threads_call(void *param0, void *param1 __unused);
328 static void workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr);
329
330 static void workq_aio_mark_exiting(proc_t p);
331 static void workq_aio_exit(proc_t p);
332
333 #define ASSERT_AIO_PROC_LOCK_OWNED(p) LCK_MTX_ASSERT(aio_proc_mutex(p), LCK_MTX_ASSERT_OWNED)
334 #define ASSERT_AIO_WORKQ_LOCK_OWNED(q) LCK_SPIN_ASSERT(aio_workq_lock(q), LCK_ASSERT_OWNED)
335
336 /*
337 * EXTERNAL PROTOTYPES
338 */
339
340 /* in ...bsd/kern/sys_generic.c */
341 extern int dofileread(vfs_context_t ctx, struct fileproc *fp,
342 user_addr_t bufp, user_size_t nbyte,
343 off_t offset, int flags, user_ssize_t *retval);
344 extern int dofilewrite(vfs_context_t ctx, struct fileproc *fp,
345 user_addr_t bufp, user_size_t nbyte, off_t offset,
346 int flags, user_ssize_t *retval);
347
348 /*
349 * aio external global variables.
350 */
351 extern int aio_max_requests; /* AIO_MAX - configurable */
352 extern int aio_max_requests_per_process; /* AIO_PROCESS_MAX - configurable */
353 extern int aio_worker_threads; /* AIO_THREAD_COUNT - configurable */
354
355
356 /*
357 * aio static variables.
358 */
359 static aio_anchor_cb aio_anchor = {
360 .aio_num_workqs = AIO_NUM_WORK_QUEUES,
361 };
362 os_refgrp_decl(static, aio_refgrp, "aio", NULL);
363 static LCK_GRP_DECLARE(aio_proc_lock_grp, "aio_proc");
364 static LCK_GRP_DECLARE(aio_queue_lock_grp, "aio_queue");
365 static LCK_MTX_DECLARE(aio_proc_mtx, &aio_proc_lock_grp);
366
367 static struct klist aio_klist;
368 static LCK_GRP_DECLARE(aio_klist_lck_grp, "aio_klist");
369 static LCK_MTX_DECLARE(aio_klist_lock, &aio_klist_lck_grp);
370
371 static KALLOC_TYPE_DEFINE(aio_workq_zonep, aio_workq_entry, KT_DEFAULT);
372
373 /* Hash */
374 static aio_workq_t
aio_entry_workq(__unused aio_workq_entry * entryp)375 aio_entry_workq(__unused aio_workq_entry *entryp)
376 {
377 return &aio_anchor.aio_async_workqs[0];
378 }
379
/*
 * Initialize a work queue: empty entry list, its spin lock, and the
 * FIFO waitq associated with the queue.
 */
static void
aio_workq_init(aio_workq_t wq)
{
	TAILQ_INIT(&wq->aioq_entries);
	lck_spin_init(&wq->aioq_lock, &aio_queue_lock_grp, LCK_ATTR_NULL);
	waitq_init(&wq->aioq_waitq, WQT_QUEUE, SYNC_POLICY_FIFO);
}
387
388
/*
 * Can be passed a queue which is locked spin.
 *
 * Unlinks the entry from its work queue and clears tqe_prev, which is the
 * sentinel the cancellation path uses to tell whether the entry is still
 * queued (see aio_entry_try_workq_remove()).
 */
static void
aio_workq_remove_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (entryp->aio_workq_link.tqe_prev == NULL) {
		panic("Trying to remove an entry from a work queue, but it is not on a queue");
	}

	TAILQ_REMOVE(&queue->aioq_entries, entryp, aio_workq_link);
	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */
}
404
/*
 * Append an entry to a (spin-locked) legacy global work queue.
 * The legacy queues must never be used when the new per-process
 * workqueue implementation is selected via the aio_new_workq bootarg.
 */
static void
aio_workq_add_entry_locked(aio_workq_t queue, aio_workq_entry *entryp)
{
	ASSERT_AIO_WORKQ_LOCK_OWNED(queue);

	if (bootarg_aio_new_workq) {
		panic("old workq implementation selected with bootarg set");
	}

	TAILQ_INSERT_TAIL(&queue->aioq_entries, entryp, aio_workq_link);
}
416
/* Take the per-process aio mutex (blocking acquire). */
static void
aio_proc_lock(proc_t procp)
{
	lck_mtx_lock(aio_proc_mutex(procp));
}
422
/* Take the per-process aio mutex, spinning instead of blocking. */
static void
aio_proc_lock_spin(proc_t procp)
{
	lck_mtx_lock_spin(aio_proc_mutex(procp));
}
428
429 static bool
aio_has_any_work(void)430 aio_has_any_work(void)
431 {
432 return os_atomic_load(&aio_anchor.aio_total_count, relaxed) != 0;
433 }
434
/*
 * Try to account for and enqueue a new request on the proc's active queue.
 * Returns false when the per-process limit is reached, the same aiocb is
 * already queued, or the global aio_max_requests limit would be exceeded.
 * The global count is reserved with an atomic rmw loop so that concurrent
 * submitters cannot collectively overshoot aio_max_requests.
 * Called with the aio proc lock held.
 */
static bool
aio_try_proc_insert_active_locked(proc_t procp, aio_workq_entry *entryp)
{
	int old, new;

	ASSERT_AIO_PROC_LOCK_OWNED(procp);

	/* per-process limit */
	if (procp->p_aio_total_count >= aio_max_requests_per_process) {
		return false;
	}

	/* reject a duplicate submission of the same user aiocb */
	if (is_already_queued(procp, entryp->uaiocbp)) {
		return false;
	}

	/* reserve a slot in the global count without exceeding aio_max_requests */
	os_atomic_rmw_loop(&aio_anchor.aio_total_count, old, new, relaxed, {
		if (old >= aio_max_requests) {
		        os_atomic_rmw_loop_give_up(return false);
		}
		new = old + 1;
	});

	TAILQ_INSERT_TAIL(&procp->p_aio_activeq, entryp, aio_proc_link);
	procp->p_aio_total_count++;
	return true;
}
461
/*
 * Move a completed entry from the proc's active queue to its done queue.
 * Called with the aio proc lock held.
 */
static void
aio_proc_move_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_activeq, entryp, aio_proc_link);
	TAILQ_INSERT_TAIL(&procp->p_aio_doneq, entryp, aio_proc_link);
}
468
/*
 * Unlink an entry from the proc's done queue and release both the global
 * and per-process accounting for it. Called with the aio proc lock held.
 */
static void
aio_proc_remove_done_locked(proc_t procp, aio_workq_entry *entryp)
{
	TAILQ_REMOVE(&procp->p_aio_doneq, entryp, aio_proc_link);
	entryp->aio_proc_link.tqe_prev = NULL;
	/* dec_orig returns the pre-decrement value, which must have been > 0 */
	if (os_atomic_dec_orig(&aio_anchor.aio_total_count, relaxed) <= 0) {
		panic("Negative total AIO count!");
	}
	/* post-decrement: the comparison is against the pre-decrement value */
	if (procp->p_aio_total_count-- <= 0) {
		panic("proc %p: p_aio_total_count accounting mismatch", procp);
	}
}
481
/* Release the per-process aio mutex. */
static void
aio_proc_unlock(proc_t procp)
{
	lck_mtx_unlock(aio_proc_mutex(procp));
}
487
/* The aio subsystem piggybacks on the proc's own p_mlock. */
static lck_mtx_t*
aio_proc_mutex(proc_t procp)
{
	return &procp->p_mlock;
}
493
/* Take an additional reference on an aio entry. */
static void
aio_entry_ref(aio_workq_entry *entryp)
{
	os_ref_retain(&entryp->aio_refcount);
}
499
500 static void
aio_entry_unref(aio_workq_entry * entryp)501 aio_entry_unref(aio_workq_entry *entryp)
502 {
503 if (os_ref_release(&entryp->aio_refcount) == 0) {
504 aio_free_request(entryp);
505 }
506 }
507
508 static bool
aio_entry_try_workq_remove(proc_t p,aio_workq_entry * entryp)509 aio_entry_try_workq_remove(proc_t p, aio_workq_entry *entryp)
510 {
511 /* Can only be cancelled if it's still on a work queue */
512 if (entryp->aio_workq_link.tqe_prev != NULL) {
513 aio_workq_t queue;
514 if (bootarg_aio_new_workq) {
515 return workq_aio_entry_remove_locked(p, entryp);
516 }
517
518 /* Will have to check again under the lock */
519 queue = aio_entry_workq(entryp);
520 aio_workq_lock_spin(queue);
521 if (entryp->aio_workq_link.tqe_prev != NULL) {
522 aio_workq_remove_entry_locked(queue, entryp);
523 aio_workq_unlock(queue);
524 return true;
525 } else {
526 aio_workq_unlock(queue);
527 }
528 }
529
530 return false;
531 }
532
/* Acquire a work queue's spin lock. */
static void
aio_workq_lock_spin(aio_workq_t wq)
{
	lck_spin_lock(aio_workq_lock(wq));
}
538
/* Release a work queue's spin lock. */
static void
aio_workq_unlock(aio_workq_t wq)
{
	lck_spin_unlock(aio_workq_lock(wq));
}
544
/* Accessor for a work queue's spin lock. */
static lck_spin_t*
aio_workq_lock(aio_workq_t wq)
{
	return &wq->aioq_lock;
}
550
/*
 * aio_cancel - attempt to cancel one or more async IO requests currently
 * outstanding against file descriptor uap->fd. If uap->aiocbp is not
 * NULL then only one specific IO is cancelled (if possible). If uap->aiocbp
 * is NULL then all outstanding async IO request for the given file
 * descriptor are cancelled (if possible).
 *
 * Returns 0 with *retval set to AIO_CANCELED / AIO_NOTCANCELED / AIO_ALLDONE,
 * or an errno (EBADF, EAGAIN) on failure.
 */
int
aio_cancel(proc_t p, struct aio_cancel_args *uap, int *retval)
{
	struct user_aiocb my_aiocb;
	int result;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, 0, 0);

	if (uap->fd) {
		vnode_t vp = NULLVP;
		const char *vname = NULL;

		result = vnode_getfromfd(vfs_context_current(), uap->fd, &vp);
		if (result != 0) {
			result = EBADF;
			goto ExitRoutine;
		}

		vname = vnode_getname(vp);
		/*
		 * The aio_cancel() system call will always return AIO_NOTCANCELED for
		 * file descriptor associated with raw disk device.
		 */
		if (vnode_ischr(vp) && vname && !strncmp(vname, "rdisk", 5)) {
			result = 0;
			*retval = AIO_NOTCANCELED;
		}

		if (vname) {
			vnode_putname(vname);
		}
		vnode_put(vp);

		/*
		 * NOTE(review): when the rdisk branch above was not taken, *retval
		 * is read here before this function ever stored to it — this relies
		 * on the caller's retval slot being pre-initialized; confirm.
		 */
		if (result == 0 && *retval == AIO_NOTCANCELED) {
			goto ExitRoutine;
		}
	}

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		result = 0;
		*retval = AIO_ALLDONE;
		goto ExitRoutine;
	}

	*retval = -1;
	if (uap->aiocbp != USER_ADDR_NULL) {
		/* copy in the user aiocb, munging the 32/64-bit layout as needed */
		if (proc_is64bit(p)) {
			struct user64_aiocb aiocb64;

			result = copyin(uap->aiocbp, &aiocb64, sizeof(aiocb64));
			if (result == 0) {
				do_munge_aiocb_user64_to_user(&aiocb64, &my_aiocb);
			}
		} else {
			struct user32_aiocb aiocb32;

			result = copyin(uap->aiocbp, &aiocb32, sizeof(aiocb32));
			if (result == 0) {
				do_munge_aiocb_user32_to_user(&aiocb32, &my_aiocb);
			}
		}

		if (result != 0) {
			result = EAGAIN;
			goto ExitRoutine;
		}

		/* NOTE - POSIX standard says a mismatch between the file */
		/* descriptor passed in and the file descriptor embedded in */
		/* the aiocb causes unspecified results. We return EBADF in */
		/* that situation. */
		if (uap->fd != my_aiocb.aio_fildes) {
			result = EBADF;
			goto ExitRoutine;
		}
	}

	aio_proc_lock(p);
	result = do_aio_cancel_locked(p, uap->fd, uap->aiocbp, 0);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	aio_proc_unlock(p);

	if (result != -1) {
		/* a matching request was found; hand its cancel status to the caller */
		*retval = result;
		result = 0;
		goto ExitRoutine;
	}

	/* -1 from do_aio_cancel_locked(): no matching request */
	result = EBADF;

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->fd, uap->aiocbp, result, 0);

	return result;
}
656
657
/*
 * _aio_close - internal function used to clean up async IO requests for
 * a file descriptor that is closing.
 * THIS MAY BLOCK.
 */
__private_extern__ void
_aio_close(proc_t p, int fd)
{
	int error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		return;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

	/* cancel all async IO requests on our todo queues for this file descriptor */
	aio_proc_lock(p);
	error = do_aio_cancel_locked(p, fd, USER_ADDR_NULL, AIO_CLOSE_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * and file descriptor on the active async IO queue.  Active requests cannot
		 * be cancelled so we must wait for them to complete.  We will get a special
		 * wake up call on our channel used to sleep for ALL active requests to
		 * complete.  This sleep channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used
		 * when we must wait for all active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);

		/* re-check the predicate on every wakeup; msleep drops/retakes the lock */
		while (aio_proc_has_active_requests_for_file(p, fd)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_close", 0);
		}
	}

	aio_proc_unlock(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_close) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), fd, 0, 0, 0);
}
703
704
/*
 * aio_error - return the error status associated with the async IO
 * request referred to by uap->aiocbp.  The error status is the errno
 * value that would be set by the corresponding IO request (read, write,
 * fdatasync, or sync).
 */
int
aio_error(proc_t p, struct aio_error_args *uap, int *retval)
{
	aio_workq_entry *entryp;
	int error;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	/* see if there are any aios to check */
	if (!aio_has_any_work()) {
		return EINVAL;
	}

	aio_proc_lock(p);

	/* look for a match on our queue of async IO requests that have completed */
	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
		if (entryp->uaiocbp == uap->aiocbp) {
			ASSERT_AIO_FROM_PROC(entryp, p);

			/* completed: report the saved errno value */
			*retval = entryp->errorval;
			error = 0;

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_val) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* look for a match on our queue of active async IO requests */
	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
		if (entryp->uaiocbp == uap->aiocbp) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			/* request still in flight */
			*retval = EINPROGRESS;
			error = 0;
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error_activeq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* no such request known to the kernel */
	error = EINVAL;

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_error) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
	aio_proc_unlock(p);

	return error;
}
762
763
764 /*
765 * aio_fsync - asynchronously force all IO operations associated
766 * with the file indicated by the file descriptor (uap->aiocbp->aio_fildes) and
767 * queued at the time of the call to the synchronized completion state.
768 * NOTE - we do not support op O_DSYNC at this point since we do not support the
769 * fdatasync() call.
770 */
771 int
aio_fsync(proc_t p,struct aio_fsync_args * uap,int * retval)772 aio_fsync(proc_t p, struct aio_fsync_args *uap, int *retval)
773 {
774 aio_entry_flags_t fsync_kind;
775 int error;
776
777 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_START,
778 VM_KERNEL_ADDRPERM(p), uap->aiocbp, uap->op, 0, 0);
779
780 *retval = 0;
781 /* 0 := O_SYNC for binary backward compatibility with Panther */
782 if (uap->op == O_SYNC || uap->op == 0) {
783 fsync_kind = AIO_FSYNC;
784 } else if (uap->op == O_DSYNC) {
785 fsync_kind = AIO_DSYNC;
786 } else {
787 *retval = -1;
788 error = EINVAL;
789 goto ExitRoutine;
790 }
791
792 error = aio_queue_async_request(p, uap->aiocbp, fsync_kind);
793 if (error != 0) {
794 *retval = -1;
795 }
796
797 ExitRoutine:
798 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync) | DBG_FUNC_END,
799 VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
800
801 return error;
802 }
803
804
805 /* aio_read - asynchronously read uap->aiocbp->aio_nbytes bytes from the
806 * file descriptor (uap->aiocbp->aio_fildes) into the buffer
807 * (uap->aiocbp->aio_buf).
808 */
809 int
aio_read(proc_t p,struct aio_read_args * uap,int * retval)810 aio_read(proc_t p, struct aio_read_args *uap, int *retval)
811 {
812 int error;
813
814 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_START,
815 VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
816
817 *retval = 0;
818
819 error = aio_queue_async_request(p, uap->aiocbp, AIO_READ);
820 if (error != 0) {
821 *retval = -1;
822 }
823
824 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_read) | DBG_FUNC_END,
825 VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
826
827 return error;
828 }
829
830
/*
 * aio_return - return the return status associated with the async IO
 * request referred to by uap->aiocbp.  The return status is the value
 * that would be returned by corresponding IO request (read, write,
 * fdatasync, or sync).  This is where we release kernel resources
 * held for async IO call associated with the given aiocb pointer.
 */
int
aio_return(proc_t p, struct aio_return_args *uap, user_ssize_t *retval)
{
	aio_workq_entry *entryp;
	int error = EINVAL;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);

	/* See if there are any entries to check */
	if (!aio_has_any_work()) {
		goto ExitRoutine;
	}

	aio_proc_lock(p);
	*retval = 0;

	/* look for a match on our queue of async IO requests that have completed */
	TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* Done and valid for aio_return(), pull it off the list */
			aio_proc_remove_done_locked(p, entryp);

			*retval = entryp->returnval;
			error = 0;
			/* unlock before dropping the ref: the free path must not hold the proc lock */
			aio_proc_unlock(p);

			/* drop the proc's reference; may free the entry */
			aio_entry_unref(entryp);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_val) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			goto ExitRoutine;
		}
	}

	/* look for a match on our queue of active async IO requests */
	TAILQ_FOREACH(entryp, &p->p_aio_activeq, aio_proc_link) {
		ASSERT_AIO_FROM_PROC(entryp, p);
		if (entryp->uaiocbp == uap->aiocbp) {
			/* request not finished yet; aio_return() is premature */
			error = EINPROGRESS;
			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return_activeq) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), uap->aiocbp, *retval, 0, 0);
			break;
		}
	}

	aio_proc_unlock(p);

ExitRoutine:
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_return) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);

	return error;
}
893
894
/*
 * _aio_exec - internal function used to clean up async IO requests for
 * a process that is going away due to exec().  We cancel any async IOs
 * we can and wait for those already active.  We also disable signaling
 * for cancelled or active aio requests that complete.
 * This routine MAY block!
 */
__private_extern__ void
_aio_exec(proc_t p)
{
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

	/* exec teardown is identical to exit teardown */
	_aio_exit(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exec) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
}
913
914
/*
 * _aio_exit - internal function used to clean up async IO requests for
 * a process that is terminating (via exit() or exec()).  We cancel any async IOs
 * we can and wait for those already active.  We also disable signaling
 * for cancelled or active aio requests that complete.  This routine MAY block!
 */
__private_extern__ void
_aio_exit(proc_t p)
{
	/* entries unlinked under the proc lock, freed after dropping it */
	TAILQ_HEAD(, aio_workq_entry) tofree = TAILQ_HEAD_INITIALIZER(tofree);
	aio_workq_entry *entryp, *tmp;
	int error;

	/* quick check to see if there are any async IO requests queued up */
	if (!aio_has_any_work()) {
		workq_aio_mark_exiting(p);
		workq_aio_exit(p);
		return;
	}

	/* stop the per-process workqueue from taking new work before cancelling */
	workq_aio_mark_exiting(p);

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

	aio_proc_lock(p);

	/*
	 * cancel async IO requests on the todo work queue and wait for those
	 * already active to complete.
	 */
	error = do_aio_cancel_locked(p, -1, USER_ADDR_NULL, AIO_EXIT_WAIT);
	ASSERT_AIO_PROC_LOCK_OWNED(p);
	if (error == AIO_NOTCANCELED) {
		/*
		 * AIO_NOTCANCELED is returned when we find an aio request for this process
		 * on the active async IO queue.  Active requests cannot be cancelled so we
		 * must wait for them to complete.  We will get a special wake up call on
		 * our channel used to sleep for ALL active requests to complete.  This sleep
		 * channel (proc.AIO_CLEANUP_SLEEP_CHAN) is only used when we must wait for all
		 * active aio requests.
		 */

		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit_sleep) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);

		/* re-check the predicate on every wakeup; msleep drops/retakes the lock */
		while (aio_has_active_requests_for_process(p)) {
			msleep(&p->AIO_CLEANUP_SLEEP_CHAN, aio_proc_mutex(p), PRIBIO, "aio_exit", 0);
		}
	}

	assert(!aio_has_active_requests_for_process(p));

	/* release all aio resources used by this process */
	TAILQ_FOREACH_SAFE(entryp, &p->p_aio_doneq, aio_proc_link, tmp) {
		ASSERT_AIO_FROM_PROC(entryp, p);

		aio_proc_remove_done_locked(p, entryp);
		TAILQ_INSERT_TAIL(&tofree, entryp, aio_proc_link);
	}

	aio_proc_unlock(p);

	workq_aio_exit(p);

	/* free all the entries outside of the aio_proc_lock() */
	TAILQ_FOREACH_SAFE(entryp, &tofree, aio_proc_link, tmp) {
		entryp->aio_proc_link.tqe_prev = NULL;
		aio_entry_unref(entryp);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_exit) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), 0, 0, 0, 0);
}
989
990
991 static bool
should_cancel(aio_workq_entry * entryp,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)992 should_cancel(aio_workq_entry *entryp, int fd, user_addr_t aiocbp,
993 aio_entry_flags_t reason)
994 {
995 if (reason & AIO_EXIT_WAIT) {
996 /* caller is _aio_exit() */
997 return true;
998 }
999 if (fd != entryp->aiocb.aio_fildes) {
1000 /* not the file we're looking for */
1001 return false;
1002 }
1003 /*
1004 * aio_cancel() or _aio_close() cancel
1005 * everything for a given fd when aiocbp is NULL
1006 */
1007 return aiocbp == USER_ADDR_NULL || entryp->uaiocbp == aiocbp;
1008 }
1009
1010 /*
1011 * do_aio_cancel_locked - cancel async IO requests (if possible). We get called by
1012 * aio_cancel, close, and at exit.
1013 * There are three modes of operation: 1) cancel all async IOs for a process -
1014 * fd is 0 and aiocbp is NULL 2) cancel all async IOs for file descriptor - fd
1015 * is > 0 and aiocbp is NULL 3) cancel one async IO associated with the given
1016 * aiocbp.
1017 * Returns -1 if no matches were found, AIO_CANCELED when we cancelled all
1018 * target async IO requests, AIO_NOTCANCELED if we could not cancel all
1019 * target async IO requests, and AIO_ALLDONE if all target async IO requests
1020 * were already complete.
1021 * WARNING - do not deference aiocbp in this routine, it may point to user
1022 * land data that has not been copied in (when called from aio_cancel())
1023 *
1024 * Called with proc locked, and returns the same way.
1025 */
1026 static int
do_aio_cancel_locked(proc_t p,int fd,user_addr_t aiocbp,aio_entry_flags_t reason)1027 do_aio_cancel_locked(proc_t p, int fd, user_addr_t aiocbp,
1028 aio_entry_flags_t reason)
1029 {
1030 bool multiple_matches = (aiocbp == USER_ADDR_NULL);
1031 aio_workq_entry *entryp, *tmp;
1032 int result;
1033
1034 ASSERT_AIO_PROC_LOCK_OWNED(p);
1035
1036 /* look for a match on our queue of async todo work. */
1037 again:
1038 result = -1;
1039 TAILQ_FOREACH_SAFE(entryp, &p->p_aio_activeq, aio_proc_link, tmp) {
1040 ASSERT_AIO_FROM_PROC(entryp, p);
1041
1042 if (!should_cancel(entryp, fd, aiocbp, reason)) {
1043 continue;
1044 }
1045
1046 if (reason) {
1047 /* mark the entry as blocking close or exit/exec */
1048 entryp->flags |= reason;
1049 if ((entryp->flags & AIO_EXIT_WAIT) && (entryp->flags & AIO_CLOSE_WAIT)) {
1050 panic("Close and exit flags set at the same time");
1051 }
1052 }
1053
1054 /* Can only be cancelled if it's still on a work queue */
1055 if (aio_entry_try_workq_remove(p, entryp)) {
1056 entryp->errorval = ECANCELED;
1057 entryp->returnval = -1;
1058
1059 /* Now it's officially cancelled. Do the completion */
1060 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
1061 VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1062 fd, 0, 0);
1063 do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
1064
1065 aio_proc_lock(p);
1066
1067 if (multiple_matches) {
1068 /*
1069 * Restart from the head of the proc active queue since it
1070 * may have been changed while we were away doing completion
1071 * processing.
1072 *
1073 * Note that if we found an uncancellable AIO before, we will
1074 * either find it again or discover that it's been completed,
1075 * so resetting the result will not cause us to return success
1076 * despite outstanding AIOs.
1077 */
1078 goto again;
1079 }
1080
1081 return AIO_CANCELED;
1082 }
1083
1084 /*
1085 * It's been taken off the active queue already, i.e. is in flight.
1086 * All we can do is ask for notification.
1087 */
1088 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_activeq) | DBG_FUNC_NONE,
1089 VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1090 fd, 0, 0);
1091
1092 result = AIO_NOTCANCELED;
1093 if (!multiple_matches) {
1094 return result;
1095 }
1096 }
1097
1098 /*
1099 * if we didn't find any matches on the todo or active queues then look for a
1100 * match on our queue of async IO requests that have completed and if found
1101 * return AIO_ALLDONE result.
1102 *
1103 * Proc AIO lock is still held.
1104 */
1105 if (result == -1) {
1106 TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
1107 ASSERT_AIO_FROM_PROC(entryp, p);
1108 if (should_cancel(entryp, fd, aiocbp, reason)) {
1109 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_doneq) | DBG_FUNC_NONE,
1110 VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1111 fd, 0, 0);
1112
1113 result = AIO_ALLDONE;
1114 if (!multiple_matches) {
1115 return result;
1116 }
1117 }
1118 }
1119 }
1120
1121 return result;
1122 }
1123
1124
1125 /*
1126 * aio_suspend - suspend the calling thread until at least one of the async
1127 * IO operations referenced by uap->aiocblist has completed, until a signal
1128 * interrupts the function, or uap->timeoutp time interval (optional) has
1129 * passed.
1130 * Returns 0 if one or more async IOs have completed else -1 and errno is
1131 * set appropriately - EAGAIN if timeout elapses or EINTR if an interrupt
1132 * woke us up.
1133 */
1134 int
aio_suspend(proc_t p,struct aio_suspend_args * uap,int * retval)1135 aio_suspend(proc_t p, struct aio_suspend_args *uap, int *retval)
1136 {
1137 __pthread_testcancel(1);
1138 return aio_suspend_nocancel(p, (struct aio_suspend_nocancel_args *)uap, retval);
1139 }
1140
1141
/*
 * aio_suspend_nocancel - non-cancellable body of aio_suspend().  Copies in
 * the caller's aiocb pointer list and optional timeout, then sleeps on the
 * per-proc AIO_SUSPEND_SLEEP_CHAN until one of the listed requests shows up
 * on the done queue, the timeout expires (EAGAIN), or a signal arrives
 * (EINTR).
 */
int
aio_suspend_nocancel(proc_t p, struct aio_suspend_nocancel_args *uap, int *retval)
{
	int error;
	int i;
	uint64_t abstime;       /* absolute deadline, 0 = wait forever */
	struct user_timespec ts;
	aio_workq_entry *entryp;
	user_addr_t *aiocbpp;   /* kernel copy of the user's aiocb pointer list */
	size_t aiocbpp_size;

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	*retval = -1;
	abstime = 0;
	aiocbpp = NULL;

	/* nothing in flight system-wide means nothing could ever wake us */
	if (!aio_has_any_work()) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	/* bound nent and guard the allocation-size multiply against overflow */
	if (uap->nent < 1 || uap->nent > aio_max_requests_per_process ||
	    os_mul_overflow(sizeof(user_addr_t), uap->nent, &aiocbpp_size)) {
		error = EINVAL;
		goto ExitThisRoutine;
	}

	if (uap->timeoutp != USER_ADDR_NULL) {
		/* copyin the timespec in the caller's native layout */
		if (proc_is64bit(p)) {
			struct user64_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = (user_time_t)temp.tv_sec;
				ts.tv_nsec = (user_long_t)temp.tv_nsec;
			}
		} else {
			struct user32_timespec temp;
			error = copyin(uap->timeoutp, &temp, sizeof(temp));
			if (error == 0) {
				ts.tv_sec = temp.tv_sec;
				ts.tv_nsec = temp.tv_nsec;
			}
		}
		if (error != 0) {
			error = EAGAIN;
			goto ExitThisRoutine;
		}

		/* reject negative or out-of-range nanoseconds */
		if (ts.tv_sec < 0 || ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000) {
			error = EINVAL;
			goto ExitThisRoutine;
		}

		/* convert relative interval to an absolute deadline for msleep1 */
		nanoseconds_to_absolutetime((uint64_t)ts.tv_sec * NSEC_PER_SEC + ts.tv_nsec,
		    &abstime);
		clock_absolutetime_interval_to_deadline(abstime, &abstime);
	}

	aiocbpp = (user_addr_t *)kalloc_data(aiocbpp_size, Z_WAITOK);
	if (aiocbpp == NULL || aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
		error = EAGAIN;
		goto ExitThisRoutine;
	}

	/* check list of aio requests to see if any have completed */
check_for_our_aiocbp:
	aio_proc_lock_spin(p);
	for (i = 0; i < uap->nent; i++) {
		user_addr_t aiocbp;

		/* NULL elements are legal so check for 'em */
		aiocbp = *(aiocbpp + i);
		if (aiocbp == USER_ADDR_NULL) {
			continue;
		}

		/* return immediately if any aio request in the list is done */
		TAILQ_FOREACH(entryp, &p->p_aio_doneq, aio_proc_link) {
			ASSERT_AIO_FROM_PROC(entryp, p);
			if (entryp->uaiocbp == aiocbp) {
				aio_proc_unlock(p);
				*retval = 0;
				error = 0;
				goto ExitThisRoutine;
			}
		}
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend_sleep) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), uap->nent, 0, 0, 0);

	/*
	 * wait for an async IO to complete or a signal fires or timeout expires.
	 * we return EAGAIN (35) for timeout expiration and EINTR (4) when a signal
	 * interrupts us. If an async IO completes before a signal fires or our
	 * timeout expires, we get a wakeup call from aio_work_thread().
	 */

	/* PDROP: the proc lock is released by msleep1 on all return paths */
	error = msleep1(&p->AIO_SUSPEND_SLEEP_CHAN, aio_proc_mutex(p),
	    PCATCH | PWAIT | PDROP, "aio_suspend", abstime);
	if (error == 0) {
		/*
		 * got our wakeup call from aio_work_thread().
		 * Since we can get a wakeup on this channel from another thread in the
		 * same process we head back up to make sure this is for the correct aiocbp.
		 * If it is the correct aiocbp we will return from where we do the check
		 * (see entryp->uaiocbp == aiocbp after check_for_our_aiocbp label)
		 * else we will fall out and just sleep again.
		 */
		goto check_for_our_aiocbp;
	} else if (error == EWOULDBLOCK) {
		/* our timeout expired */
		error = EAGAIN;
	} else {
		/* we were interrupted */
		error = EINTR;
	}

ExitThisRoutine:
	if (aiocbpp != NULL) {
		kfree_data(aiocbpp, aiocbpp_size);
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_suspend) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), uap->nent, error, 0, 0);

	return error;
}
1272
1273
1274 /* aio_write - asynchronously write uap->aiocbp->aio_nbytes bytes to the
1275 * file descriptor (uap->aiocbp->aio_fildes) from the buffer
1276 * (uap->aiocbp->aio_buf).
1277 */
1278
1279 int
aio_write(proc_t p,struct aio_write_args * uap,int * retval __unused)1280 aio_write(proc_t p, struct aio_write_args *uap, int *retval __unused)
1281 {
1282 int error;
1283
1284 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_START,
1285 VM_KERNEL_ADDRPERM(p), uap->aiocbp, 0, 0, 0);
1286
1287 error = aio_queue_async_request(p, uap->aiocbp, AIO_WRITE);
1288
1289 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_write) | DBG_FUNC_END,
1290 VM_KERNEL_ADDRPERM(p), uap->aiocbp, error, 0, 0);
1291
1292 return error;
1293 }
1294
1295
1296 static int
aio_copy_in_list(proc_t procp,user_addr_t aiocblist,user_addr_t * aiocbpp,int nent)1297 aio_copy_in_list(proc_t procp, user_addr_t aiocblist, user_addr_t *aiocbpp,
1298 int nent)
1299 {
1300 int result;
1301
1302 /* copyin our aiocb pointers from list */
1303 result = copyin(aiocblist, aiocbpp,
1304 proc_is64bit(procp) ? (nent * sizeof(user64_addr_t))
1305 : (nent * sizeof(user32_addr_t)));
1306 if (result) {
1307 return result;
1308 }
1309
1310 /*
1311 * We depend on a list of user_addr_t's so we need to
1312 * munge and expand when these pointers came from a
1313 * 32-bit process
1314 */
1315 if (!proc_is64bit(procp)) {
1316 /* copy from last to first to deal with overlap */
1317 user32_addr_t *my_ptrp = ((user32_addr_t *)aiocbpp) + (nent - 1);
1318 user_addr_t *my_addrp = aiocbpp + (nent - 1);
1319
1320 for (int i = 0; i < nent; i++, my_ptrp--, my_addrp--) {
1321 *my_addrp = (user_addr_t) (*my_ptrp);
1322 }
1323 }
1324
1325 return 0;
1326 }
1327
1328
1329 static int
aio_copy_in_sigev(proc_t procp,user_addr_t sigp,struct user_sigevent * sigev)1330 aio_copy_in_sigev(proc_t procp, user_addr_t sigp, struct user_sigevent *sigev)
1331 {
1332 int result = 0;
1333
1334 if (sigp == USER_ADDR_NULL) {
1335 goto out;
1336 }
1337
1338 /*
1339 * We need to munge aio_sigevent since it contains pointers.
1340 * Since we do not know if sigev_value is an int or a ptr we do
1341 * NOT cast the ptr to a user_addr_t. This means if we send
1342 * this info back to user space we need to remember sigev_value
1343 * was not expanded for the 32-bit case.
1344 *
1345 * Notes: This does NOT affect us since we don't support
1346 * sigev_value yet in the aio context.
1347 */
1348 if (proc_is64bit(procp)) {
1349 #if __LP64__
1350 struct user64_sigevent sigevent64;
1351
1352 result = copyin(sigp, &sigevent64, sizeof(sigevent64));
1353 if (result == 0) {
1354 sigev->sigev_notify = sigevent64.sigev_notify;
1355 sigev->sigev_signo = sigevent64.sigev_signo;
1356 sigev->sigev_value.size_equivalent.sival_int = sigevent64.sigev_value.size_equivalent.sival_int;
1357 sigev->sigev_notify_function = sigevent64.sigev_notify_function;
1358 sigev->sigev_notify_attributes = sigevent64.sigev_notify_attributes;
1359 }
1360 #else
1361 panic("64bit process on 32bit kernel is not supported");
1362 #endif
1363 } else {
1364 struct user32_sigevent sigevent32;
1365
1366 result = copyin(sigp, &sigevent32, sizeof(sigevent32));
1367 if (result == 0) {
1368 sigev->sigev_notify = sigevent32.sigev_notify;
1369 sigev->sigev_signo = sigevent32.sigev_signo;
1370 sigev->sigev_value.size_equivalent.sival_int = sigevent32.sigev_value.sival_int;
1371 sigev->sigev_notify_function = CAST_USER_ADDR_T(sigevent32.sigev_notify_function);
1372 sigev->sigev_notify_attributes = CAST_USER_ADDR_T(sigevent32.sigev_notify_attributes);
1373 }
1374 }
1375
1376 if (result != 0) {
1377 result = EAGAIN;
1378 }
1379
1380 out:
1381 return result;
1382 }
1383
1384 /*
1385 * validate user_sigevent. at this point we only support
1386 * sigev_notify equal to SIGEV_SIGNAL or SIGEV_NONE. this means
1387 * sigev_value, sigev_notify_function, and sigev_notify_attributes
1388 * are ignored, since SIGEV_THREAD is unsupported. This is consistent
1389 * with no [RTS] (RalTime Signal) option group support.
1390 */
1391 static int
aio_sigev_validate(const struct user_sigevent * sigev)1392 aio_sigev_validate(const struct user_sigevent *sigev)
1393 {
1394 switch (sigev->sigev_notify) {
1395 case SIGEV_SIGNAL:
1396 {
1397 int signum;
1398
1399 /* make sure we have a valid signal number */
1400 signum = sigev->sigev_signo;
1401 if (signum <= 0 || signum >= NSIG ||
1402 signum == SIGKILL || signum == SIGSTOP) {
1403 return EINVAL;
1404 }
1405 }
1406 break;
1407
1408 case SIGEV_NONE:
1409 break;
1410
1411 case SIGEV_KEVENT:
1412 /*
1413 * The sigev_signo should contain the descriptor of the kqueue.
1414 * Validate that it contains some sane value.
1415 */
1416 if (sigev->sigev_signo <= 0 || sigev->sigev_signo > maxfilesperproc) {
1417 return EINVAL;
1418 }
1419 break;
1420
1421 case SIGEV_THREAD:
1422 /* Unsupported [RTS] */
1423
1424 default:
1425 return EINVAL;
1426 }
1427
1428 return 0;
1429 }
1430
1431
1432 /*
1433 * aio_try_enqueue_work_locked
1434 *
1435 * Queue up the entry on the aio asynchronous work queue in priority order
1436 * based on the relative priority of the request. We calculate the relative
1437 * priority using the nice value of the caller and the value
1438 *
1439 * Parameters: procp Process queueing the I/O
1440 * entryp The work queue entry being queued
1441 * leader The work leader if any
1442 *
1443 * Returns: Whether the enqueue was successful
1444 *
1445 * Notes: This function is used for both lio_listio and aio
1446 *
1447 * XXX: At some point, we may have to consider thread priority
1448 * rather than process priority, but we don't maintain the
1449 * adjusted priority for threads the POSIX way.
1450 *
1451 * Called with proc locked.
1452 */
1453 static bool
aio_try_enqueue_work_locked(proc_t procp,aio_workq_entry * entryp,aio_workq_entry * leader)1454 aio_try_enqueue_work_locked(proc_t procp, aio_workq_entry *entryp,
1455 aio_workq_entry *leader)
1456 {
1457 ASSERT_AIO_PROC_LOCK_OWNED(procp);
1458
1459 /* Onto proc queue */
1460 if (!aio_try_proc_insert_active_locked(procp, entryp)) {
1461 return false;
1462 }
1463
1464 if (leader) {
1465 aio_entry_ref(leader); /* consumed in do_aio_completion_and_unlock */
1466 leader->lio_pending++;
1467 entryp->lio_leader = leader;
1468 }
1469
1470 /* And work queue */
1471 aio_entry_ref(entryp); /* consumed in do_aio_completion_and_unlock */
1472 if (bootarg_aio_new_workq) {
1473 if (!workq_aio_entry_add_locked(procp, entryp)) {
1474 (void)os_ref_release(&entryp->aio_refcount);
1475 return false;
1476 }
1477 } else {
1478 aio_workq_t queue = aio_entry_workq(entryp);
1479 aio_workq_lock_spin(queue);
1480 aio_workq_add_entry_locked(queue, entryp);
1481 waitq_wakeup64_one(&queue->aioq_waitq, CAST_EVENT64_T(queue),
1482 THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
1483 aio_workq_unlock(queue);
1484 }
1485
1486 KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_START,
1487 VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1488 entryp->flags, entryp->aiocb.aio_fildes, 0);
1489 KERNEL_DEBUG_CONSTANT(BSDDBG_CODE(DBG_BSD_AIO, AIO_work_queued) | DBG_FUNC_END,
1490 entryp->aiocb.aio_offset, 0, entryp->aiocb.aio_nbytes, 0, 0);
1491 return true;
1492 }
1493
1494 /*
1495 * EV_FLAG0/1 are filter specific flags.
1496 * Repurpose EV_FLAG0 to indicate the kevent is registered from kernel.
1497 */
1498 #define EV_KERNEL EV_FLAG0
1499
1500 /* Register a kevent for AIO completion notification. */
1501 static int
aio_register_kevent(proc_t procp,aio_workq_entry * entryp)1502 aio_register_kevent(proc_t procp, aio_workq_entry *entryp)
1503 {
1504 struct kevent_qos_s kev;
1505 struct fileproc *fp = NULL;
1506 kqueue_t kqu;
1507 int kqfd = entryp->aiocb.aio_sigevent.sigev_signo;
1508 int error;
1509
1510 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_START,
1511 VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp),
1512 VM_KERNEL_ADDRPERM(entryp->uaiocbp), kqfd, 0);
1513
1514 error = fp_get_ftype(procp, kqfd, DTYPE_KQUEUE, EBADF, &fp);
1515 if (error) {
1516 goto exit;
1517 }
1518
1519 kqu.kq = (struct kqueue *)fp_get_data(fp);
1520
1521 memset(&kev, 0, sizeof(kev));
1522 kev.ident = (uintptr_t)entryp->uaiocbp;
1523 kev.filter = EVFILT_AIO;
1524 /*
1525 * Set the EV_FLAG0 to indicate the event is registered from the kernel.
1526 * This flag later is checked in filt_aioattach() and to determine if
1527 * a kevent is registered from kernel or user-space.
1528 */
1529 kev.flags = EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT | EV_KERNEL;
1530 kev.udata = entryp->aiocb.aio_sigevent.sigev_value.sival_ptr;
1531 kev.data = (intptr_t)entryp;
1532
1533 error = kevent_register(kqu.kq, &kev, NULL);
1534 assert((error & FILTER_REGISTER_WAIT) == 0);
1535
1536 exit:
1537 if (fp) {
1538 fp_drop(procp, kqfd, fp, 0);
1539 }
1540
1541 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_register_kevent) | DBG_FUNC_END,
1542 VM_KERNEL_ADDRPERM(procp), VM_KERNEL_ADDRPERM(entryp), error, 0, 0);
1543
1544 return error;
1545 }
1546
1547 /*
1548 * lio_listio - initiate a list of IO requests. We process the list of
1549 * aiocbs either synchronously (mode == LIO_WAIT) or asynchronously
1550 * (mode == LIO_NOWAIT).
1551 *
1552 * The caller gets error and return status for each aiocb in the list
1553 * via aio_error and aio_return. We must keep completed requests until
1554 * released by the aio_return call.
1555 */
1556 int
lio_listio(proc_t p,struct lio_listio_args * uap,int * retval __unused)1557 lio_listio(proc_t p, struct lio_listio_args *uap, int *retval __unused)
1558 {
1559 aio_workq_entry *entries[AIO_LISTIO_MAX] = { };
1560 user_addr_t aiocbpp[AIO_LISTIO_MAX];
1561 struct user_sigevent aiosigev = { };
1562 int result = 0;
1563 int lio_count = 0;
1564
1565 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_START,
1566 VM_KERNEL_ADDRPERM(p), uap->nent, uap->mode, 0, 0);
1567
1568 if (!(uap->mode == LIO_NOWAIT || uap->mode == LIO_WAIT)) {
1569 result = EINVAL;
1570 goto ExitRoutine;
1571 }
1572
1573 if (uap->nent < 1 || uap->nent > AIO_LISTIO_MAX) {
1574 result = EINVAL;
1575 goto ExitRoutine;
1576 }
1577
1578 /*
1579 * Use sigevent passed in to lio_listio for each of our calls, but
1580 * only do completion notification after the last request completes.
1581 */
1582 if (uap->sigp != USER_ADDR_NULL) {
1583 result = aio_copy_in_sigev(p, uap->sigp, &aiosigev);
1584 if (result) {
1585 goto ExitRoutine;
1586 }
1587 result = aio_sigev_validate(&aiosigev);
1588 if (result) {
1589 goto ExitRoutine;
1590 }
1591 }
1592
1593 if (aio_copy_in_list(p, uap->aiocblist, aiocbpp, uap->nent)) {
1594 result = EAGAIN;
1595 goto ExitRoutine;
1596 }
1597
1598 /*
1599 * allocate/parse all entries
1600 */
1601 for (int i = 0; i < uap->nent; i++) {
1602 aio_workq_entry *entryp;
1603
1604 /* NULL elements are legal so check for 'em */
1605 if (aiocbpp[i] == USER_ADDR_NULL) {
1606 continue;
1607 }
1608
1609 entryp = aio_create_queue_entry(p, aiocbpp[i], AIO_LIO);
1610 if (entryp == NULL) {
1611 result = EAGAIN;
1612 goto ExitRoutine;
1613 }
1614
1615 /*
1616 * This refcount is cleaned up on exit if the entry
1617 * isn't submitted
1618 */
1619 entries[lio_count++] = entryp;
1620 if ((uap->mode == LIO_NOWAIT) &&
1621 (entryp->aiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)) {
1622 /* Set signal hander, if any */
1623 entryp->aiocb.aio_sigevent = aiosigev;
1624 }
1625 }
1626
1627 if (lio_count == 0) {
1628 /* There's nothing to submit */
1629 goto ExitRoutine;
1630 }
1631
1632 /*
1633 * Past this point we're commited and will not bail out
1634 *
1635 * - keep a reference on the leader for LIO_WAIT
1636 * - perform the submissions and optionally wait
1637 */
1638
1639 aio_workq_entry *leader = entries[0];
1640 if (uap->mode == LIO_WAIT) {
1641 aio_entry_ref(leader); /* consumed below */
1642 }
1643
1644 aio_proc_lock(p);
1645
1646 for (int i = 0; i < lio_count; i++) {
1647 if (aio_try_enqueue_work_locked(p, entries[i], leader)) {
1648 workq_aio_wakeup_thread(p); /* this may drop and reacquire the proc lock */
1649 /*
1650 * For SIGEV_KEVENT, every AIO in the list would get its own kevent
1651 * notification upon completion as opposed to SIGEV_SIGNAL which a
1652 * single notification is deliverd when all AIOs have completed.
1653 */
1654 if ((uap->mode == LIO_NOWAIT) &&
1655 (entries[i]->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT)) {
1656 aio_register_kevent(p, entries[i]);
1657 }
1658 entries[i] = NULL; /* the entry was submitted */
1659 } else {
1660 result = EAGAIN;
1661 }
1662 }
1663
1664 if (uap->mode == LIO_WAIT && result == 0) {
1665 leader->flags |= AIO_LIO_WAIT;
1666
1667 while (leader->lio_pending) {
1668 /* If we were interrupted, fail out (even if all finished) */
1669 if (msleep(leader, aio_proc_mutex(p),
1670 PCATCH | PRIBIO | PSPIN, "lio_listio", 0) != 0) {
1671 result = EINTR;
1672 break;
1673 }
1674 }
1675
1676 leader->flags &= ~AIO_LIO_WAIT;
1677 }
1678
1679 aio_proc_unlock(p);
1680
1681 if (uap->mode == LIO_WAIT) {
1682 aio_entry_unref(leader);
1683 }
1684
1685 ExitRoutine:
1686 /* Consume unsubmitted entries */
1687 for (int i = 0; i < lio_count; i++) {
1688 if (entries[i]) {
1689 aio_entry_unref(entries[i]);
1690 }
1691 }
1692
1693 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_listio) | DBG_FUNC_END,
1694 VM_KERNEL_ADDRPERM(p), result, 0, 0, 0);
1695
1696 return result;
1697 }
1698
1699
1700 /*
1701 * aio worker thread. this is where all the real work gets done.
1702 * we get a wake up call on sleep channel &aio_anchor.aio_async_workq
1703 * after new work is queued up.
1704 */
1705 __attribute__((noreturn))
1706 static void
aio_work_thread(void * arg __unused,wait_result_t wr __unused)1707 aio_work_thread(void *arg __unused, wait_result_t wr __unused)
1708 {
1709 aio_workq_entry *entryp;
1710 int error;
1711 vm_map_switch_context_t switch_ctx;
1712 struct uthread *uthreadp = NULL;
1713 proc_t p = NULL;
1714
1715 for (;;) {
1716 /*
1717 * returns with the entry ref'ed.
1718 * sleeps until work is available.
1719 */
1720 entryp = aio_get_some_work();
1721 p = entryp->procp;
1722
1723 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_START,
1724 VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1725 entryp->flags, 0, 0);
1726
1727 /*
1728 * Assume the target's address space identity for the duration
1729 * of the IO. Note: don't need to have the entryp locked,
1730 * because the proc and map don't change until it's freed.
1731 */
1732 uthreadp = (struct uthread *) current_uthread();
1733 assert(get_task_map(proc_task(current_proc())) != entryp->aio_map);
1734 assert(uthreadp->uu_aio_task == NULL);
1735
1736 /*
1737 * workq entries at this stage cause _aio_exec() and _aio_exit() to
1738 * block until we hit `do_aio_completion_and_unlock()` below,
1739 * which means that it is safe to dereference p->task without
1740 * holding a lock or taking references.
1741 */
1742 uthreadp->uu_aio_task = proc_task(p);
1743 switch_ctx = vm_map_switch_to(entryp->aio_map);
1744
1745 if ((entryp->flags & AIO_READ) != 0) {
1746 error = do_aio_read(entryp);
1747 } else if ((entryp->flags & AIO_WRITE) != 0) {
1748 error = do_aio_write(entryp);
1749 } else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
1750 error = do_aio_fsync(entryp);
1751 } else {
1752 error = EINVAL;
1753 }
1754
1755 /* Restore old map */
1756 vm_map_switch_back(switch_ctx);
1757 uthreadp->uu_aio_task = NULL;
1758
1759 /* liberate unused map */
1760 vm_map_deallocate(entryp->aio_map);
1761 entryp->aio_map = VM_MAP_NULL;
1762
1763 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_worker_thread) | DBG_FUNC_END,
1764 VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
1765 entryp->errorval, entryp->returnval, 0);
1766
1767 /* we're done with the IO request so pop it off the active queue and */
1768 /* push it on the done queue */
1769 aio_proc_lock(p);
1770 entryp->errorval = error;
1771 do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
1772 }
1773 }
1774
1775
1776 /*
1777 * aio_get_some_work - get the next async IO request that is ready to be executed.
1778 * aio_fsync complicates matters a bit since we cannot do the fsync until all async
1779 * IO requests at the time the aio_fsync call came in have completed.
1780 * NOTE - AIO_LOCK must be held by caller
1781 */
1782 static aio_workq_entry *
aio_get_some_work(void)1783 aio_get_some_work(void)
1784 {
1785 aio_workq_entry *entryp = NULL;
1786 aio_workq_t queue = NULL;
1787
1788 /* Just one queue for the moment. In the future there will be many. */
1789 queue = &aio_anchor.aio_async_workqs[0];
1790 aio_workq_lock_spin(queue);
1791
1792 /*
1793 * Hold the queue lock.
1794 *
1795 * pop some work off the work queue and add to our active queue
1796 * Always start with the queue lock held.
1797 */
1798 while ((entryp = TAILQ_FIRST(&queue->aioq_entries))) {
1799 /*
1800 * Pull of of work queue. Once it's off, it can't be cancelled,
1801 * so we can take our ref once we drop the queue lock.
1802 */
1803
1804 aio_workq_remove_entry_locked(queue, entryp);
1805
1806 aio_workq_unlock(queue);
1807
1808 /*
1809 * Check if it's an fsync that must be delayed. No need to lock the entry;
1810 * that flag would have been set at initialization.
1811 */
1812 if ((entryp->flags & AIO_FSYNC) != 0) {
1813 /*
1814 * Check for unfinished operations on the same file
1815 * in this proc's queue.
1816 */
1817 aio_proc_lock_spin(entryp->procp);
1818 if (aio_delay_fsync_request(entryp)) {
1819 /* It needs to be delayed. Put it back on the end of the work queue */
1820 KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
1821 VM_KERNEL_ADDRPERM(entryp->procp),
1822 VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);
1823
1824 aio_proc_unlock(entryp->procp);
1825
1826 aio_workq_lock_spin(queue);
1827 aio_workq_add_entry_locked(queue, entryp);
1828 continue;
1829 }
1830 aio_proc_unlock(entryp->procp);
1831 }
1832
1833 return entryp;
1834 }
1835
1836 /* We will wake up when someone enqueues something */
1837 waitq_assert_wait64(&queue->aioq_waitq, CAST_EVENT64_T(queue), THREAD_UNINT, 0);
1838 aio_workq_unlock(queue);
1839 thread_block(aio_work_thread);
1840
1841 __builtin_unreachable();
1842 }
1843
1844 /*
1845 * aio_delay_fsync_request - look to see if this aio_fsync request should be delayed.
1846 * A big, simple hammer: only send it off if it's the most recently filed IO which has
1847 * not been completed.
1848 */
1849 static boolean_t
aio_delay_fsync_request(aio_workq_entry * entryp)1850 aio_delay_fsync_request(aio_workq_entry *entryp)
1851 {
1852 if (proc_in_teardown(entryp->procp)) {
1853 /*
1854 * we can't delay FSYNCS when in teardown as it will confuse _aio_exit,
1855 * if it was dequeued, then we must now commit to it
1856 */
1857 return FALSE;
1858 }
1859
1860 if (entryp == TAILQ_FIRST(&entryp->procp->p_aio_activeq)) {
1861 return FALSE;
1862 }
1863
1864 return TRUE;
1865 }
1866
/*
 * aio_create_queue_entry - allocate and initialize a work queue entry for
 * the aiocb at user address aiocbp.  Copies in and munges the aiocb for
 * the caller's ABI, validates it, and takes references on the submitting
 * thread, its credential, and (legacy path) the user address-space map.
 * Returns the new entry with one reference held, or NULL on any failure.
 */
static aio_workq_entry *
aio_create_queue_entry(proc_t procp, user_addr_t aiocbp, aio_entry_flags_t flags)
{
	aio_workq_entry *entryp;

	entryp = zalloc_flags(aio_workq_zonep, Z_WAITOK | Z_ZERO);
	entryp->procp = procp;
	entryp->uaiocbp = aiocbp;
	entryp->flags = flags;
	/* consumed in aio_return or _aio_exit */
	os_ref_init(&entryp->aio_refcount, &aio_refgrp);

	/* copy the aiocb in using the caller's native layout, then widen */
	if (proc_is64bit(procp)) {
		struct user64_aiocb aiocb64;

		if (copyin(aiocbp, &aiocb64, sizeof(aiocb64)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user64_to_user(&aiocb64, &entryp->aiocb);
	} else {
		struct user32_aiocb aiocb32;

		if (copyin(aiocbp, &aiocb32, sizeof(aiocb32)) != 0) {
			goto error_exit;
		}
		do_munge_aiocb_user32_to_user(&aiocb32, &entryp->aiocb);
	}

	/* do some more validation on the aiocb and embedded file descriptor */
	if (aio_validate(procp, entryp) != 0) {
		goto error_exit;
	}

	/* get a reference on the current_thread, which is passed in vfs_context. */
	entryp->context = *vfs_context_current();
	thread_reference(entryp->context.vc_thread);
	kauth_cred_ref(entryp->context.vc_ucred);

	if (bootarg_aio_new_workq) {
		/* new workq path resolves the map at execution time */
		entryp->aio_map = VM_MAP_NULL;
		workq_aio_prepare(procp);
	} else {
		/* get a reference to the user land map in order to keep it around */
		entryp->aio_map = get_task_map(proc_task(procp));
		vm_map_reference(entryp->aio_map);
	}
	return entryp;

error_exit:
	/* failures above this point took no references beyond the zone alloc */
	zfree(aio_workq_zonep, entryp);
	return NULL;
}
1919
1920
1921 /*
1922 * aio_queue_async_request - queue up an async IO request on our work queue then
1923 * wake up one of our worker threads to do the actual work. We get a reference
1924 * to our caller's user land map in order to keep it around while we are
1925 * processing the request.
1926 */
1927 static int
aio_queue_async_request(proc_t procp,user_addr_t aiocbp,aio_entry_flags_t flags)1928 aio_queue_async_request(proc_t procp, user_addr_t aiocbp,
1929 aio_entry_flags_t flags)
1930 {
1931 aio_workq_entry *entryp;
1932 int result;
1933
1934 entryp = aio_create_queue_entry(procp, aiocbp, flags);
1935 if (entryp == NULL) {
1936 result = EAGAIN;
1937 goto error_noalloc;
1938 }
1939
1940 aio_proc_lock(procp);
1941 if (!aio_try_enqueue_work_locked(procp, entryp, NULL)) {
1942 result = EAGAIN;
1943 goto error_exit;
1944 }
1945
1946 if ((entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) &&
1947 (aio_register_kevent(procp, entryp) != 0)) {
1948 result = EAGAIN;
1949 goto error_exit;
1950 }
1951 workq_aio_wakeup_thread_and_unlock(procp);
1952 return 0;
1953
1954 error_exit:
1955 /*
1956 * This entry has not been queued up so no worries about
1957 * unlocked state and aio_map
1958 */
1959 aio_proc_unlock(procp);
1960 aio_free_request(entryp);
1961 error_noalloc:
1962 return result;
1963 }
1964
1965
1966 /*
1967 * aio_free_request - remove our reference on the user land map and
1968 * free the work queue entry resources. The entry is off all lists
1969 * and has zero refcount, so no one can have a pointer to it.
1970 */
static void
aio_free_request(aio_workq_entry *entryp)
{
	/* an entry still linked on a proc or workq list must never be freed */
	if (entryp->aio_proc_link.tqe_prev || entryp->aio_workq_link.tqe_prev) {
		panic("aio_workq_entry %p being freed while still enqueued", entryp);
	}

	/* remove our reference to the user land map. */
	if (VM_MAP_NULL != entryp->aio_map) {
		vm_map_deallocate(entryp->aio_map);
	}

	/* remove our reference to thread which enqueued the request */
	if (entryp->context.vc_thread) {
		thread_deallocate(entryp->context.vc_thread);
	}
	/* drop the credential reference taken in aio_create_queue_entry */
	kauth_cred_unref(&entryp->context.vc_ucred);

	zfree(aio_workq_zonep, entryp);
}
1991
1992
1993 /*
1994 * aio_validate
1995 *
1996 * validate the aiocb passed in by one of the aio syscalls.
1997 */
static int
aio_validate(proc_t p, aio_workq_entry *entryp)
{
	struct fileproc *fp;
	int flag;
	int result;

	result = 0;

	/* for lio_listio entries, translate the per-aiocb opcode into flags */
	if ((entryp->flags & AIO_LIO) != 0) {
		if (entryp->aiocb.aio_lio_opcode == LIO_READ) {
			entryp->flags |= AIO_READ;
		} else if (entryp->aiocb.aio_lio_opcode == LIO_WRITE) {
			entryp->flags |= AIO_WRITE;
		} else if (entryp->aiocb.aio_lio_opcode == LIO_NOP) {
			/* a no-op entry needs no further validation */
			return 0;
		} else {
			return EINVAL;
		}
	}

	/* writes and syncs require the fd to be open for writing */
	flag = FREAD;
	if ((entryp->flags & (AIO_WRITE | AIO_FSYNC | AIO_DSYNC)) != 0) {
		flag = FWRITE;
	}

	/* reject oversized, unmapped-buffer, or negative-offset transfers */
	if ((entryp->flags & (AIO_READ | AIO_WRITE)) != 0) {
		if (entryp->aiocb.aio_nbytes > INT_MAX ||
		    entryp->aiocb.aio_buf == USER_ADDR_NULL ||
		    entryp->aiocb.aio_offset < 0) {
			return EINVAL;
		}
	}

	result = aio_sigev_validate(&entryp->aiocb.aio_sigevent);
	if (result) {
		return result;
	}

	/* validate the file descriptor and that the file was opened
	 * for the appropriate read / write access.
	 */
	proc_fdlock(p);

	fp = fp_get_noref_locked(p, entryp->aiocb.aio_fildes);
	if (fp == NULL) {
		result = EBADF;
	} else if ((fp->fp_glob->fg_flag & flag) == 0) {
		/* we don't have read or write access */
		result = EBADF;
	} else if (FILEGLOB_DTYPE(fp->fp_glob) != DTYPE_VNODE) {
		/* this is not a file */
		result = ESPIPE;
	} else {
		/* remember that AIO was issued against this fileproc */
		fp->fp_flags |= FP_AIOISSUED;
	}

	proc_fdunlock(p);

	return result;
}
2059
2060 /*
2061 * do_aio_completion_and_unlock. Handle async IO completion.
2062 */
static void
do_aio_completion_and_unlock(proc_t p, aio_workq_entry *entryp,
    aio_entry_flags_t reason)
{
	aio_workq_entry *leader = entryp->lio_leader;
	int lio_pending = 0;
	bool do_signal, do_kevent;

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	/* move the request from the proc's active queue to its done queue */
	aio_proc_move_done_locked(p, entryp);
	entryp->flags |= reason;

	if (leader) {
		/* lio_listio group accounting: this member is now done */
		lio_pending = --leader->lio_pending;
		if (lio_pending < 0) {
			panic("lio_pending accounting mistake");
		}
		if (lio_pending == 0 && (leader->flags & AIO_LIO_WAIT)) {
			wakeup(leader);
		}
		entryp->lio_leader = NULL; /* no dangling pointers please */
	}

	/*
	 * need to handle case where a process is trying to exit, exec, or
	 * close and is currently waiting for active aio requests to complete.
	 * If AIO_CLEANUP_WAIT is set then we need to look to see if there are any
	 * other requests in the active queue for this process. If there are
	 * none then wakeup using the AIO_CLEANUP_SLEEP_CHAN tsleep channel.
	 * If there are some still active then do nothing - we only want to
	 * wakeup when all active aio requests for the process are complete.
	 */
	do_signal = do_kevent = false;
	if (__improbable(entryp->flags & AIO_EXIT_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_has_active_requests_for_process(p)) {
			/*
			 * no active aio requests for this process, continue exiting. In this
			 * case, there should be no one else waiting on the proc in AIO...
			 */
			wakeup_one((caddr_t)&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		/*
		 * If this was the last request in the group, or not part of
		 * a group, and that a signal is desired, send one.
		 */
		do_signal = (lio_pending == 0);
	} else if (entryp->aiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		/*
		 * For SIGEV_KEVENT, every AIO (even it is part of a group) would get
		 * a kevent notification.
		 */
		do_kevent = true;
	}

	if (__improbable(entryp->flags & AIO_CLOSE_WAIT)) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wait) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    0, 0, 0);

		if (!aio_proc_has_active_requests_for_file(p, entryp->aiocb.aio_fildes)) {
			/* Can't wakeup_one(); multiple closes might be in progress. */
			wakeup(&p->AIO_CLEANUP_SLEEP_CHAN);

			KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_cleanup_wake) | DBG_FUNC_NONE,
			    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
			    0, 0, 0);
		}
	}

	/* notifications below are sent without holding the proc aio lock */
	aio_proc_unlock(p);

	if (do_signal) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_sig) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		psignal(p, entryp->aiocb.aio_sigevent.sigev_signo);
	} else if (do_kevent) {
		KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_kevent) | DBG_FUNC_NONE,
		    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
		    entryp->aiocb.aio_sigevent.sigev_signo, 0, 0);

		/* We only support one event type for either completed/cancelled AIO. */
		lck_mtx_lock(&aio_klist_lock);
		KNOTE(&aio_klist, 1);
		lck_mtx_unlock(&aio_klist_lock);
	}

	/*
	 * A thread in aio_suspend() wants to know about completed IOs. If it checked
	 * the done list before we moved our AIO there, then it already asserted its wait,
	 * and we can wake it up without holding the lock. If it checked the list after
	 * we did our move, then it already has seen the AIO that we moved. Ergo, we
	 * can do our wakeup without holding the lock.
	 */
	wakeup(&p->AIO_SUSPEND_SLEEP_CHAN);
	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_completion_suspend_wake) | DBG_FUNC_NONE,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp), 0, 0, 0);

	aio_entry_unref(entryp); /* see aio_try_enqueue_work_locked */
	if (leader) {
		aio_entry_unref(leader); /* see lio_listio */
	}
}
2177
2178
2179 /*
2180 * do_aio_read
2181 */
2182 static int
do_aio_read(aio_workq_entry * entryp)2183 do_aio_read(aio_workq_entry *entryp)
2184 {
2185 struct proc *p = entryp->procp;
2186 struct fileproc *fp;
2187 int error;
2188
2189 if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2190 return error;
2191 }
2192
2193 if (fp->fp_glob->fg_flag & FREAD) {
2194 error = dofileread(&entryp->context, fp,
2195 entryp->aiocb.aio_buf,
2196 entryp->aiocb.aio_nbytes,
2197 entryp->aiocb.aio_offset, FOF_OFFSET,
2198 &entryp->returnval);
2199 } else {
2200 error = EBADF;
2201 }
2202
2203 fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2204 return error;
2205 }
2206
2207
2208 /*
2209 * do_aio_write
2210 */
2211 static int
do_aio_write(aio_workq_entry * entryp)2212 do_aio_write(aio_workq_entry *entryp)
2213 {
2214 struct proc *p = entryp->procp;
2215 struct fileproc *fp;
2216 int error;
2217
2218 if ((error = fp_lookup(p, entryp->aiocb.aio_fildes, &fp, 0))) {
2219 return error;
2220 }
2221
2222 if (fp->fp_glob->fg_flag & FWRITE) {
2223 int flags = 0;
2224
2225 if ((fp->fp_glob->fg_flag & O_APPEND) == 0) {
2226 flags |= FOF_OFFSET;
2227 }
2228
2229 /* NB: tell dofilewrite the offset, and to use the proc cred */
2230 error = dofilewrite(&entryp->context,
2231 fp,
2232 entryp->aiocb.aio_buf,
2233 entryp->aiocb.aio_nbytes,
2234 entryp->aiocb.aio_offset,
2235 flags,
2236 &entryp->returnval);
2237 } else {
2238 error = EBADF;
2239 }
2240
2241 fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
2242 return error;
2243 }
2244
2245
2246 /*
2247 * aio_has_active_requests_for_process - return whether the process has active
2248 * requests pending.
2249 */
2250 static bool
aio_has_active_requests_for_process(proc_t procp)2251 aio_has_active_requests_for_process(proc_t procp)
2252 {
2253 return !TAILQ_EMPTY(&procp->p_aio_activeq);
2254 }
2255
2256 /*
2257 * Called with the proc locked.
2258 */
2259 static bool
aio_proc_has_active_requests_for_file(proc_t procp,int fd)2260 aio_proc_has_active_requests_for_file(proc_t procp, int fd)
2261 {
2262 aio_workq_entry *entryp;
2263
2264 TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2265 if (entryp->aiocb.aio_fildes == fd) {
2266 return true;
2267 }
2268 }
2269
2270 return false;
2271 }
2272
2273
2274 /*
2275 * do_aio_fsync
2276 */
static int
do_aio_fsync(aio_workq_entry *entryp)
{
	struct proc *p = entryp->procp;
	struct vnode *vp;
	struct fileproc *fp;
	int sync_flag;
	int error;

	/*
	 * We are never called unless either AIO_FSYNC or AIO_DSYNC are set.
	 *
	 * If AIO_DSYNC is set, we can tell the lower layers that it is OK
	 * to mark for update the metadata not strictly necessary for data
	 * retrieval, rather than forcing it to disk.
	 *
	 * If AIO_FSYNC is set, we have to also wait for metadata not really
	 * necessary for data retrieval to be committed to stable storage
	 * (e.g. atime, mtime, ctime, etc.).
	 *
	 * Metadata necessary for data retrieval must be committed to stable
	 * storage in either case (file length, etc.).
	 */
	if (entryp->flags & AIO_FSYNC) {
		sync_flag = MNT_WAIT;
	} else {
		sync_flag = MNT_DWAIT;
	}

	/* fsync only makes sense against a vnode-backed descriptor */
	error = fp_get_ftype(p, entryp->aiocb.aio_fildes, DTYPE_VNODE, ENOTSUP, &fp);
	if (error != 0) {
		entryp->returnval = -1;
		return error;
	}
	vp = fp_get_data(fp);

	if ((error = vnode_getwithref(vp)) == 0) {
		error = VNOP_FSYNC(vp, sync_flag, &entryp->context);

		(void)vnode_put(vp);
	} else {
		entryp->returnval = -1;
	}

	fp_drop(p, entryp->aiocb.aio_fildes, fp, 0);
	return error;
}
2324
2325
2326 /*
2327 * is_already_queued - runs through our queues to see if the given
2328 * aiocbp / process is there. Returns TRUE if there is a match
2329 * on any of our aio queues.
2330 *
2331 * Called with proc aio lock held (can be held spin)
2332 */
2333 static boolean_t
is_already_queued(proc_t procp,user_addr_t aiocbp)2334 is_already_queued(proc_t procp, user_addr_t aiocbp)
2335 {
2336 aio_workq_entry *entryp;
2337 boolean_t result;
2338
2339 result = FALSE;
2340
2341 /* look for matches on our queue of async IO requests that have completed */
2342 TAILQ_FOREACH(entryp, &procp->p_aio_doneq, aio_proc_link) {
2343 if (aiocbp == entryp->uaiocbp) {
2344 result = TRUE;
2345 goto ExitThisRoutine;
2346 }
2347 }
2348
2349 /* look for matches on our queue of active async IO requests */
2350 TAILQ_FOREACH(entryp, &procp->p_aio_activeq, aio_proc_link) {
2351 if (aiocbp == entryp->uaiocbp) {
2352 result = TRUE;
2353 goto ExitThisRoutine;
2354 }
2355 }
2356
2357 ExitThisRoutine:
2358 return result;
2359 }
2360
2361
2362 /*
2363 * aio initialization
2364 */
__private_extern__ void
aio_init(void)
{
	/* set up the global async IO work queues */
	for (int i = 0; i < AIO_NUM_WORK_QUEUES; i++) {
		aio_workq_init(&aio_anchor.aio_async_workqs[i]);
	}

	/*
	 * With the new per-process workqueue implementation, worker threads
	 * are created on demand; otherwise spin up the legacy global pool.
	 */
	if (bootarg_aio_new_workq) {
		printf("New aio workqueue implementation selected\n");
	} else {
		_aio_create_worker_threads(aio_worker_threads);
	}

	/* knote list used for SIGEV_KEVENT completion notifications */
	klist_init(&aio_klist);

	/* precompute the idle-thread reap window in mach absolute time */
	clock_interval_to_absolutetime_interval(aio_wq_reduce_pool_window.usecs,
	    NSEC_PER_USEC, &aio_wq_reduce_pool_window.abstime);
}
2383
2384
2385 /*
2386 * aio worker threads created here.
2387 */
2388 __private_extern__ void
_aio_create_worker_threads(int num)2389 _aio_create_worker_threads(int num)
2390 {
2391 int i;
2392
2393 /* create some worker threads to handle the async IO requests */
2394 for (i = 0; i < num; i++) {
2395 thread_t myThread;
2396
2397 if (KERN_SUCCESS != kernel_thread_start(aio_work_thread, NULL, &myThread)) {
2398 printf("%s - failed to create a work thread \n", __FUNCTION__);
2399 } else {
2400 thread_deallocate(myThread);
2401 }
2402 }
2403 }
2404
2405 /*
2406 * Return the current activation utask
2407 */
2408 task_t
get_aiotask(void)2409 get_aiotask(void)
2410 {
2411 return current_uthread()->uu_aio_task;
2412 }
2413
2414
2415 /*
2416 * In the case of an aiocb from a
2417 * 32-bit process we need to expand some longs and pointers to the correct
2418 * sizes in order to let downstream code always work on the same type of
2419 * aiocb (in our case that is a user_aiocb)
2420 */
static void
do_munge_aiocb_user32_to_user(struct user32_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
	/* widen the 32-bit fields into the kernel's canonical user_aiocb */
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = CAST_USER_ADDR_T(my_aiocbp->aio_buf);
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	/* special case here. since we do not know if sigev_value is an */
	/* int or a ptr we do NOT cast the ptr to a user_addr_t. This */
	/* means if we send this info back to user space we need to remember */
	/* sigev_value was not expanded for the 32-bit case. */
	/* NOTE - this does NOT affect us since we don't support sigev_value */
	/* yet in the aio context. */
	//LP64
	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_function);
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    CAST_USER_ADDR_T(my_aiocbp->aio_sigevent.sigev_notify_attributes);
}
2447
2448 /* Similar for 64-bit user process, so that we don't need to satisfy
2449 * the alignment constraints of the original user64_aiocb
2450 */
#if !__LP64__
__dead2
#endif
static void
do_munge_aiocb_user64_to_user(struct user64_aiocb *my_aiocbp, struct user_aiocb *the_user_aiocbp)
{
#if __LP64__
	/* field-by-field copy; layouts match on LP64 so no widening is needed */
	the_user_aiocbp->aio_fildes = my_aiocbp->aio_fildes;
	the_user_aiocbp->aio_offset = my_aiocbp->aio_offset;
	the_user_aiocbp->aio_buf = my_aiocbp->aio_buf;
	the_user_aiocbp->aio_nbytes = my_aiocbp->aio_nbytes;
	the_user_aiocbp->aio_reqprio = my_aiocbp->aio_reqprio;
	the_user_aiocbp->aio_lio_opcode = my_aiocbp->aio_lio_opcode;

	the_user_aiocbp->aio_sigevent.sigev_notify = my_aiocbp->aio_sigevent.sigev_notify;
	the_user_aiocbp->aio_sigevent.sigev_signo = my_aiocbp->aio_sigevent.sigev_signo;
	the_user_aiocbp->aio_sigevent.sigev_value.sival_ptr =
	    my_aiocbp->aio_sigevent.sigev_value.sival_ptr;
	the_user_aiocbp->aio_sigevent.sigev_notify_function =
	    my_aiocbp->aio_sigevent.sigev_notify_function;
	the_user_aiocbp->aio_sigevent.sigev_notify_attributes =
	    my_aiocbp->aio_sigevent.sigev_notify_attributes;
#else
#pragma unused(my_aiocbp, the_user_aiocbp)
	panic("64bit process on 32bit kernel is not supported");
#endif
}
2478
2479
/*
 * filt_aioattach - attach a kernel-originated knote to the global aio klist.
 * kev->data carries the aio_workq_entry pointer (set by the kernel caller).
 */
static int
filt_aioattach(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp = (aio_workq_entry *)kev->data;

	/* Don't allow kevent registration from the user-space. */
	if ((kev->flags & EV_KERNEL) == 0) {
		return EPERM;
	}

	kev->flags &= ~EV_KERNEL;
	/* Clear the 'kn_fflags' state after the knote has been processed. */
	kn->kn_flags |= EV_CLEAR;

	/* Associate the knote with the AIO work. */
	knote_kn_hook_set_raw(kn, (void *)entryp);

	lck_mtx_lock(&aio_klist_lock);
	KNOTE_ATTACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);

	return 0;
}
2503
/* filt_aiodetach - remove the knote from the global aio klist. */
static void
filt_aiodetach(struct knote *kn)
{
	lck_mtx_lock(&aio_klist_lock);
	KNOTE_DETACH(&aio_klist, kn);
	lck_mtx_unlock(&aio_klist_lock);
}
2511
2512 /*
2513 * The 'f_event' is called with 'aio_klist_lock' held when KNOTE() was called
2514 * in do_aio_completion_and_unlock().
2515 */
2516 static int
filt_aioevent(struct knote * kn,long hint)2517 filt_aioevent(struct knote *kn, long hint)
2518 {
2519 aio_workq_entry *entryp;
2520 int activate = 0;
2521
2522 /*
2523 * The 'f_event' and 'f_process' can run concurrently so it is possible
2524 * the aio_workq_entry has been detached from the knote when the
2525 * filt_aioprocess() was called earlier. In this case, we will skip
2526 * activating the event.
2527 */
2528 entryp = knote_kn_hook_get_raw(kn);
2529 if (__improbable(entryp == NULL)) {
2530 goto out;
2531 }
2532
2533 /* We can only activate the filter if the AIO work has completed. */
2534 if (entryp->flags & AIO_COMPLETED) {
2535 kn->kn_fflags |= hint;
2536 activate = FILTER_ACTIVE;
2537 }
2538
2539 out:
2540 return activate;
2541 }
2542
/*
 * filt_aiotouch - f_touch is never expected for the aio filter (registration
 * from user-space is rejected in filt_aioattach); panic if it ever runs.
 */
static int
filt_aiotouch(struct knote *kn, struct kevent_qos_s *kev)
{
	panic("%s: kn %p kev %p (NOT EXPECTED TO BE CALLED!!)", __func__, kn, kev);
}
2548
/*
 * filt_aioprocess - deliver the completed AIO's error and return values to
 * the kevent, detach the entry from the knote, and drop the entry from the
 * proc's done queue.
 */
static int
filt_aioprocess(struct knote *kn, struct kevent_qos_s *kev)
{
	aio_workq_entry *entryp;
	proc_t p;
	int res = 0;

	entryp = knote_kn_hook_get_raw(kn);
	assert(entryp);
	p = entryp->procp;

	lck_mtx_lock(&aio_klist_lock);
	if (kn->kn_fflags) {
		/* Propagate the error status and return value back to the user. */
		kn->kn_ext[0] = entryp->errorval;
		kn->kn_ext[1] = entryp->returnval;
		knote_fill_kevent(kn, kev, 0);
		/* unhook the entry so a racing f_event sees NULL and bails */
		knote_kn_hook_set_raw(kn, NULL);

		aio_proc_lock(p);
		aio_proc_remove_done_locked(p, entryp);
		aio_proc_unlock(p);
		aio_entry_unref(entryp);

		res = FILTER_ACTIVE;
	}
	lck_mtx_unlock(&aio_klist_lock);

	return res;
}
2579
/* filter ops backing kernel-internal AIO completion kevents */
SECURITY_READ_ONLY_EARLY(struct filterops) aio_filtops = {
	.f_isfd = 0,                  /* knote is not fd-backed */
	.f_attach = filt_aioattach,
	.f_detach = filt_aiodetach,
	.f_event = filt_aioevent,
	.f_touch = filt_aiotouch,     /* never expected to run; panics */
	.f_process = filt_aioprocess,
};
2588
2589 #pragma mark per process aio workqueue
2590
2591 /*
2592 * The per process workq threads call this function to handle the aio request. The threads
2593 * belong to the same process so we don't need to change the vm maps as we would for kernel
2594 * threads.
2595 */
static int
workq_aio_process_entry(aio_workq_entry *entryp)
{
	proc_t p = entryp->procp;
	int error;

	/* worker runs in the request's own process, but never on the thread
	 * that issued the request */
	assert(current_proc() == p && current_thread() != vfs_context_thread(&entryp->context));

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_START,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->flags, 0, 0);

	if ((entryp->flags & AIO_READ) != 0) {
		error = do_aio_read(entryp);
	} else if ((entryp->flags & AIO_WRITE) != 0) {
		error = do_aio_write(entryp);
	} else if ((entryp->flags & (AIO_FSYNC | AIO_DSYNC)) != 0) {
		if ((entryp->flags & AIO_FSYNC) != 0) {
			/*
			 * Check for unfinished operations on the same file
			 * in this proc's queue.
			 */
			aio_proc_lock_spin(p);
			if (aio_delay_fsync_request(entryp)) {
				/* It needs to be delayed. Put it back on the end of the work queue */
				KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_fsync_delay) | DBG_FUNC_NONE,
				    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
				    0, 0, 0);

				/* The references on this entry haven't been consumed */
				if (!workq_aio_entry_add_locked(p, entryp)) {
					/* re-queue failed (workqueue exiting): cancel instead */
					entryp->errorval = ECANCELED;
					entryp->returnval = -1;

					/* Now it's officially cancelled. Do the completion */
					KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_cancel_async_workq) | DBG_FUNC_NONE,
					    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
					    entryp->aiocb.aio_fildes, 0, 0);

					/* drops the proc lock */
					do_aio_completion_and_unlock(p, entryp, AIO_CANCELLED);
				} else {
					/* drops the proc lock */
					workq_aio_wakeup_thread_and_unlock(p);
				}
				return 0;
			}
			aio_proc_unlock(entryp->procp);
		}
		error = do_aio_fsync(entryp);
	} else {
		error = EINVAL;
	}

	KERNEL_DEBUG(BSDDBG_CODE(DBG_BSD_AIO, AIO_WQ_process_entry) | DBG_FUNC_END,
	    VM_KERNEL_ADDRPERM(p), VM_KERNEL_ADDRPERM(entryp->uaiocbp),
	    entryp->errorval, entryp->returnval, 0);

	/* we're done with the IO request so pop it off the active queue and */
	/* push it on the done queue */
	aio_proc_lock(p);
	entryp->errorval = error;
	do_aio_completion_and_unlock(p, entryp, AIO_COMPLETED);
	return 0;
}
2659
2660 /*
2661 * The functions below implement a workqueue for aio which is taken from the
2662 * workqueue implementation for libdispatch/pthreads. They are stripped down versions
2663 * of the corresponding functions for libdispatch/pthreads.
2664 */
2665
/*
 * aio_workq_sysctl_handle_usecs - sysctl handler for microsecond-valued
 * tunables; on a write, recompute the cached mach-absolute-time interval.
 */
static int
aio_workq_sysctl_handle_usecs SYSCTL_HANDLER_ARGS
{
#pragma unused(arg2)
	struct aio_workq_usec_var *v = arg1;
	int error = sysctl_handle_int(oidp, &v->usecs, 0, req);
	/* read-only access or error: nothing to recompute */
	if (error || !req->newptr) {
		return error;
	}
	clock_interval_to_absolutetime_interval(v->usecs, NSEC_PER_USEC,
	    &v->abstime);
	return 0;
}
2679
2680 #pragma mark wq_flags
2681
2682 #define AIO_WQ_DEAD 0x1000
2683
/* _wa_flags - relaxed atomic snapshot of the workqueue flag word */
static inline uint32_t
_wa_flags(workq_aio_t wq_aio)
{
	return os_atomic_load(&wq_aio->wa_flags, relaxed);
}
2689
/* _wq_exiting - true once workq_aio_mark_exiting() has set WQ_EXITING */
static inline bool
_wq_exiting(workq_aio_t wq_aio)
{
	return _wa_flags(wq_aio) & WQ_EXITING;
}
2695
/* _wq_dead - true once workq_aio_exit() has set AIO_WQ_DEAD during teardown */
static inline bool
_wq_dead(workq_aio_t wq_aio)
{
	return _wa_flags(wq_aio) & AIO_WQ_DEAD;
}
2701
2702 #define AIO_WQPTR_IS_INITING_VALUE ((workq_aio_t)~(uintptr_t)0)
2703
/*
 * proc_get_aio_wqptr_fast - raw load of p_aio_wqptr; may return the
 * AIO_WQPTR_IS_INITING_VALUE sentinel while initialization is in progress.
 */
static workq_aio_t
proc_get_aio_wqptr_fast(struct proc *p)
{
	return os_atomic_load(&p->p_aio_wqptr, relaxed);
}
2709
2710 static workq_aio_t
proc_get_aio_wqptr(struct proc * p)2711 proc_get_aio_wqptr(struct proc *p)
2712 {
2713 workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);
2714 return wq_aio == AIO_WQPTR_IS_INITING_VALUE ? NULL : wq_aio;
2715 }
2716
/*
 * proc_set_aio_wqptr - publish the workqueue pointer (release ordering) and,
 * if another thread was parked in proc_init_aio_wqptr_or_wait() on the
 * initializing sentinel, wake it.
 */
static void
proc_set_aio_wqptr(struct proc *p, workq_aio_t wq_aio)
{
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, wq_aio, release);
	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		proc_lock(p);
		thread_wakeup(&p->p_aio_wqptr);
		proc_unlock(p);
	}
}
2727
/*
 * proc_init_aio_wqptr_or_wait - claim the right to initialize the process's
 * aio workqueue.
 *
 * Returns true when this caller won the race and must allocate the workqueue
 * (p_aio_wqptr is left at the initializing sentinel). Returns false when
 * another thread is initializing (after blocking until it finishes) or the
 * workqueue already exists.
 */
static bool
proc_init_aio_wqptr_or_wait(struct proc *p)
{
	workq_aio_t wq_aio;

	proc_lock(p);
	wq_aio = os_atomic_load(&p->p_aio_wqptr, relaxed);

	if (wq_aio == NULL) {
		/* we won: mark as initializing so others wait for us */
		os_atomic_store(&p->p_aio_wqptr, AIO_WQPTR_IS_INITING_VALUE, relaxed);
		proc_unlock(p);
		return true;
	}

	if (wq_aio == AIO_WQPTR_IS_INITING_VALUE) {
		/* someone else is initializing; sleep until proc_set_aio_wqptr */
		assert_wait(&p->p_aio_wqptr, THREAD_UNINT);
		proc_unlock(p);
		thread_block(THREAD_CONTINUE_NULL);
	} else {
		proc_unlock(p);
	}
	return false;
}
2751
/*
 * workq_aio_parked_wait_event - per-uthread wait channel a parked aio worker
 * sleeps on (the address of its uu_workq_stackaddr field).
 */
static inline event_t
workq_aio_parked_wait_event(struct uthread *uth)
{
	return (event_t)&uth->uu_workq_stackaddr;
}
2757
/* workq_aio_thread_wakeup - wake a specific parked aio worker thread */
static inline void
workq_aio_thread_wakeup(struct uthread *uth)
{
	thread_wakeup_thread(workq_aio_parked_wait_event(uth), get_machthread(uth));
}
2763
2764 /*
2765 * Routine: workq_aio_mark_exiting
2766 *
2767 * Function: Mark the work queue such that new threads will not be added to the
2768 * work queue after we return.
2769 *
2770 * Conditions: Called against the current process.
2771 */
static void
workq_aio_mark_exiting(proc_t p)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
	uint32_t wq_flags;

	/* nothing to do if the process never created an aio workqueue */
	if (!wq_aio) {
		return;
	}

	/* must only transition to WQ_EXITING once per workqueue lifetime */
	wq_flags = os_atomic_or_orig(&wq_aio->wa_flags, WQ_EXITING, relaxed);
	if (__improbable(wq_flags & WQ_EXITING)) {
		panic("workq_aio_mark_exiting_locked called twice");
	}

	/*
	 * Opportunistically try to cancel thread calls that are likely in flight.
	 * workq_aio_exit() will do the proper cleanup.
	 */
	if (wq_flags & WQ_DEATH_CALL_SCHEDULED) {
		thread_call_cancel(wq_aio->wa_death_call);
	}
}
2795
/*
 * workq_aio_exit - tear down the per-process aio workqueue: cancel the death
 * call, force any remaining worker threads to die, wait for them, and free
 * the workqueue. Called after workq_aio_mark_exiting().
 */
static void
workq_aio_exit(proc_t p)
{
	workq_aio_t wq_aio;

	/* detach the workqueue from the proc so no new lookups succeed */
	wq_aio = os_atomic_xchg(&p->p_aio_wqptr, NULL, release);

	if (!wq_aio) {
		return;
	}

	/*
	 * Thread calls are always scheduled by the proc itself or under the
	 * workqueue spinlock if WQ_EXITING is not yet set.
	 *
	 * Either way, when this runs, the proc has no threads left beside
	 * the one running this very code, so we know no thread call can be
	 * dispatched anymore.
	 */

	thread_call_cancel_wait(wq_aio->wa_death_call);
	thread_call_free(wq_aio->wa_death_call);

	/*
	 * Clean up workqueue data structures for threads that exited and
	 * didn't get a chance to clean up after themselves.
	 *
	 * idle/new threads should have been interrupted and died on their own
	 */

	assert(TAILQ_EMPTY(&wq_aio->wa_aioq_entries));
	assert(TAILQ_EMPTY(&wq_aio->wa_thrunlist));

	if (wq_aio->wa_nthreads) {
		os_atomic_or(&wq_aio->wa_flags, AIO_WQ_DEAD, relaxed);
		aio_proc_lock_spin(p);
		if (wq_aio->wa_nthreads) {
			struct uthread *uth;

			/* mark every idle thread dying and wake it to exit */
			TAILQ_FOREACH(uth, &wq_aio->wa_thidlelist, uu_workq_entry) {
				if (uth->uu_workq_flags & UT_WORKQ_DYING) {
					workq_aio_thread_wakeup(uth);
					continue;
				}
				wq_aio->wa_thdying_count++;
				uth->uu_workq_flags |= UT_WORKQ_DYING;
				workq_aio_thread_wakeup(uth);
			}
			/* wait until the last worker drops wa_nthreads to zero */
			while (wq_aio->wa_nthreads) {
				msleep(&wq_aio->wa_nthreads, aio_proc_mutex(p), PRIBIO | PSPIN, "aio_workq_exit", 0);
			}
		}
		aio_proc_unlock(p);
	}

	assertf(TAILQ_EMPTY(&wq_aio->wa_thidlelist),
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thidlecount == 0,
	    "wa_thidlecount = %d, wa_thdying_count = %d",
	    wq_aio->wa_thidlecount, wq_aio->wa_thdying_count);
	assertf(wq_aio->wa_thdying_count == 0,
	    "wa_thdying_count = %d", wq_aio->wa_thdying_count);

	kfree_type(workq_aio_s, wq_aio);
}
2862
/*
 * workq_aio_open - lazily create the per-process aio workqueue. Safe against
 * concurrent callers: only the thread that wins proc_init_aio_wqptr_or_wait()
 * allocates; everyone else waits for publication. Always returns 0 today.
 */
static int
workq_aio_open(struct proc *p)
{
	workq_aio_t wq_aio;
	int error = 0;

	if (proc_get_aio_wqptr(p) == NULL) {
		if (proc_init_aio_wqptr_or_wait(p) == FALSE) {
			/* another thread finished the initialization for us */
			assert(proc_get_aio_wqptr(p) != NULL);
			goto out;
		}

		/* Z_WAITOK | Z_ZERO: blocks rather than failing, zero-filled */
		wq_aio = kalloc_type(workq_aio_s, Z_WAITOK | Z_ZERO);

		wq_aio->wa_proc = p;

		TAILQ_INIT(&wq_aio->wa_thidlelist);
		TAILQ_INIT(&wq_aio->wa_thrunlist);
		TAILQ_INIT(&wq_aio->wa_aioq_entries);

		/* one-shot call used to reap idle worker threads */
		wq_aio->wa_death_call = thread_call_allocate_with_options(
			workq_aio_kill_old_threads_call, wq_aio,
			THREAD_CALL_PRIORITY_USER, THREAD_CALL_OPTIONS_ONCE);

		/* publish and wake any waiters */
		proc_set_aio_wqptr(p, wq_aio);
	}
out:
	return error;
}
2892
2893 #pragma mark aio workqueue idle thread accounting
2894
/*
 * workq_oldest_killable_idle_aio_thread - the tail of the idle list, i.e.
 * the thread that has been parked the longest.
 */
static inline struct uthread *
workq_oldest_killable_idle_aio_thread(workq_aio_t wq_aio)
{
	return TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);
}
2900
2901 static inline uint64_t
workq_kill_delay_for_idle_aio_thread()2902 workq_kill_delay_for_idle_aio_thread()
2903 {
2904 return aio_wq_reduce_pool_window.abstime;
2905 }
2906
/*
 * workq_should_kill_idle_aio_thread - true when `uth` has been idle longer
 * than the reap window as of timestamp `now`.
 */
static inline bool
workq_should_kill_idle_aio_thread(struct uthread *uth, uint64_t now)
{
	uint64_t delay = workq_kill_delay_for_idle_aio_thread();
	return now - uth->uu_save.uus_workq_park_data.idle_stamp > delay;
}
2913
/*
 * workq_aio_death_call_schedule - arm the idle-thread reaper thread call for
 * `deadline`, unless the workqueue is exiting or a call is already scheduled.
 */
static void
workq_aio_death_call_schedule(workq_aio_t wq_aio, uint64_t deadline)
{
	uint32_t wa_flags = os_atomic_load(&wq_aio->wa_flags, relaxed);

	if (wa_flags & (WQ_EXITING | WQ_DEATH_CALL_SCHEDULED)) {
		return;
	}
	/* flag is cleared again by workq_aio_kill_old_threads_call */
	os_atomic_or(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);

	/*
	 * <rdar://problem/13139182> Due to how long term timers work, the leeway
	 * can't be too short, so use 500ms which is long enough that we will not
	 * wake up the CPU for killing threads, but short enough that it doesn't
	 * fall into long-term timer list shenanigans.
	 */
	thread_call_enter_delayed_with_leeway(wq_aio->wa_death_call, NULL, deadline,
	    aio_wq_reduce_pool_window.abstime / 10,
	    THREAD_CALL_DELAY_LEEWAY | THREAD_CALL_DELAY_USER_BACKGROUND);
}
2934
2935 /*
2936 * `decrement` is set to the number of threads that are no longer dying:
2937 * - because they have been resuscitated just in time (workq_pop_idle_thread)
2938 * - or have been killed (workq_thread_terminate).
2939 */
2940 static void
workq_aio_death_policy_evaluate(workq_aio_t wq_aio,uint16_t decrement)2941 workq_aio_death_policy_evaluate(workq_aio_t wq_aio, uint16_t decrement)
2942 {
2943 struct uthread *uth;
2944
2945 assert(wq_aio->wa_thdying_count >= decrement);
2946 #if 0
2947 if (decrement) {
2948 printf("VV_DEBUG_AIO : %s:%d : pid = %d, ctid = %d, after decrement thdying_count = %d\n",
2949 __FUNCTION__, __LINE__, proc_pid(current_proc()), thread_get_ctid(thr),
2950 wq_aio->wa_thdying_count - decrement);
2951 }
2952 #endif
2953
2954 if ((wq_aio->wa_thdying_count -= decrement) > 0) {
2955 return;
2956 }
2957
2958 if (wq_aio->wa_thidlecount <= 1) {
2959 return;
2960 }
2961
2962 if (((uth = workq_oldest_killable_idle_aio_thread(wq_aio)) == NULL)) {
2963 return;
2964 }
2965
2966 uint64_t now = mach_absolute_time();
2967 uint64_t delay = workq_kill_delay_for_idle_aio_thread();
2968
2969 if (now - uth->uu_save.uus_workq_park_data.idle_stamp > delay) {
2970 if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
2971 wq_aio->wa_thdying_count++;
2972 uth->uu_workq_flags |= UT_WORKQ_DYING;
2973 }
2974 workq_aio_thread_wakeup(uth);
2975 return;
2976 }
2977
2978 workq_aio_death_call_schedule(wq_aio,
2979 uth->uu_save.uus_workq_park_data.idle_stamp + delay);
2980 }
2981
2982 static void
workq_aio_kill_old_threads_call(void * param0,void * param1 __unused)2983 workq_aio_kill_old_threads_call(void *param0, void *param1 __unused)
2984 {
2985 workq_aio_t wq_aio = param0;
2986
2987 aio_proc_lock_spin(wq_aio->wa_proc);
2988 WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_START, wq_aio);
2989 os_atomic_andnot(&wq_aio->wa_flags, WQ_DEATH_CALL_SCHEDULED, relaxed);
2990 workq_aio_death_policy_evaluate(wq_aio, 0);
2991 WQ_AIO_TRACE_WQ(AIO_WQ_aio_death_call | DBG_FUNC_END, wq_aio);
2992 aio_proc_unlock(wq_aio->wa_proc);;
2993 }
2994
2995 #define WORKQ_UNPARK_FOR_DEATH_WAS_IDLE 0x1
2996 #define WQ_SETUP_NONE 0
2997
/*
 * Final teardown for a dying AIO workqueue thread.
 *
 * Called with the aio proc lock held; drops the lock and does not return.
 * `death_flags` indicates (WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) whether the
 * thread is currently on the idle list and must be unlinked first.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_for_death_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t death_flags, __unused uint32_t setup_flags)
{
	if (death_flags & WORKQ_UNPARK_FOR_DEATH_WAS_IDLE) {
		wq_aio->wa_thidlecount--;
		TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	}

	/* A dying thread's exit settles its wa_thdying_count accounting. */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		wq_aio->wa_thdying_count--;
	}
	assert(wq_aio->wa_nthreads > 0);
	wq_aio->wa_nthreads--;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_terminate | DBG_FUNC_NONE, wq_aio);

	/* Wake whoever sleeps on &wa_nthreads (presumably workq teardown)
	 * once the last thread of a dead workqueue is gone. */
	if (_wq_dead(wq_aio) && (wq_aio->wa_nthreads == 0)) {
		wakeup(&wq_aio->wa_nthreads);
	}

	aio_proc_unlock(p);

	thread_t th = get_machthread(uth);
	assert(th == current_thread());

	/*
	 * NOTE(review): the reference is dropped before thread_terminate();
	 * presumably this releases the creation reference while the running
	 * thread still holds its own — confirm against thread lifecycle rules.
	 */
	thread_deallocate(th);
	thread_terminate(th);
	thread_exception_return();
	__builtin_unreachable();
}
3030
/*
 * Move a thread that ran out of work from the run list to the idle list,
 * or kill it on the spot if the pool should shrink.
 * Called with the aio proc lock held; may not return (death path).
 */
static void
workq_push_idle_aio_thread(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	uint64_t now = mach_absolute_time();

	uth->uu_workq_flags &= ~(UT_WORKQ_RUNNING);
	TAILQ_REMOVE(&wq_aio->wa_thrunlist, uth, uu_workq_entry);

	/* Record when this thread went idle, for the death policy. */
	uth->uu_save.uus_workq_park_data.idle_stamp = now;

	struct uthread *oldest = workq_oldest_killable_idle_aio_thread(wq_aio);
	uint16_t cur_idle = wq_aio->wa_thidlecount;

	if (_wq_exiting(wq_aio) || (wq_aio->wa_thdying_count == 0 && oldest &&
	    workq_should_kill_idle_aio_thread(oldest, now))) {
		/*
		 * Immediately kill threads if we have too many of them.
		 *
		 * And swap "place" with the oldest one we'd have woken up.
		 * This is a relatively desperate situation where we really
		 * need to kill threads quickly and it's best to kill
		 * the one that's currently on core than context switching.
		 */
		if (oldest) {
			/* The oldest idler gets this thread's fresh idle stamp
			 * and moves to the head (newest position). */
			oldest->uu_save.uus_workq_park_data.idle_stamp = now;
			TAILQ_REMOVE(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
			TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, oldest, uu_workq_entry);
		}

		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth, 0, setup_flags);
		__builtin_unreachable();
	}

	struct uthread *tail = TAILQ_LAST(&wq_aio->wa_thidlelist, workq_aio_uthread_head);

	/* Newest idler goes to the head; TAILQ_LAST is thus the oldest. */
	cur_idle += 1;
	wq_aio->wa_thidlecount = cur_idle;
	uth->uu_save.uus_workq_park_data.has_stack = false;
	TAILQ_INSERT_HEAD(&wq_aio->wa_thidlelist, uth, uu_workq_entry);

	/* First idler: arm the death call for when it becomes killable. */
	if (!tail) {
		uint64_t delay = workq_kill_delay_for_idle_aio_thread();
		workq_aio_death_call_schedule(wq_aio, now + delay);
	}
}
3081
/*
 * We have no work to do, park ourselves on the idle list.
 *
 * Consumes the workqueue lock and does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_park_and_unlock(proc_t p, workq_aio_t wq_aio, struct uthread *uth,
    uint32_t setup_flags)
{
	assert(uth == current_uthread());
	assert(uth->uu_kqr_bound == NULL);

	workq_push_idle_aio_thread(p, wq_aio, uth, setup_flags); // may not return

	/* Still here: we were pushed idle; die now if marked for death. */
	if (uth->uu_workq_flags & UT_WORKQ_DYING) {
		workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
		    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, setup_flags);
		__builtin_unreachable();
	}

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_park | DBG_FUNC_NONE, wq_aio);

	/* Block until workq_aio_thread_wakeup() posts our wait event;
	 * we resume in workq_aio_unpark_continue(). */
	thread_set_pending_block_hint(get_machthread(uth), kThreadWaitParkedWorkQueue);
	/* XXX this should probably be THREAD_UNINT */
	assert_wait(workq_aio_parked_wait_event(uth), THREAD_INTERRUPTIBLE);
	aio_proc_unlock(p);
	thread_block(workq_aio_unpark_continue);
	__builtin_unreachable();
}
3112
3113 #define WORKQ_POLICY_INIT(qos) \
3114 (struct uu_workq_policy){ .qos_req = (qos), .qos_bucket = (qos) }
3115
3116 /*
3117 * This function is always called with the workq lock.
3118 */
3119 static void
workq_aio_thread_reset_pri(struct uthread * uth,thread_t src_th)3120 workq_aio_thread_reset_pri(struct uthread *uth, thread_t src_th)
3121 {
3122 thread_t th = get_machthread(uth);
3123 thread_qos_t qos = (thread_qos_t)proc_get_effective_thread_policy(src_th, TASK_POLICY_QOS);
3124 int priority = 31;
3125 int policy = POLICY_TIMESHARE;
3126
3127 uth->uu_workq_pri = WORKQ_POLICY_INIT(qos);
3128 thread_set_workq_pri(th, qos, priority, policy);
3129 }
3130
3131 static inline void
workq_aio_thread_set_type(struct uthread * uth,uint16_t flags)3132 workq_aio_thread_set_type(struct uthread *uth, uint16_t flags)
3133 {
3134 uth->uu_workq_flags &= ~(UT_WORKQ_OVERCOMMIT | UT_WORKQ_COOPERATIVE);
3135 uth->uu_workq_flags |= flags;
3136 }
3137
/*
 * Main servicing loop of an AIO worker: drain wa_aioq_entries,
 * processing each request with the lock dropped, then park.
 * Called with the aio proc lock held; does not return.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_select_req_or_park_and_unlock(proc_t p, workq_aio_t wq_aio,
    struct uthread *uth, uint32_t setup_flags)
{
	aio_workq_entry *entryp;
	thread_t last_thread = NULL;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_START, wq_aio);
	/* Pin base pri while we service requests at varying QoS. */
	thread_freeze_base_pri(get_machthread(uth));
	workq_aio_thread_set_type(uth, 0);
	while ((entryp = TAILQ_FIRST(&wq_aio->wa_aioq_entries))) {
		if (__improbable(_wq_exiting(wq_aio))) {
			break;
		}

		TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
		entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

		aio_proc_unlock(p);

		/* Adopt the requesting thread's priority, but only re-derive
		 * it when the requester changed since the last entry. */
		thread_t thr = vfs_context_thread(&entryp->context);
		if (last_thread != thr) {
			workq_aio_thread_reset_pri(uth, thr);
			last_thread = thr;
		}

		/* this frees references to workq entry */
		workq_aio_process_entry(entryp);

		aio_proc_lock_spin(p);
	}
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_select_req | DBG_FUNC_END, wq_aio);
	thread_unfreeze_base_pri(get_machthread(uth));
	workq_aio_park_and_unlock(p, wq_aio, uth, setup_flags);
}
3174
/*
 * parked idle thread wakes up
 *
 * Continuation passed to thread_block() by workq_aio_park_and_unlock()
 * and to thread creation; dispatches on why we were made runnable.
 */
__attribute__((noreturn, noinline))
static void
workq_aio_unpark_continue(void *parameter __unused, wait_result_t wr)
{
	thread_t th = current_thread();
	struct uthread *uth = get_bsdthread_info(th);
	proc_t p = current_proc();
	workq_aio_t wq_aio = proc_get_aio_wqptr_fast(p);

	aio_proc_lock_spin(p);

	/* Normal case: woken to run; go service the request queue. */
	if (__probable(uth->uu_workq_flags & UT_WORKQ_RUNNING)) {
		workq_aio_unpark_select_req_or_park_and_unlock(p, wq_aio, uth, WQ_SETUP_NONE);
		__builtin_unreachable();
	}

	if (__probable(wr == THREAD_AWAKENED)) {
		/*
		 * We were set running, but for the purposes of dying.
		 */
		assert(uth->uu_workq_flags & UT_WORKQ_DYING);
		assert((uth->uu_workq_flags & UT_WORKQ_NEW) == 0);
	} else {
		/*
		 * workaround for <rdar://problem/38647347>,
		 * in case we do hit userspace, make sure calling
		 * workq_thread_terminate() does the right thing here,
		 * and if we never call it, that workq_exit() will too because it sees
		 * this thread on the runlist.
		 */
		assert(wr == THREAD_INTERRUPTED);

		/* Interrupted wait: mark ourselves dying before teardown. */
		if (!(uth->uu_workq_flags & UT_WORKQ_DYING)) {
			wq_aio->wa_thdying_count++;
			uth->uu_workq_flags |= UT_WORKQ_DYING;
		}
	}

	workq_aio_unpark_for_death_and_unlock(p, wq_aio, uth,
	    WORKQ_UNPARK_FOR_DEATH_WAS_IDLE, WQ_SETUP_NONE);

	__builtin_unreachable();
}
3221
3222 /*
3223 * Called by thread_create_workq_aio_waiting() during thread initialization, before
3224 * assert_wait, before the thread has been started.
3225 */
3226 event_t
aio_workq_thread_init_and_wq_lock(task_t task,thread_t th)3227 aio_workq_thread_init_and_wq_lock(task_t task, thread_t th)
3228 {
3229 struct uthread *uth = get_bsdthread_info(th);
3230
3231 uth->uu_workq_flags = UT_WORKQ_NEW;
3232 uth->uu_workq_pri = WORKQ_POLICY_INIT(THREAD_QOS_LEGACY);
3233 uth->uu_workq_thport = MACH_PORT_NULL;
3234 uth->uu_workq_stackaddr = 0;
3235 uth->uu_workq_pthread_kill_allowed = 0;
3236
3237 thread_set_tag(th, THREAD_TAG_AIO_WORKQUEUE);
3238 thread_reset_workq_qos(th, THREAD_QOS_LEGACY);
3239
3240 aio_proc_lock(get_bsdtask_info(task));
3241 return workq_aio_parked_wait_event(uth);
3242 }
3243
/**
 * Try to add a new workqueue thread for aio.
 *
 * - called with workq lock held
 * - dropped and retaken around thread creation
 * - return with workq lock held
 * - aio threads do not call into pthread functions to setup or destroy stacks.
 */
static kern_return_t
workq_aio_add_new_thread(proc_t p, workq_aio_t wq_aio)
{
	kern_return_t kret;
	thread_t th;

	/* Reserve the slot before dropping the lock so concurrent callers
	 * already see this thread counted. */
	wq_aio->wa_nthreads++;

	aio_proc_unlock(p);

	kret = thread_create_aio_workq_waiting(proc_task(p),
	    workq_aio_unpark_continue,
	    &th);

	if (kret != KERN_SUCCESS) {
		WQ_AIO_TRACE(AIO_WQ_aio_thread_create_failed | DBG_FUNC_NONE, wq_aio,
		    kret, 0, 0, 0);
		goto out;
	}

	/*
	 * thread_create_aio_workq_waiting() will return with the wq lock held
	 * on success, because it calls workq_thread_init_and_wq_lock().
	 */
	struct uthread *uth = get_bsdthread_info(th);
	TAILQ_INSERT_TAIL(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount++;
	uth->uu_workq_flags &= ~UT_WORKQ_NEW;
	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_create | DBG_FUNC_NONE, wq_aio);
	return kret;

out:
	/* Creation failed while unlocked: retake the lock and undo the count. */
	aio_proc_lock(p);
	/*
	 * Do not redrive here if we went under wq_max_threads again,
	 * it is the responsibility of the callers of this function
	 * to do so when it fails.
	 */
	wq_aio->wa_nthreads--;
	return kret;
}
3293
/*
 * Dispatch an idle AIO worker to service the request queue, creating one
 * if needed.  Called with the aio proc lock held; drops it when `unlock`
 * is true.
 */
static void
workq_aio_wakeup_thread_internal(proc_t p, bool unlock)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);
	bool needs_wakeup = false;
	struct uthread *uth = NULL;

	if (!wq_aio) {
		goto out;
	}

	/*
	 * Create threads up to WORKQUEUE_AIO_MAXTHREADS, but never from an
	 * AIO worker itself (the tag check prevents workers growing the pool).
	 */
	uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	while (!uth && (wq_aio->wa_nthreads < WORKQUEUE_AIO_MAXTHREADS) &&
	    !(thread_get_tag(current_thread()) & THREAD_TAG_AIO_WORKQUEUE)) {
		if (workq_aio_add_new_thread(p, wq_aio) != KERN_SUCCESS) {
			break;
		}
		uth = TAILQ_FIRST(&wq_aio->wa_thidlelist);
	}

	if (!uth) {
		goto out;
	}

	/* Move the chosen thread from the idle list to the run list. */
	TAILQ_REMOVE(&wq_aio->wa_thidlelist, uth, uu_workq_entry);
	wq_aio->wa_thidlecount--;

	TAILQ_INSERT_TAIL(&wq_aio->wa_thrunlist, uth, uu_workq_entry);
	assert((uth->uu_workq_flags & UT_WORKQ_RUNNING) == 0);
	uth->uu_workq_flags |= UT_WORKQ_RUNNING;

	WQ_AIO_TRACE_WQ(AIO_WQ_aio_thread_wakeup | DBG_FUNC_NONE, wq_aio);

	if (__improbable(uth->uu_workq_flags & UT_WORKQ_DYING)) {
		/* Resuscitated just in time: clear DYING and let the death
		 * policy account for one fewer dying thread. */
		uth->uu_workq_flags ^= UT_WORKQ_DYING;
		workq_aio_death_policy_evaluate(wq_aio, 1);
		needs_wakeup = false;
	} else {
		needs_wakeup = true;
	}
out:
	if (unlock) {
		aio_proc_unlock(p);
	}

	/* Wake the chosen thread (after dropping the lock, if requested). */
	if (uth && needs_wakeup) {
		workq_aio_thread_wakeup(uth);
	}
}
3343
3344 static void
workq_aio_wakeup_thread_and_unlock(proc_t p)3345 workq_aio_wakeup_thread_and_unlock(proc_t p)
3346 {
3347 return workq_aio_wakeup_thread_internal(p, true);
3348 }
3349
3350 static void
workq_aio_wakeup_thread(proc_t p)3351 workq_aio_wakeup_thread(proc_t p)
3352 {
3353 return workq_aio_wakeup_thread_internal(p, false);
3354 }
3355
3356 void
workq_aio_prepare(struct proc * p)3357 workq_aio_prepare(struct proc *p)
3358 {
3359 workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3360
3361 if (__improbable(!wq_aio && !proc_in_teardown(p))) {
3362 workq_aio_open(p);
3363 }
3364 }
3365
3366 bool
workq_aio_entry_add_locked(struct proc * p,aio_workq_entry * entryp)3367 workq_aio_entry_add_locked(struct proc *p, aio_workq_entry *entryp)
3368 {
3369 workq_aio_t wq_aio = proc_get_aio_wqptr(p);
3370 bool ret = false;
3371
3372 ASSERT_AIO_PROC_LOCK_OWNED(p);
3373
3374 if (!proc_in_teardown(p) && wq_aio && !_wq_exiting(wq_aio)) {
3375 TAILQ_INSERT_TAIL(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
3376 ret = true;
3377 }
3378
3379 return ret;
3380 }
3381
/*
 * Unlink `entryp` from the process's AIO request queue.
 * Panics if the entry is not currently enqueued (tqe_prev == NULL is the
 * "not on a workq" marker used throughout this file).  Always returns
 * true.  Caller must hold the aio proc lock.
 */
bool
workq_aio_entry_remove_locked(struct proc *p, aio_workq_entry *entryp)
{
	workq_aio_t wq_aio = proc_get_aio_wqptr(p);

	ASSERT_AIO_PROC_LOCK_OWNED(p);

	if (entryp->aio_workq_link.tqe_prev == NULL) {
		panic("Trying to remove an entry from a work queue, but it is not on a queue");
	}

	TAILQ_REMOVE(&wq_aio->wa_aioq_entries, entryp, aio_workq_link);
	entryp->aio_workq_link.tqe_prev = NULL; /* Not on a workq */

	return true;
}
3398