1 /*
2 * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3 *
4 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. The rights granted to you under the License
10 * may not be used to create, or enable the creation or redistribution of,
11 * unlawful or unlicensed copies of an Apple operating system, or to
12 * circumvent, violate, or enable the circumvention or violation of, any
13 * terms of an Apple operating system software license agreement.
14 *
15 * Please obtain a copy of the License at
16 * http://www.opensource.apple.com/apsl/ and read it before using this file.
17 *
18 * The Original Code and all software distributed under the License are
19 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23 * Please see the License for the specific language governing rights and
24 * limitations under the License.
25 *
26 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27 */
28 /*
29 * @OSF_FREE_COPYRIGHT@
30 */
31 /*
32 * Mach Operating System
33 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34 * All Rights Reserved.
35 *
36 * Permission to use, copy, modify and distribute this software and its
37 * documentation is hereby granted, provided that both the copyright
38 * notice and this permission notice appear in all copies of the
39 * software, derivative works or modified versions, and any portions
40 * thereof, and that both notices appear in supporting documentation.
41 *
42 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45 *
46 * Carnegie Mellon requests users of this software to return to
47 *
48 * Software Distribution Coordinator or [email protected]
49 * School of Computer Science
50 * Carnegie Mellon University
51 * Pittsburgh PA 15213-3890
52 *
53 * any improvements or extensions that they make and grant Carnegie Mellon
54 * the rights to redistribute these changes.
55 */
56 /*
57 */
58 /*
59 * File: ipc/ipc_mqueue.c
60 * Author: Rich Draves
61 * Date: 1989
62 *
63 * Functions to manipulate IPC message queues.
64 */
65 /*
66 * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67 * support for mandatory and extensible security protections. This notice
68 * is included in support of clause 2.2 (b) of the Apple Public License,
69 * Version 2.0.
70 */
71
72
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h> /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_port.h>
92 #include <ipc/ipc_pset.h>
93 #include <ipc/ipc_space.h>
94
95 #if MACH_FLIPC
96 #include <ipc/flipc.h>
97 #endif
98
99 #ifdef __LP64__
100 #include <vm/vm_map.h>
101 #endif
102
103 #include <sys/event.h>
104
105 extern char *proc_name_address(void *p);
106
107 int ipc_mqueue_full; /* address is event for queue space */
108 int ipc_mqueue_rcv; /* address is event for message arrival */
109
110 /* forward declarations */
111 static void ipc_mqueue_receive_results(wait_result_t result);
112 static void ipc_mqueue_peek_on_thread_locked(
113 ipc_mqueue_t port_mq,
114 mach_msg_option_t option,
115 thread_t thread);
116
117 /* Deliver message to message queue or waiting receiver */
118 static void ipc_mqueue_post(
119 ipc_mqueue_t mqueue,
120 ipc_kmsg_t kmsg,
121 mach_msg_option_t option);
122
123 /*
124 * Routine: ipc_mqueue_init
125 * Purpose:
126 * Initialize a newly-allocated message queue.
127 */
128 void
ipc_mqueue_init(ipc_mqueue_t mqueue)129 ipc_mqueue_init(
130 ipc_mqueue_t mqueue)
131 {
132 ipc_kmsg_queue_init(&mqueue->imq_messages);
133 mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
134 klist_init(&mqueue->imq_klist);
135 }
136
137 /*
138 * Routine: ipc_mqueue_add_unlock
139 * Purpose:
140 * Associate the portset's mqueue with the port's mqueue.
141 * This has to be done so that posting the port will wakeup
142 * a portset waiter. If there are waiters on the portset
143 * mqueue and messages on the port mqueue, try to match them
144 * up now.
145 * Conditions:
146 * Port locked on entry, unlocked on return.
147 * The waitq set needs to have its link initialized.
148 * May block.
149 */
/*
 * Purpose:
 *	Link port_mqueue's port into pset so a post on the port wakes
 *	pset waiters, then immediately try to pair any messages already
 *	queued on the port with threads already waiting on the set.
 *
 * Parameters:
 *	port_mqueue		message queue of the port being added
 *	pset			destination port set
 *	reserved_link		pre-allocated waitq link (required unless
 *				unlink_before_adding is TRUE)
 *	reserved_prepost	pre-reserved prepost objects for wakeups
 *	unlink_before_adding	TRUE: drop all existing set memberships
 *				(briefly unlocking the port) before relinking
 *
 * Returns:
 *	KERN_SUCCESS or the failure code from waitq_link().
 *
 * Conditions:
 *	Port locked on entry, unlocked on return.  May block.
 */
kern_return_t
ipc_mqueue_add_unlock(
	ipc_mqueue_t            port_mqueue,
	ipc_pset_t              pset,
	waitq_ref_t             *reserved_link,
	uint64_t                *reserved_prepost,
	boolean_t               unlink_before_adding)
{
	ipc_port_t port = ip_from_mq(port_mqueue);
	struct waitq *port_waitq = &port->ip_waitq;
	struct waitq_set *set_waitq = &pset->ips_wqset;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t kmsg, next;
	kern_return_t kr = KERN_SUCCESS;

	assert(waitqs_is_linked(set_waitq));

	/*
	 * The link operation is now under the same lock-hold as
	 * message iteration and thread wakeup, but doesn't have to be...
	 */
	if (unlink_before_adding) {
		/* relinks port->set and drops/retakes the port lock internally */
		waitq_unlink_all_relink_unlock(port_waitq, set_waitq);
		/*
		 * While the port was unlocked,
		 * it could have been moved out of the set already.
		 *
		 * If this is the case, we should not go ahead with wakeups
		 * and message deliveries.
		 */
		ip_mq_lock(ip_from_mq(port_mqueue));
		if (!waitq_member_locked(port_waitq, set_waitq)) {
			goto leave;
		}
	} else {
		assert(reserved_link && reserved_link->wqr_value != 0);
		kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
		if (kr != KERN_SUCCESS) {
			goto leave;
		}
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 *
	 * Outer loop: each queued kmsg.  Inner loop: candidate waiter
	 * threads, until one accepts the message or none remain.
	 */
	kmsgq = &port_mqueue->imq_messages;
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	    kmsg != IKM_NULL;
	    kmsg = next) {
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		for (;;) {
			thread_t th;
			mach_msg_size_t msize;
			spl_t th_spl;

			th = waitq_wakeup64_identify_locked(
				port_waitq,
				IPC_MQUEUE_RECEIVE,
				THREAD_AWAKENED, &th_spl,
				reserved_prepost, WAITQ_ALL_PRIORITIES,
				WAITQ_KEEP_LOCKED);
			/* waitq/mqueue still locked, thread locked */

			if (th == THREAD_NULL) {
				/* no more waiters: remaining kmsgs stay queued */
				goto leave;
			}

			/*
			 * If the receiver waited with a facility not directly
			 * related to Mach messaging, then it isn't prepared to get
			 * handed the message directly.  Just set it running, and
			 * go look for another thread that can.
			 */
			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
				if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
					/*
					 * wakeup the peeking thread, but
					 * continue to loop over the threads
					 * waiting on the port's mqueue to see
					 * if there are any actual receivers
					 */
					ipc_mqueue_peek_on_thread_locked(port_mqueue,
					    th->ith_option,
					    th);
				}
				thread_unlock(th);
				splx(th_spl);
				continue;
			}

			/*
			 * Found a receiver. see if they can handle the message
			 * correctly (the message is not too large for them, or
			 * they didn't care to be informed that the message was
			 * too large).  If they can't handle it, take them off
			 * the list and let them go back and figure it out and
			 * just move onto the next.
			 */
			msize = ipc_kmsg_copyout_size(kmsg, th->map);
			if (th->ith_rsize <
			    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				th->ith_msize = msize;
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * let him go without message
					 * (receiver asked to be told the size instead)
					 */
					th->ith_receiver_name = port_mqueue->imq_receiver_name;
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					splx(th_spl);
					continue; /* find another thread */
				}
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * This thread is going to take this message,
			 * so give it to him.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			/* message is off the queue: give back its reserved slot */
			ipc_mqueue_release_msgcount(port_mqueue);

			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			splx(th_spl);
#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
				flipc_msg_ack(node, port_mqueue, TRUE);
			}
#endif
			break;  /* go to next message */
		}
	}

leave:
	ip_mq_unlock(ip_from_mq(port_mqueue));
	return kr;
}
298
299
300 /*
301 * Routine: ipc_port_has_klist
302 * Purpose:
303 * Returns whether the given port imq_klist field can be used as a klist.
304 */
305 static inline bool
ipc_port_has_klist(ipc_port_t port)306 ipc_port_has_klist(ipc_port_t port)
307 {
308 return !port->ip_specialreply &&
309 port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
310 }
311
312 static inline struct klist *
ipc_object_klist(ipc_object_t object)313 ipc_object_klist(ipc_object_t object)
314 {
315 if (io_otype(object) == IOT_PORT) {
316 ipc_port_t port = ip_object_to_port(object);
317
318 return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
319 }
320 return &ips_object_to_pset(object)->ips_klist;
321 }
322
323 /*
324 * Routine: ipc_mqueue_changed
325 * Purpose:
326 * Wake up receivers waiting in a message queue.
327 * Conditions:
328 * The object containing the message queue is locked.
329 */
/*
 * Purpose:
 *	Wake up receivers waiting in a message queue and notify any
 *	EVFILT_MACHPORT knotes that the queue is going away.
 *
 * Parameters:
 *	space	the space the object belonged to (may already be inactive;
 *		only used to tell knote_vanish() whether to mark EV_EOF)
 *	waitq	the object's waitq (port or port set)
 *
 * Conditions:
 *	The object containing the message queue is locked.
 */
void
ipc_mqueue_changed(
	ipc_space_t             space,
	struct waitq            *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	/* NULL when the port's klist field is repurposed for sync IPC */
	struct klist *klist = ipc_object_klist(object);

	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
		 * port pending already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		/* clears any sync-IPC linkage so imq_klist is reusable */
		ipc_port_adjust_sync_link_state_locked(ip_object_to_port(object),
		    PORT_SYNC_LINK_ANY, NULL);
	} else {
		klist_init(klist);
	}

	/* THREAD_RESTART makes receivers report MACH_RCV_PORT_CHANGED */
	waitq_wakeup64_all_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART,
	    NULL,
	    WAITQ_ALL_PRIORITIES,
	    WAITQ_KEEP_LOCKED);
}
383
384
385
386
387 /*
388 * Routine: ipc_mqueue_send
389 * Purpose:
390 * Send a message to a message queue. The message holds a reference
391 * for the destination port for this message queue in the
392 * msgh_remote_port field.
393 *
394 * If unsuccessful, the caller still has possession of
395 * the message and must do something with it. If successful,
396 * the message is queued, given to a receiver, or destroyed.
397 * Conditions:
398 * port is locked.
399 * Returns:
400 * MACH_MSG_SUCCESS The message was accepted.
401 * MACH_SEND_TIMED_OUT Caller still has message.
402 * MACH_SEND_INTERRUPTED Caller still has message.
403 */
/*
 * Purpose:
 *	Send kmsg to mqueue, blocking (on the port's send turnstile) if
 *	the queue is full and the caller allows it.  On success the
 *	message is handed to ipc_mqueue_post() for delivery/queuing.
 *
 * Parameters:
 *	mqueue		destination message queue
 *	kmsg		message to send (holds a ref on the destination port)
 *	option		MACH_SEND_TIMEOUT / MACH_SEND_ALWAYS etc.
 *	send_timeout	timeout in ms, used when MACH_SEND_TIMEOUT is set
 *
 * Conditions:
 *	port is locked on entry; always unlocked on return.
 *
 * Returns:
 *	MACH_MSG_SUCCESS	The message was accepted.
 *	MACH_SEND_TIMED_OUT	Caller still has message.
 *	MACH_SEND_INTERRUPTED	Caller still has message.
 *	MACH_SEND_NO_BUFFER	Kernel-internal queue full; caller has message.
 *	MACH_SEND_INVALID_DEST	Queue is being destroyed; caller has message.
 */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option,
	mach_msg_timeout_t      send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		/* reserve our slot before unlocking; post consumes it */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			/* never block a kernel-internal send on a full queue */
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		/* let the receiver inherit blocked senders' priority */
		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
512
513 /*
514 * Routine: ipc_mqueue_override_send_locked
515 * Purpose:
516 * Set an override qos on the first message in the queue
517 * (if the queue is full). This is a send-possible override
518 * that will go away as soon as we drain a message from the
519 * queue.
520 *
521 * Conditions:
522 * The port corresponding to mqueue is locked.
523 * The caller holds a reference on the message queue.
524 */
/*
 * Purpose:
 *	Set an override qos on the first message in the queue
 *	(if the queue is full).  This is a send-possible override
 *	that will go away as soon as we drain a message from the
 *	queue.
 *
 * Parameters:
 *	mqueue		target message queue
 *	qos_ovr		qos override to apply to the head message
 *
 * Conditions:
 *	The port corresponding to mqueue is locked.
 *	The caller holds a reference on the message queue.
 */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t            mqueue,
	mach_msg_qos_t          qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t __unused full_queue_empty = FALSE;

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		/* only notify knotes when the override actually changed the head */
		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			/* if the space is dead there is no point calling KNOTE */
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
		if (!first) {
			/* full queue with no messages: qlimit must be 0 */
			full_queue_empty = TRUE;
		}
	}

#if DEVELOPMENT || DEBUG
	if (full_queue_empty) {
		int dst_pid = 0;
		/*
		 * NOTE(review): dst_pid is computed but not consumed in this
		 * excerpt — presumably fed to tracing/telemetry in the full
		 * source; confirm before removing.
		 */
		dst_pid = ipc_port_get_receiver_task(port, NULL);
	}
#endif
}
557
558 /*
559 * Routine: ipc_mqueue_release_msgcount
560 * Purpose:
561 * Release a message queue reference in the case where we
562 * found a waiter.
563 *
564 * Conditions:
565 * The port corresponding to message queue is locked.
566 * The message corresponding to this reference is off the queue.
567 * There is no need to pass reserved preposts because this will
568 * never prepost to anyone
569 */
/*
 * Purpose:
 *	Release a message queue reference in the case where we
 *	found a waiter.  Decrements imq_msgcount and, if space just
 *	became available, hands the freed slot to one blocked sender.
 *
 * Parameters:
 *	port_mq		message queue whose slot is being released
 *
 * Conditions:
 *	The port corresponding to message queue is locked.
 *	The message corresponding to this reference is off the queue.
 *	There is no need to pass reserved preposts because this will
 *	never prepost to anyone.
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			/* nobody was actually waiting; remember that */
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		/* no more msgs: invalidate the port's prepost object */
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
612
613 /*
614 * Routine: ipc_mqueue_post
615 * Purpose:
616 * Post a message to a waiting receiver or enqueue it. If a
617 * receiver is waiting, we can release our reserved space in
618 * the message queue.
619 *
620 * Conditions:
621 * port is unlocked
622 * If we need to queue, our space in the message queue is reserved.
623 */
/*
 * Purpose:
 *	Post a message to a waiting receiver or enqueue it.  If a
 *	receiver is waiting, we can release our reserved space in
 *	the message queue.
 *
 * Parameters:
 *	mqueue	destination message queue
 *	kmsg	message to deliver (ownership passes to receiver/queue,
 *		or the message is destroyed if the port died)
 *	option	send options (only used for tracing here)
 *
 * Conditions:
 *	port is unlocked on entry and on return.
 *	If we need to queue, our space in the message queue is reserved
 *	(imq_msgcount was incremented by the sender).
 */
static void
ipc_mqueue_post(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t __unused option)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	uint64_t reserved_prepost = 0;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 * While the msg queue is locked, we have control of the
	 * kmsg, so the ref in it for the port is still good.
	 *
	 * Check for a receiver for the message.
	 */
	/* locks the waitq and keeps it locked for the loop below */
	reserved_prepost = waitq_prepost_reserve(waitq, 0,
	    WAITQ_KEEP_LOCKED);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		spl_t th_spl;
		thread_t receiver;
		mach_msg_size_t msize;

		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED,
		    &th_spl,
		    &reserved_prepost,
		    WAITQ_ALL_PRIORITIES,
		    WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports. port-sets knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount > 0) {
				if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
					/* if the space is dead there is no point calling KNOTE */
					if (ip_in_a_space(port) &&
					    is_active(ip_get_receiver(port)) &&
					    ipc_port_has_klist(port)) {
						KNOTE(&port->ip_klist, 0);
					}
				}
				break;
			}

			/*
			 * Otherwise, the message queue must belong to an inactive
			 * port, so just destroy the message and pretend it was posted.
			 */
			destroy_msg = TRUE;
			goto out_unlock;
		}

		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			thread_unlock(receiver);
			splx(th_spl);
			break;          /* Message was posted, so break out of loop */
		}

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly.  Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			thread_unlock(receiver);
			splx(th_spl);
			continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_rsize <
		    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			mach_node_t node = kmsg->ikm_node;
#endif
			thread_unlock(receiver);
			splx(th_spl);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
		splx(th_spl);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);
	waitq_prepost_release_reserve(reserved_prepost);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg);
	}

	counter_inc(&current_task()->messages_sent);
	return;
}
798
799
800 static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)801 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
802 {
803 thread_t self = current_thread();
804 mach_msg_option_t option = self->ith_option;
805
806 /*
807 * why did we wake up?
808 */
809 switch (saved_wait_result) {
810 case THREAD_TIMED_OUT:
811 self->ith_state = MACH_RCV_TIMED_OUT;
812 return;
813
814 case THREAD_INTERRUPTED:
815 self->ith_state = MACH_RCV_INTERRUPTED;
816 return;
817
818 case THREAD_RESTART:
819 /* something bad happened to the port/set */
820 self->ith_state = MACH_RCV_PORT_CHANGED;
821 return;
822
823 case THREAD_AWAKENED:
824 /*
825 * We do not need to go select a message, somebody
826 * handed us one (or a too-large indication).
827 */
828 switch (self->ith_state) {
829 case MACH_RCV_SCATTER_SMALL:
830 case MACH_RCV_TOO_LARGE:
831 /*
832 * Somebody tried to give us a too large
833 * message. If we indicated that we cared,
834 * then they only gave us the indication,
835 * otherwise they gave us the indication
836 * AND the message anyway.
837 */
838 if (option & MACH_RCV_LARGE) {
839 return;
840 }
841 return;
842 case MACH_MSG_SUCCESS:
843 return;
844 case MACH_PEEK_READY:
845 return;
846
847 default:
848 panic("ipc_mqueue_receive_results: strange ith_state");
849 }
850
851 default:
852 panic("ipc_mqueue_receive_results: strange wait_result");
853 }
854 }
855
/*
 * Purpose:
 *	Continuation routine used when a receiving thread blocked with a
 *	continuation: record the wakeup result on the thread, then resume
 *	the message-receive path.  Never returns to the caller.
 *
 * Parameters:
 *	param	unused continuation parameter
 *	wresult	wait result from the block
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
864
865 /*
866 * Routine: ipc_mqueue_receive
867 * Purpose:
868 * Receive a message from a message queue.
869 *
870 * Conditions:
871 * Our caller must hold a reference for the port or port set
872 * to which this queue belongs, to keep the queue
873 * from being deallocated.
874 *
875 * The kmsg is returned with clean header fields
876 * and with the circular bit turned off through the ith_kmsg
877 * field of the thread's receive continuation state.
878 * Returns:
879 * MACH_MSG_SUCCESS Message returned in ith_kmsg.
880 * MACH_RCV_TOO_LARGE Message size returned in ith_msize.
881 * MACH_RCV_TIMED_OUT No message obtained.
882 * MACH_RCV_INTERRUPTED No message obtained.
883 * MACH_RCV_PORT_DIED Port/set died; no message.
884 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
885 *
886 */
887
/*
 * Purpose:
 *	Receive a message from a message queue, blocking if necessary.
 *	Thin wrapper around ipc_mqueue_receive_on_thread() that performs
 *	the actual block and result translation for the current thread.
 *
 * Parameters:
 *	waitq		waitq of the port or port set to receive from
 *	option		receive options (MACH_RCV_TIMEOUT, MACH_PEEK_MSG, ...)
 *	max_size	largest message (plus trailer) the caller can take
 *	rcv_timeout	timeout in ms when MACH_RCV_TIMEOUT is set
 *	interruptible	interruptibility of the wait
 *
 * Conditions:
 *	Caller holds a reference on the port/port set.  Results are
 *	delivered through the thread's ith_* continuation state
 *	(see the block comment above for the ith_state codes).
 */
void
ipc_mqueue_receive(
	struct waitq            *waitq,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible)
{
	wait_result_t wresult;
	thread_t self = current_thread();

	wresult = ipc_mqueue_receive_on_thread(waitq, option, max_size,
	    rcv_timeout, interruptible, self);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		/* a message (or error) was selected without blocking */
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (self->ith_continuation) {
			/* does not return: resumes in ipc_mqueue_receive_continue */
			thread_block(ipc_mqueue_receive_continue);
		}
		/* NOTREACHED */

		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
916
917 /*
918 * Routine: ipc_mqueue_receive_on_thread
919 * Purpose:
920 * Receive a message from a message queue using a specified thread.
921 * If no message available, assert_wait on the appropriate waitq.
922 *
923 * Conditions:
924 * Assumes thread is self.
925 * No port/port-set lock held.
926 * May have assert-waited. Caller must block in those cases.
927 */
928 wait_result_t
ipc_mqueue_receive_on_thread(struct waitq * waitq,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t rcv_timeout,int interruptible,thread_t thread)929 ipc_mqueue_receive_on_thread(
930 struct waitq *waitq,
931 mach_msg_option_t option,
932 mach_msg_size_t max_size,
933 mach_msg_timeout_t rcv_timeout,
934 int interruptible,
935 thread_t thread)
936 {
937 ipc_object_t object = io_from_waitq(waitq);
938 wait_result_t wresult;
939 uint64_t deadline;
940 struct turnstile *rcv_turnstile = TURNSTILE_NULL;
941
942 io_lock(object);
943
944 /* no need to reserve anything: we never prepost to anyone */
945
946 if (!waitq_is_valid(waitq)) {
947 /* someone raced us to destroy this mqueue/port! */
948 io_unlock(object);
949 /*
950 * ipc_mqueue_receive_results updates the thread's ith_state
951 * TODO: differentiate between rights being moved and
952 * rights/ports being destroyed (21885327)
953 */
954 return THREAD_RESTART;
955 }
956
957 if (waitq_is_set(waitq)) {
958 __block ipc_mqueue_t port_mq = IMQ_NULL;
959
960 (void)waitq_set_iterate_preposts((struct waitq_set *)waitq,
961 ^(struct waitq *wq) {
962 ipc_port_t port = ip_from_waitq(wq);
963
964 /*
965 * If there are no messages on this queue,
966 * skip it and remove it from the prepost list
967 */
968 if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
969 return WQ_ITERATE_INVALIDATE_CONTINUE;
970 }
971
972 /*
973 * There are messages waiting on this port.
974 * Instruct the prepost iteration logic to break, but keep the
975 * waitq locked.
976 */
977 port_mq = &port->ip_messages;
978 return WQ_ITERATE_BREAK_KEEP_LOCKED;
979 });
980 /* Returns with port locked */
981
982 if (port_mq != IMQ_NULL) {
983 /*
984 * We get here if there is at least one message
985 * waiting on port_mq. We have instructed the prepost
986 * iteration logic to leave both the port_mq and the
987 * set waitq locked.
988 *
989 * TODO: previously, we would place this port at the
990 * back of the prepost list...
991 */
992 io_unlock(object);
993
994 /*
995 * Continue on to handling the message with just
996 * the port waitq locked.
997 */
998 if (option & MACH_PEEK_MSG) {
999 ipc_mqueue_peek_on_thread_locked(port_mq, option, thread);
1000 } else {
1001 ipc_mqueue_select_on_thread_locked(port_mq,
1002 option, max_size, thread);
1003 }
1004
1005 ip_mq_unlock(ip_from_mq(port_mq));
1006 return THREAD_NOT_WAITING;
1007 }
1008 } else if (waitq_is_queue(waitq) || waitq_is_turnstile_proxy(waitq)) {
1009 ipc_port_t port = ip_from_waitq(waitq);
1010 ipc_kmsg_queue_t kmsgs;
1011
1012 /*
1013 * Receive on a single port. Just try to get the messages.
1014 */
1015 kmsgs = &port->ip_messages.imq_messages;
1016 if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
1017 if (option & MACH_PEEK_MSG) {
1018 ipc_mqueue_peek_on_thread_locked(&port->ip_messages, option, thread);
1019 } else {
1020 ipc_mqueue_select_on_thread_locked(&port->ip_messages,
1021 option, max_size, thread);
1022 }
1023 io_unlock(object);
1024 return THREAD_NOT_WAITING;
1025 }
1026 } else {
1027 panic("Unknown waitq type 0x%x: likely memory corruption!",
1028 waitq->waitq_type);
1029 }
1030
1031 /*
1032 * Looks like we'll have to block. The waitq we will
1033 * block on (whether the set's or the local port's) is
1034 * still locked.
1035 */
1036 if (option & MACH_RCV_TIMEOUT) {
1037 if (rcv_timeout == 0) {
1038 io_unlock(object);
1039 thread->ith_state = MACH_RCV_TIMED_OUT;
1040 return THREAD_NOT_WAITING;
1041 }
1042 }
1043
1044 thread->ith_option = option;
1045 thread->ith_rsize = max_size;
1046 thread->ith_msize = 0;
1047
1048 if (option & MACH_PEEK_MSG) {
1049 thread->ith_state = MACH_PEEK_IN_PROGRESS;
1050 } else {
1051 thread->ith_state = MACH_RCV_IN_PROGRESS;
1052 }
1053
1054 if (option & MACH_RCV_TIMEOUT) {
1055 clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
1056 } else {
1057 deadline = 0;
1058 }
1059
1060 /*
1061 * Threads waiting on a reply port (not portset)
1062 * will wait on its receive turnstile.
1063 *
1064 * Donate waiting thread's turnstile and
1065 * setup inheritor for special reply port.
1066 * Based on the state of the special reply
1067 * port, the inheritor would be the send
1068 * turnstile of the connection port on which
1069 * the send of sync ipc would happen or
1070 * workloop's turnstile who would reply to
1071 * the sync ipc message.
1072 *
1073 * Pass in mqueue wait in waitq_assert_wait to
1074 * support port set wakeup. The mqueue waitq of port
	 * will be converted to a turnstile waitq
1076 * in waitq_assert_wait instead of global waitqs.
1077 */
1078 if (waitq_is_turnstile_proxy(waitq)) {
1079 ipc_port_t port = ip_from_waitq(waitq);
1080 rcv_turnstile = turnstile_prepare((uintptr_t)port,
1081 port_rcv_turnstile_address(port),
1082 TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
1083
1084 ipc_port_recv_update_inheritor(port, rcv_turnstile,
1085 TURNSTILE_DELAYED_UPDATE);
1086 }
1087
1088 thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
1089 wresult = waitq_assert_wait64_locked(waitq,
1090 IPC_MQUEUE_RECEIVE,
1091 interruptible,
1092 TIMEOUT_URGENCY_USER_NORMAL,
1093 deadline,
1094 TIMEOUT_NO_LEEWAY,
1095 thread);
1096 /* preposts should be detected above, not here */
1097 if (wresult == THREAD_AWAKENED) {
1098 panic("ipc_mqueue_receive_on_thread: sleep walking");
1099 }
1100
1101 io_unlock(object);
1102
1103 /* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
1104 if (rcv_turnstile != TURNSTILE_NULL) {
1105 turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
1106 }
/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */
1108
1109 return wresult;
1110 }
1111
1112
1113 /*
1114 * Routine: ipc_mqueue_peek_on_thread_locked
1115 * Purpose:
1116 * A receiver discovered that there was a message on the queue
1117 * before he had to block. Tell a thread about the message queue,
1118 * but don't pick off any messages.
1119 * Conditions:
1120 * port_mq locked
1121 * at least one message on port_mq's message queue
1122 *
1123 * Returns: (on thread->ith_state)
1124 * MACH_PEEK_READY ith_peekq contains a message queue
1125 */
1126 void
ipc_mqueue_peek_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option_t option,thread_t thread)1127 ipc_mqueue_peek_on_thread_locked(
1128 ipc_mqueue_t port_mq,
1129 mach_msg_option_t option,
1130 thread_t thread)
1131 {
1132 (void)option;
1133 assert(option & MACH_PEEK_MSG);
1134 assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
1135
1136 /*
1137 * Take a reference on the mqueue's associated port:
1138 * the peeking thread will be responsible to release this reference
1139 */
1140 ip_reference(ip_from_mq(port_mq));
1141 thread->ith_peekq = port_mq;
1142 thread->ith_state = MACH_PEEK_READY;
1143 }
1144
1145 /*
1146 * Routine: ipc_mqueue_select_on_thread_locked
1147 * Purpose:
1148 * A receiver discovered that there was a message on the queue
1149 * before he had to block. Pick the message off the queue and
1150 * "post" it to thread.
1151 * Conditions:
1152 * port locked.
1153 * thread not locked.
1154 * There is a message.
1155 * No need to reserve prepost objects - it will never prepost
1156 *
1157 * Returns:
1158 * MACH_MSG_SUCCESS Actually selected a message for ourselves.
1159 * MACH_RCV_TOO_LARGE May or may not have pull it, but it is large
1160 */
1161 void
ipc_mqueue_select_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option_t option,mach_msg_size_t max_size,thread_t thread)1162 ipc_mqueue_select_on_thread_locked(
1163 ipc_mqueue_t port_mq,
1164 mach_msg_option_t option,
1165 mach_msg_size_t max_size,
1166 thread_t thread)
1167 {
1168 ipc_kmsg_t kmsg;
1169 mach_msg_return_t mr = MACH_MSG_SUCCESS;
1170 mach_msg_size_t msize;
1171
1172 /*
1173 * Do some sanity checking of our ability to receive
1174 * before pulling the message off the queue.
1175 */
1176 kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1177 assert(kmsg != IKM_NULL);
1178
1179 /*
1180 * If we really can't receive it, but we had the
1181 * MACH_RCV_LARGE option set, then don't take it off
1182 * the queue, instead return the appropriate error
1183 * (and size needed).
1184 */
1185 msize = ipc_kmsg_copyout_size(kmsg, thread->map);
1186 if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
1187 mr = MACH_RCV_TOO_LARGE;
1188 if (option & MACH_RCV_LARGE) {
1189 thread->ith_receiver_name = port_mq->imq_receiver_name;
1190 thread->ith_kmsg = IKM_NULL;
1191 thread->ith_msize = msize;
1192 thread->ith_seqno = 0;
1193 thread->ith_state = mr;
1194 return;
1195 }
1196 }
1197
1198 ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1199 #if MACH_FLIPC
1200 if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
1201 flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
1202 }
1203 #endif
1204 ipc_mqueue_release_msgcount(port_mq);
1205 thread->ith_seqno = port_mq->imq_seqno++;
1206 thread->ith_kmsg = kmsg;
1207 thread->ith_state = mr;
1208
1209 counter_inc(¤t_task()->messages_received);
1210 return;
1211 }
1212
1213 /*
1214 * Routine: ipc_mqueue_peek_locked
1215 * Purpose:
1216 * Peek at a (non-set) message queue to see if it has a message
1217 * matching the sequence number provided (if zero, then the
1218 * first message in the queue) and return vital info about the
1219 * message.
1220 *
1221 * Conditions:
1222 * The io object corresponding to mq is locked by callers.
1223 * Other locks may be held by callers, so this routine cannot block.
1224 * Caller holds reference on the message queue.
1225 */
1226 unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1227 ipc_mqueue_peek_locked(ipc_mqueue_t mq,
1228 mach_port_seqno_t * seqnop,
1229 mach_msg_size_t * msg_sizep,
1230 mach_msg_id_t * msg_idp,
1231 mach_msg_max_trailer_t * msg_trailerp,
1232 ipc_kmsg_t *kmsgp)
1233 {
1234 ipc_kmsg_queue_t kmsgq;
1235 ipc_kmsg_t kmsg;
1236 mach_port_seqno_t seqno, msgoff;
1237 unsigned res = 0;
1238
1239 seqno = 0;
1240 if (seqnop != NULL) {
1241 seqno = *seqnop;
1242 }
1243
1244 if (seqno == 0) {
1245 seqno = mq->imq_seqno;
1246 msgoff = 0;
1247 } else if (seqno >= mq->imq_seqno &&
1248 seqno < mq->imq_seqno + mq->imq_msgcount) {
1249 msgoff = seqno - mq->imq_seqno;
1250 } else {
1251 goto out;
1252 }
1253
1254 /* look for the message that would match that seqno */
1255 kmsgq = &mq->imq_messages;
1256 kmsg = ipc_kmsg_queue_first(kmsgq);
1257 while (msgoff-- && kmsg != IKM_NULL) {
1258 kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1259 }
1260 if (kmsg == IKM_NULL) {
1261 goto out;
1262 }
1263
1264 /* found one - return the requested info */
1265 if (seqnop != NULL) {
1266 *seqnop = seqno;
1267 }
1268 if (msg_sizep != NULL) {
1269 *msg_sizep = kmsg->ikm_header->msgh_size;
1270 }
1271 if (msg_idp != NULL) {
1272 *msg_idp = kmsg->ikm_header->msgh_id;
1273 }
1274 if (msg_trailerp != NULL) {
1275 memcpy(msg_trailerp,
1276 (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
1277 mach_round_msg(kmsg->ikm_header->msgh_size)),
1278 sizeof(mach_msg_max_trailer_t));
1279 }
1280 if (kmsgp != NULL) {
1281 *kmsgp = kmsg;
1282 }
1283
1284 res = 1;
1285
1286 out:
1287 return res;
1288 }
1289
1290
1291 /*
1292 * Routine: ipc_mqueue_peek
1293 * Purpose:
1294 * Peek at a (non-set) message queue to see if it has a message
1295 * matching the sequence number provided (if zero, then the
1296 * first message in the queue) and return vital info about the
1297 * message.
1298 *
1299 * Conditions:
1300 * The ipc_mqueue_t is unlocked.
1301 * Locks may be held by callers, so this routine cannot block.
1302 * Caller holds reference on the message queue.
1303 */
1304 unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1305 ipc_mqueue_peek(ipc_mqueue_t mq,
1306 mach_port_seqno_t * seqnop,
1307 mach_msg_size_t * msg_sizep,
1308 mach_msg_id_t * msg_idp,
1309 mach_msg_max_trailer_t * msg_trailerp,
1310 ipc_kmsg_t *kmsgp)
1311 {
1312 ipc_port_t port = ip_from_mq(mq);
1313 unsigned res;
1314
1315 ip_mq_lock(port);
1316
1317 res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1318 msg_trailerp, kmsgp);
1319
1320 ip_mq_unlock(port);
1321 return res;
1322 }
1323
1324 /*
1325 * Routine: ipc_mqueue_release_peek_ref
1326 * Purpose:
1327 * Release the reference on an mqueue's associated port which was
1328 * granted to a thread in ipc_mqueue_peek_on_thread (on the
1329 * MACH_PEEK_MSG thread wakeup path).
1330 *
1331 * Conditions:
1332 * The ipc_mqueue_t should be locked on entry.
1333 * The ipc_mqueue_t will be _unlocked_ on return
1334 * (and potentially invalid!)
1335 *
1336 */
1337 void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)1338 ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
1339 {
1340 ipc_port_t port = ip_from_mq(mqueue);
1341
1342 ip_mq_lock_held(port);
1343
1344 /*
1345 * clear any preposts this mq may have generated
1346 * (which would cause subsequent immediate wakeups)
1347 */
1348 waitq_clear_prepost_locked(&port->ip_waitq);
1349
1350 ip_mq_unlock(port);
1351
1352 /*
1353 * release the port reference: we need to do this outside the lock
1354 * because we might be holding the last port reference!
1355 **/
1356 ip_release(port);
1357 }
1358
1359 /*
1360 * Routine: ipc_mqueue_set_gather_member_names
1361 * Purpose:
1362 * Discover all ports which are members of a given port set.
1363 * Because the waitq linkage mechanism was redesigned to save
1364 * significan amounts of memory, it no longer keeps back-pointers
1365 * from a port set to a port. Therefore, we must iterate over all
1366 * ports within a given IPC space and individually query them to
1367 * see if they are members of the given set. Port names of ports
1368 * found to be members of the given set will be gathered into the
1369 * provided 'names' array. Actual returned names are limited to
1370 * maxnames entries, but we keep counting the actual number of
1371 * members to let the caller decide to retry if necessary.
1372 *
1373 * Conditions:
1374 * Locks may be held by callers, so this routine cannot block.
1375 * Caller holds reference on the message queue (via port set).
1376 */
1377 void
ipc_mqueue_set_gather_member_names(ipc_space_t space,ipc_pset_t pset,ipc_entry_num_t maxnames,mach_port_name_t * names,ipc_entry_num_t * actualp)1378 ipc_mqueue_set_gather_member_names(
1379 ipc_space_t space,
1380 ipc_pset_t pset,
1381 ipc_entry_num_t maxnames,
1382 mach_port_name_t *names,
1383 ipc_entry_num_t *actualp)
1384 {
1385 ipc_entry_t table;
1386 ipc_entry_num_t tsize;
1387 struct waitq_set *wqset;
1388 ipc_entry_num_t actual = 0;
1389
1390 assert(pset != IPS_NULL);
1391 wqset = &pset->ips_wqset;
1392
1393 assert(space != IS_NULL);
1394 is_read_lock(space);
1395 if (!is_active(space)) {
1396 is_read_unlock(space);
1397 goto out;
1398 }
1399
1400 if (!waitq_set_is_valid(wqset)) {
1401 is_read_unlock(space);
1402 goto out;
1403 }
1404
1405 table = is_active_table(space);
1406 tsize = table->ie_size;
1407 for (ipc_entry_num_t idx = 1; idx < tsize; idx++) {
1408 ipc_entry_t entry = &table[idx];
1409
1410 /* only receive rights can be members of port sets */
1411 if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
1412 ipc_port_t port = ip_object_to_port(entry->ie_object);
1413
1414 assert(IP_VALID(port));
1415 ip_mq_lock(port);
1416 if (waitq_member_locked(&port->ip_waitq, wqset)) {
1417 if (actual < maxnames) {
1418 names[actual] = ip_get_receiver_name(port);
1419 }
1420 actual++;
1421 }
1422 ip_mq_unlock(port);
1423 }
1424 }
1425
1426 is_read_unlock(space);
1427
1428 out:
1429 *actualp = actual;
1430 }
1431
1432
/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 *	Returns:
 *		The result of ipc_kmsg_delayed_destroy_queue — presumably
 *		TRUE when queued messages were moved to the per-thread
 *		clean/drain queue and the caller must reap them (verify
 *		against that routine's contract).
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 * rouse all blocked senders
	 * (don't boost anyone - we're tearing this queue down)
	 * (never preposts)
	 */
	port->ip_fullwaiters = false;

	/* THREAD_RESTART (not AWAKENED) signals senders the queue is going away */
	if (send_turnstile != TURNSTILE_NULL) {
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART,
		    WAITQ_ALL_PRIORITIES);
	}

#if MACH_FLIPC
	/* ack remote-node messages; imq_messages is traversed as a circular list */
	ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
	if (first) {
		ipc_kmsg_t kmsg = first;
		do {
			if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
			}
			kmsg = kmsg->ikm_next;
		} while (kmsg != first);
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 * NOTE: must happen before waitq_clear_prepost_locked below,
	 * which may drop and reacquire the port lock.
	 */
	waitq_invalidate(&port->ip_waitq);

	/* clear out any preposting we may have done, drops and reacquires port lock */
	waitq_clear_prepost_locked(&port->ip_waitq);

	/*
	 * assert that we are destroying / invalidating a queue that's
	 * not a member of any other queue.
	 */
	assert(port->ip_preposts == 0);
	assert(!ip_in_pset(port));

	return reap;
}
1509
1510 /*
1511 * Routine: ipc_mqueue_set_qlimit_locked
1512 * Purpose:
1513 * Changes a message queue limit; the maximum number
1514 * of messages which may be queued.
1515 * Conditions:
1516 * Port locked.
1517 */
1518
1519 void
ipc_mqueue_set_qlimit_locked(ipc_mqueue_t mqueue,mach_port_msgcount_t qlimit)1520 ipc_mqueue_set_qlimit_locked(
1521 ipc_mqueue_t mqueue,
1522 mach_port_msgcount_t qlimit)
1523 {
1524 ipc_port_t port = ip_from_mq(mqueue);
1525
1526 assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1527
1528 /* wake up senders allowed by the new qlimit */
1529 if (qlimit > mqueue->imq_qlimit) {
1530 mach_port_msgcount_t i, wakeup;
1531 struct turnstile *send_turnstile = port_send_turnstile(port);
1532
1533 /* caution: wakeup, qlimit are unsigned */
1534 wakeup = qlimit - mqueue->imq_qlimit;
1535
1536 for (i = 0; i < wakeup; i++) {
1537 /*
1538 * boost the priority of the awoken thread
1539 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1540 * the message queue slot we've just reserved.
1541 *
1542 * NOTE: this will never prepost
1543 */
1544 if (send_turnstile == TURNSTILE_NULL ||
1545 waitq_wakeup64_one(&send_turnstile->ts_waitq,
1546 IPC_MQUEUE_FULL,
1547 THREAD_AWAKENED,
1548 WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
1549 port->ip_fullwaiters = false;
1550 break;
1551 }
1552 mqueue->imq_msgcount++; /* give it to the awakened thread */
1553 }
1554 }
1555 mqueue->imq_qlimit = (uint16_t)qlimit;
1556 }
1557
1558 /*
1559 * Routine: ipc_mqueue_set_seqno_locked
1560 * Purpose:
1561 * Changes an mqueue's sequence number.
1562 * Conditions:
1563 * Caller holds a reference to the queue's containing object.
1564 */
1565 void
ipc_mqueue_set_seqno_locked(ipc_mqueue_t mqueue,mach_port_seqno_t seqno)1566 ipc_mqueue_set_seqno_locked(
1567 ipc_mqueue_t mqueue,
1568 mach_port_seqno_t seqno)
1569 {
1570 mqueue->imq_seqno = seqno;
1571 }
1572
1573
1574 /*
1575 * Routine: ipc_mqueue_copyin
1576 * Purpose:
1577 * Convert a name in a space to a message queue.
1578 * Conditions:
1579 * Nothing locked. If successful, the caller gets a ref for
1580 * for the object. This ref ensures the continued existence of
1581 * the queue.
1582 * Returns:
1583 * MACH_MSG_SUCCESS Found a message queue.
1584 * MACH_RCV_INVALID_NAME The space is dead.
1585 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
1586 * MACH_RCV_INVALID_NAME
1587 * The denoted right is not receive or port set.
1588 * MACH_RCV_IN_SET Receive right is a member of a set.
1589 */
1590
1591 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_object_t * objectp)1592 ipc_mqueue_copyin(
1593 ipc_space_t space,
1594 mach_port_name_t name,
1595 ipc_object_t *objectp)
1596 {
1597 ipc_entry_bits_t bits;
1598 ipc_object_t object;
1599 kern_return_t kr;
1600
1601 kr = ipc_right_lookup_read(space, name, &bits, &object);
1602 if (kr != KERN_SUCCESS) {
1603 return MACH_RCV_INVALID_NAME;
1604 }
1605 /* object is locked and active */
1606
1607 if (bits & MACH_PORT_TYPE_RECEIVE) {
1608 __assert_only ipc_port_t port = ip_object_to_port(object);
1609 assert(ip_get_receiver_name(port) == name);
1610 assert(ip_in_space(port, space));
1611 }
1612 if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1613 io_reference(object);
1614 io_unlock(object);
1615 } else {
1616 io_unlock(object);
1617 /* guard exception if we never held the receive right in this entry */
1618 if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
1619 mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
1620 }
1621 return MACH_RCV_INVALID_NAME;
1622 }
1623
1624 *objectp = object;
1625 return MACH_MSG_SUCCESS;
1626 }
1627