xref: /xnu-8019.80.24/osfmk/ipc/ipc_mqueue.c (revision a325d9c4a84054e40bbe985afedcb50ab80993ea)
1 /*
2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /*
29  * @OSF_FREE_COPYRIGHT@
30  */
31 /*
32  * Mach Operating System
33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34  * All Rights Reserved.
35  *
36  * Permission to use, copy, modify and distribute this software and its
37  * documentation is hereby granted, provided that both the copyright
38  * notice and this permission notice appear in all copies of the
39  * software, derivative works or modified versions, and any portions
40  * thereof, and that both notices appear in supporting documentation.
41  *
42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45  *
46  * Carnegie Mellon requests users of this software to return to
47  *
48  *  Software Distribution Coordinator  or  [email protected]
49  *  School of Computer Science
50  *  Carnegie Mellon University
51  *  Pittsburgh PA 15213-3890
52  *
53  * any improvements or extensions that they make and grant Carnegie Mellon
54  * the rights to redistribute these changes.
55  */
56 /*
57  */
58 /*
59  *	File:	ipc/ipc_mqueue.c
60  *	Author:	Rich Draves
61  *	Date:	1989
62  *
63  *	Functions to manipulate IPC message queues.
64  */
65 /*
66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67  * support for mandatory and extensible security protections.  This notice
68  * is included in support of clause 2.2 (b) of the Apple Public License,
69  * Version 2.0.
70  */
71 
72 
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76 
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86 
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_port.h>
92 #include <ipc/ipc_pset.h>
93 #include <ipc/ipc_space.h>
94 
95 #if MACH_FLIPC
96 #include <ipc/flipc.h>
97 #endif
98 
99 #ifdef __LP64__
100 #include <vm/vm_map.h>
101 #endif
102 
103 #include <sys/event.h>
104 
extern char     *proc_name_address(void *p);

/*
 * Global "event" words: only their addresses matter — threads block on
 * &ipc_mqueue_full when waiting for queue space and on &ipc_mqueue_rcv
 * when waiting for a message to arrive.  The stored values are unused.
 */
int ipc_mqueue_full;            /* address is event for queue space */
int ipc_mqueue_rcv;             /* address is event for message arrival */

/* forward declarations */

/* Convert a wait result into the current thread's ith_state disposition. */
static void ipc_mqueue_receive_results(wait_result_t result);

/* Hand a MACH_PEEK_IN_PROGRESS waiter its peek notification (port locked). */
static void ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option_t   option,
	thread_t            thread);

/* Deliver message to message queue or waiting receiver */
static void ipc_mqueue_post(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option);
122 
123 /*
124  *	Routine:	ipc_mqueue_init
125  *	Purpose:
126  *		Initialize a newly-allocated message queue.
127  */
128 void
ipc_mqueue_init(ipc_mqueue_t mqueue)129 ipc_mqueue_init(
130 	ipc_mqueue_t            mqueue)
131 {
132 	ipc_kmsg_queue_init(&mqueue->imq_messages);
133 	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
134 	klist_init(&mqueue->imq_klist);
135 }
136 
/*
 *	Routine:	ipc_mqueue_add_unlock
 *	Purpose:
 *		Associate the portset's mqueue with the port's mqueue.
 *		This has to be done so that posting the port will wakeup
 *		a portset waiter.  If there are waiters on the portset
 *		mqueue and messages on the port mqueue, try to match them
 *		up now.
 *	Conditions:
 *		Port locked on entry, unlocked on return.
 *		The waitq set needs to have its link initialized.
 *		May block.
 *	Returns:
 *		KERN_SUCCESS on success, or the failure code from
 *		waitq_link() when the link could not be established.
 */
kern_return_t
ipc_mqueue_add_unlock(
	ipc_mqueue_t    port_mqueue,
	ipc_pset_t      pset,
	waitq_ref_t     *reserved_link,
	uint64_t        *reserved_prepost,
	boolean_t       unlink_before_adding)
{
	ipc_port_t       port = ip_from_mq(port_mqueue);
	struct waitq     *port_waitq = &port->ip_waitq;
	struct waitq_set *set_waitq = &pset->ips_wqset;
	ipc_kmsg_queue_t kmsgq;
	ipc_kmsg_t       kmsg, next;
	kern_return_t    kr = KERN_SUCCESS;

	assert(waitqs_is_linked(set_waitq));

	/*
	 * The link operation is now under the same lock-hold as
	 * message iteration and thread wakeup, but doesn't have to be...
	 */
	if (unlink_before_adding) {
		/* drops the port lock internally, then we re-take it below */
		waitq_unlink_all_relink_unlock(port_waitq, set_waitq);
		/*
		 * While the port was unlocked,
		 * it could have been moved out of the set already.
		 *
		 * If this is the case, we should not go ahead with wakeups
		 * and message deliveries.
		 */
		ip_mq_lock(ip_from_mq(port_mqueue));
		if (!waitq_member_locked(port_waitq, set_waitq)) {
			goto leave;
		}
	} else {
		/* caller must have pre-reserved the link object */
		assert(reserved_link && reserved_link->wqr_value != 0);
		kr = waitq_link(port_waitq, set_waitq, WAITQ_ALREADY_LOCKED, reserved_link);
		if (kr != KERN_SUCCESS) {
			goto leave;
		}
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 *
	 * Outer loop: one iteration per queued message.
	 * Inner loop: keep waking waiters until one can actually take
	 * the current message (or there are no waiters left).
	 */
	kmsgq = &port_mqueue->imq_messages;
	for (kmsg = ipc_kmsg_queue_first(kmsgq);
	    kmsg != IKM_NULL;
	    kmsg = next) {
		next = ipc_kmsg_queue_next(kmsgq, kmsg);

		for (;;) {
			thread_t th;
			mach_msg_size_t msize;
			spl_t th_spl;

			th = waitq_wakeup64_identify_locked(
				port_waitq,
				IPC_MQUEUE_RECEIVE,
				THREAD_AWAKENED, &th_spl,
				reserved_prepost, WAITQ_ALL_PRIORITIES,
				WAITQ_KEEP_LOCKED);
			/* waitq/mqueue still locked, thread locked */

			if (th == THREAD_NULL) {
				/* no more waiters; remaining msgs stay queued */
				goto leave;
			}

			/*
			 * If the receiver waited with a facility not directly
			 * related to Mach messaging, then it isn't prepared to get
			 * handed the message directly.  Just set it running, and
			 * go look for another thread that can.
			 */
			if (th->ith_state != MACH_RCV_IN_PROGRESS) {
				if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
					/*
					 * wakeup the peeking thread, but
					 * continue to loop over the threads
					 * waiting on the port's mqueue to see
					 * if there are any actual receivers
					 */
					ipc_mqueue_peek_on_thread_locked(port_mqueue,
					    th->ith_option,
					    th);
				}
				thread_unlock(th);
				splx(th_spl);
				continue;
			}

			/*
			 * Found a receiver. see if they can handle the message
			 * correctly (the message is not too large for them, or
			 * they didn't care to be informed that the message was
			 * too large).  If they can't handle it, take them off
			 * the list and let them go back and figure it out and
			 * just move onto the next.
			 */
			msize = ipc_kmsg_copyout_size(kmsg, th->map);
			if (th->ith_rsize <
			    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
				th->ith_state = MACH_RCV_TOO_LARGE;
				th->ith_msize = msize;
				if (th->ith_option & MACH_RCV_LARGE) {
					/*
					 * let him go without message
					 * (he asked to be told about too-large
					 * messages rather than receive them)
					 */
					th->ith_receiver_name = port_mqueue->imq_receiver_name;
					th->ith_kmsg = IKM_NULL;
					th->ith_seqno = 0;
					thread_unlock(th);
					splx(th_spl);
					continue; /* find another thread */
				}
			} else {
				th->ith_state = MACH_MSG_SUCCESS;
			}

			/*
			 * This thread is going to take this message,
			 * so give it to him.
			 */
			ipc_kmsg_rmqueue(kmsgq, kmsg);
#if MACH_FLIPC
			/* capture node before the kmsg leaves our control */
			mach_node_t  node = kmsg->ikm_node;
#endif
			ipc_mqueue_release_msgcount(port_mqueue);

			th->ith_kmsg = kmsg;
			th->ith_seqno = port_mqueue->imq_seqno++;
			thread_unlock(th);
			splx(th_spl);
#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
				flipc_msg_ack(node, port_mqueue, TRUE);
			}
#endif
			break;  /* go to next message */
		}
	}

leave:
	ip_mq_unlock(ip_from_mq(port_mqueue));
	return kr;
}
298 
299 
300 /*
301  *	Routine:	ipc_port_has_klist
302  *	Purpose:
303  *		Returns whether the given port imq_klist field can be used as a klist.
304  */
305 static inline bool
ipc_port_has_klist(ipc_port_t port)306 ipc_port_has_klist(ipc_port_t port)
307 {
308 	return !port->ip_specialreply &&
309 	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
310 }
311 
312 static inline struct klist *
ipc_object_klist(ipc_object_t object)313 ipc_object_klist(ipc_object_t object)
314 {
315 	if (io_otype(object) == IOT_PORT) {
316 		ipc_port_t port = ip_object_to_port(object);
317 
318 		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
319 	}
320 	return &ips_object_to_pset(object)->ips_klist;
321 }
322 
/*
 *	Routine:	ipc_mqueue_changed
 *	Purpose:
 *		Wake up receivers waiting in a message queue, notifying any
 *		registered knotes that the queue is vanishing first.
 *	Conditions:
 *		The object containing the message queue is locked.
 */
void
ipc_mqueue_changed(
	ipc_space_t         space,
	struct waitq       *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	/* klist is NULL when the port's klist field is repurposed */
	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
		 * port pending already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		/* reset the port's sync link state now that knotes are gone */
		ipc_port_adjust_sync_link_state_locked(ip_object_to_port(object),
		    PORT_SYNC_LINK_ANY, NULL);
	} else {
		klist_init(klist);
	}

	/* THREAD_RESTART tells waiters the port/set changed underneath them */
	waitq_wakeup64_all_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART,
	    NULL,
	    WAITQ_ALL_PRIORITIES,
	    WAITQ_KEEP_LOCKED);
}
383 
384 
385 
386 
/*
 *	Routine:	ipc_mqueue_send
 *	Purpose:
 *		Send a message to a message queue.  The message holds a reference
 *		for the destination port for this message queue in the
 *		msgh_remote_port field.
 *
 *		If unsuccessful, the caller still has possession of
 *		the message and must do something with it.  If successful,
 *		the message is queued, given to a receiver, or destroyed.
 *	Conditions:
 *		port is locked; unlocked on return.
 *	Returns:
 *		MACH_MSG_SUCCESS	The message was accepted.
 *		MACH_SEND_TIMED_OUT	Caller still has message.
 *		MACH_SEND_INTERRUPTED	Caller still has message.
 *		MACH_SEND_NO_BUFFER	Kernel queue full; caller still has message.
 *		MACH_SEND_INVALID_DEST	Queue destroyed while waiting.
 */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option,
	mach_msg_timeout_t  send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		/* reserve our queue slot before dropping the lock */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			/* zero timeout: fail immediately rather than wait */
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			/* hard kernel limit: never block on this */
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		/* record why we block, for debugger/stackshot attribution */
		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		/* propagate our priority to whoever can free up queue space */
		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	/* we hold a queue slot (reserved above or inherited from a waker) */
	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
512 
/*
 *	Routine:	ipc_mqueue_override_send_locked
 *	Purpose:
 *		Set an override qos on the first message in the queue
 *		(if the queue is full). This is a send-possible override
 *		that will go away as soon as we drain a message from the
 *		queue.
 *
 *	Conditions:
 *		The port corresponding to mqueue is locked.
 *		The caller holds a reference on the message queue.
 */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t        mqueue,
	mach_msg_qos_t      qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t __unused full_queue_empty = FALSE;

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		/* only notify knotes when the override actually changed the head */
		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
		/* full queue with no messages: limit was inherited, not reached */
		if (!first) {
			full_queue_empty = TRUE;
		}
	}

#if DEVELOPMENT || DEBUG
	/*
	 * NOTE(review): dst_pid is computed but not visibly consumed here —
	 * presumably for debugger/trace inspection of the anomalous
	 * "full but empty" queue state; confirm before removing.
	 */
	if (full_queue_empty) {
		int dst_pid = 0;
		dst_pid = ipc_port_get_receiver_task(port, NULL);
	}
#endif
}
557 
/*
 *	Routine:	ipc_mqueue_release_msgcount
 *	Purpose:
 *		Release a message queue reference in the case where we
 *		found a waiter.  May hand the freed slot directly to a
 *		blocked sender instead of releasing it.
 *
 *	Conditions:
 *		The port corresponding to message queue is locked.
 *		The message corresponding to this reference is off the queue.
 *		There is no need to pass reserved preposts because this will
 *		never prepost to anyone
 */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	/* either other messages remain queued, or the queue is now empty */
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			/* nobody was actually waiting after all */
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		/* no more msgs: invalidate the port's prepost object */
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
612 
/*
 *	Routine:	ipc_mqueue_post
 *	Purpose:
 *		Post a message to a waiting receiver or enqueue it.  If a
 *		receiver is waiting, we can release our reserved space in
 *		the message queue.
 *
 *	Conditions:
 *		port is unlocked
 *		If we need to queue, our space in the message queue is reserved.
 */
static void
ipc_mqueue_post(
	ipc_mqueue_t               mqueue,
	ipc_kmsg_t                 kmsg,
	mach_msg_option_t __unused option)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	uint64_t reserved_prepost = 0;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 *	While the msg queue	is locked, we have control of the
	 *  kmsg, so the ref in	it for the port is still good.
	 *
	 *	Check for a receiver for the message.
	 *
	 *	waitq_prepost_reserve returns with the waitq locked.
	 */
	reserved_prepost = waitq_prepost_reserve(waitq, 0,
	    WAITQ_KEEP_LOCKED);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	/* keep waking waiters until one can take the message */
	for (;;) {
		spl_t th_spl;
		thread_t receiver;
		mach_msg_size_t msize;

		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED,
		    &th_spl,
		    &reserved_prepost,
		    WAITQ_ALL_PRIORITIES,
		    WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports. port-sets knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount > 0) {
				if (ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
					/* if the space is dead there is no point calling KNOTE */
					if (ip_in_a_space(port) &&
					    is_active(ip_get_receiver(port)) &&
					    ipc_port_has_klist(port)) {
						KNOTE(&port->ip_klist, 0);
					}
				}
				break;
			}

			/*
			 * Otherwise, the message queue must belong to an inactive
			 * port, so just destroy the message and pretend it was posted.
			 */
			destroy_msg = TRUE;
			goto out_unlock;
		}

		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			thread_unlock(receiver);
			splx(th_spl);
			break; /* Message was posted, so break out of loop */
		}

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly. Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			thread_unlock(receiver);
			splx(th_spl);
			continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_rsize <
		    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			/* capture node before the kmsg leaves our control */
			mach_node_t node = kmsg->ikm_node;
#endif
			thread_unlock(receiver);
			splx(th_spl);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
		splx(th_spl);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);
	waitq_prepost_release_reserve(reserved_prepost);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg);
	}

	counter_inc(&current_task()->messages_sent);
	return;
}
798 
799 
800 static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)801 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
802 {
803 	thread_t                self = current_thread();
804 	mach_msg_option_t       option = self->ith_option;
805 
806 	/*
807 	 * why did we wake up?
808 	 */
809 	switch (saved_wait_result) {
810 	case THREAD_TIMED_OUT:
811 		self->ith_state = MACH_RCV_TIMED_OUT;
812 		return;
813 
814 	case THREAD_INTERRUPTED:
815 		self->ith_state = MACH_RCV_INTERRUPTED;
816 		return;
817 
818 	case THREAD_RESTART:
819 		/* something bad happened to the port/set */
820 		self->ith_state = MACH_RCV_PORT_CHANGED;
821 		return;
822 
823 	case THREAD_AWAKENED:
824 		/*
825 		 * We do not need to go select a message, somebody
826 		 * handed us one (or a too-large indication).
827 		 */
828 		switch (self->ith_state) {
829 		case MACH_RCV_SCATTER_SMALL:
830 		case MACH_RCV_TOO_LARGE:
831 			/*
832 			 * Somebody tried to give us a too large
833 			 * message. If we indicated that we cared,
834 			 * then they only gave us the indication,
835 			 * otherwise they gave us the indication
836 			 * AND the message anyway.
837 			 */
838 			if (option & MACH_RCV_LARGE) {
839 				return;
840 			}
841 			return;
842 		case MACH_MSG_SUCCESS:
843 			return;
844 		case MACH_PEEK_READY:
845 			return;
846 
847 		default:
848 			panic("ipc_mqueue_receive_results: strange ith_state");
849 		}
850 
851 	default:
852 		panic("ipc_mqueue_receive_results: strange wait_result");
853 	}
854 }
855 
/*
 *	Routine:	ipc_mqueue_receive_continue
 *	Purpose:
 *		Continuation routine resumed after blocking in a receive:
 *		record the wake-up disposition in the thread's ith_state,
 *		then re-enter the MIG receive path to finish the operation.
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
864 
/*
 *	Routine:	ipc_mqueue_receive
 *	Purpose:
 *		Receive a message from a message queue.
 *
 *	Conditions:
 *		Our caller must hold a reference for the port or port set
 *		to which this queue belongs, to keep the queue
 *		from being deallocated.
 *
 *		The kmsg is returned with clean header fields
 *		and with the circular bit turned off through the ith_kmsg
 *		field of the thread's receive continuation state.
 *	Returns:
 *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
 *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize.
 *		MACH_RCV_TIMED_OUT	No message obtained.
 *		MACH_RCV_INTERRUPTED	No message obtained.
 *		MACH_RCV_PORT_DIED	Port/set died; no message.
 *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
 *
 */

void
ipc_mqueue_receive(
	struct waitq           *waitq,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible)
{
	wait_result_t           wresult;
	thread_t                self = current_thread();

	wresult = ipc_mqueue_receive_on_thread(waitq, option, max_size,
	    rcv_timeout, interruptible, self);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		/* message (or error) already selected on the thread */
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (self->ith_continuation) {
			/* discards this stack frame; resumes in continuation */
			thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */
		}

		/* no continuation set: block normally and return here */
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
916 
/*
 *	Routine:	ipc_mqueue_receive_on_thread
 *	Purpose:
 *		Receive a message from a message queue using a specified thread.
 *		If no message available, assert_wait on the appropriate waitq.
 *
 *	Conditions:
 *		Assumes thread is self.
 *		No port/port-set lock held.
 *		May have assert-waited. Caller must block in those cases.
 *
 *	Returns:
 *		THREAD_NOT_WAITING	a message/peek was posted to the thread
 *					(via its ith_* fields), or a zero
 *					timeout expired immediately.
 *		THREAD_RESTART		the waitq was invalidated (port/pset
 *					destroyed) before we could wait.
 *		other wait_result_t	result of waitq_assert_wait64_locked;
 *					the caller must block, and must later
 *					call turnstile_complete if a receive
 *					turnstile was prepared here.
 */
wait_result_t
ipc_mqueue_receive_on_thread(
	struct waitq           *waitq,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	ipc_object_t            object = io_from_waitq(waitq);
	wait_result_t           wresult;
	uint64_t                deadline;
	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;

	io_lock(object);

	/* no need to reserve anything: we never prepost to anyone */

	if (!waitq_is_valid(waitq)) {
		/* someone raced us to destroy this mqueue/port! */
		io_unlock(object);
		/*
		 * ipc_mqueue_receive_results updates the thread's ith_state
		 * TODO: differentiate between rights being moved and
		 * rights/ports being destroyed (21885327)
		 */
		return THREAD_RESTART;
	}

	if (waitq_is_set(waitq)) {
		/*
		 * Port-set receive: scan the set's preposted member ports
		 * for the first one that actually holds a queued message.
		 */
		__block ipc_mqueue_t port_mq = IMQ_NULL;

		(void)waitq_set_iterate_preposts((struct waitq_set *)waitq,
		    ^(struct waitq *wq) {
			ipc_port_t port = ip_from_waitq(wq);

			/*
			 * If there are no messages on this queue,
			 * skip it and remove it from the prepost list
			 */
			if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
			        return WQ_ITERATE_INVALIDATE_CONTINUE;
			}

			/*
			 * There are messages waiting on this port.
			 * Instruct the prepost iteration logic to break, but keep the
			 * waitq locked.
			 */
			port_mq = &port->ip_messages;
			return WQ_ITERATE_BREAK_KEEP_LOCKED;
		});
		/* Returns with port locked */

		if (port_mq != IMQ_NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_mq. We have instructed the prepost
			 * iteration logic to leave both the port_mq and the
			 * set waitq locked.
			 *
			 * TODO: previously, we would place this port at the
			 *       back of the prepost list...
			 */
			io_unlock(object);

			/*
			 * Continue on to handling the message with just
			 * the port waitq locked.
			 */
			if (option & MACH_PEEK_MSG) {
				ipc_mqueue_peek_on_thread_locked(port_mq, option, thread);
			} else {
				ipc_mqueue_select_on_thread_locked(port_mq,
				    option, max_size, thread);
			}

			ip_mq_unlock(ip_from_mq(port_mq));
			return THREAD_NOT_WAITING;
		}
	} else if (waitq_is_queue(waitq) || waitq_is_turnstile_proxy(waitq)) {
		ipc_port_t port = ip_from_waitq(waitq);
		ipc_kmsg_queue_t kmsgs;

		/*
		 * Receive on a single port. Just try to get the messages.
		 */
		kmsgs = &port->ip_messages.imq_messages;
		if (ipc_kmsg_queue_first(kmsgs) != IKM_NULL) {
			if (option & MACH_PEEK_MSG) {
				ipc_mqueue_peek_on_thread_locked(&port->ip_messages, option, thread);
			} else {
				ipc_mqueue_select_on_thread_locked(&port->ip_messages,
				    option, max_size, thread);
			}
			io_unlock(object);
			return THREAD_NOT_WAITING;
		}
	} else {
		panic("Unknown waitq type 0x%x: likely memory corruption!",
		    waitq->waitq_type);
	}

	/*
	 * Looks like we'll have to block.  The waitq we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if (option & MACH_RCV_TIMEOUT) {
		/* zero timeout: report MACH_RCV_TIMED_OUT without waiting */
		if (rcv_timeout == 0) {
			io_unlock(object);
			thread->ith_state = MACH_RCV_TIMED_OUT;
			return THREAD_NOT_WAITING;
		}
	}

	/* record the receive parameters the wakeup path will consume */
	thread->ith_option = option;
	thread->ith_rsize = max_size;
	thread->ith_msize = 0;

	if (option & MACH_PEEK_MSG) {
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	} else {
		thread->ith_state = MACH_RCV_IN_PROGRESS;
	}

	if (option & MACH_RCV_TIMEOUT) {
		/* rcv_timeout is in milliseconds (1000 * NSEC_PER_USEC = 1ms) */
		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
	} else {
		deadline = 0;
	}

	/*
	 * Threads waiting on a reply port (not portset)
	 * will wait on its receive turnstile.
	 *
	 * Donate waiting thread's turnstile and
	 * setup inheritor for special reply port.
	 * Based on the state of the special reply
	 * port, the inheritor would be the send
	 * turnstile of the connection port on which
	 * the send of sync ipc would happen or
	 * workloop's turnstile who would reply to
	 * the sync ipc message.
	 *
	 * Pass in mqueue wait in waitq_assert_wait to
	 * support port set wakeup. The mqueue waitq of port
	 * will be converted to to turnstile waitq
	 * in waitq_assert_wait instead of global waitqs.
	 */
	if (waitq_is_turnstile_proxy(waitq)) {
		ipc_port_t port = ip_from_waitq(waitq);
		rcv_turnstile = turnstile_prepare((uintptr_t)port,
		    port_rcv_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_recv_update_inheritor(port, rcv_turnstile,
		    TURNSTILE_DELAYED_UPDATE);
	}

	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
	wresult = waitq_assert_wait64_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    interruptible,
	    TIMEOUT_URGENCY_USER_NORMAL,
	    deadline,
	    TIMEOUT_NO_LEEWAY,
	    thread);
	/* preposts should be detected above, not here */
	if (wresult == THREAD_AWAKENED) {
		panic("ipc_mqueue_receive_on_thread: sleep walking");
	}

	io_unlock(object);

	/* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
	if (rcv_turnstile != TURNSTILE_NULL) {
		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
	}
	/* Its callers responsibility to call turnstile_complete to get the turnstile back */

	return wresult;
}
1111 
1112 
1113 /*
1114  *	Routine:	ipc_mqueue_peek_on_thread_locked
1115  *	Purpose:
1116  *		A receiver discovered that there was a message on the queue
1117  *		before he had to block. Tell a thread about the message queue,
1118  *		but don't pick off any messages.
1119  *	Conditions:
1120  *		port_mq locked
1121  *		at least one message on port_mq's message queue
1122  *
1123  *	Returns: (on thread->ith_state)
1124  *		MACH_PEEK_READY		ith_peekq contains a message queue
1125  */
1126 void
ipc_mqueue_peek_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option_t option,thread_t thread)1127 ipc_mqueue_peek_on_thread_locked(
1128 	ipc_mqueue_t        port_mq,
1129 	mach_msg_option_t   option,
1130 	thread_t            thread)
1131 {
1132 	(void)option;
1133 	assert(option & MACH_PEEK_MSG);
1134 	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
1135 
1136 	/*
1137 	 * Take a reference on the mqueue's associated port:
1138 	 * the peeking thread will be responsible to release this reference
1139 	 */
1140 	ip_reference(ip_from_mq(port_mq));
1141 	thread->ith_peekq = port_mq;
1142 	thread->ith_state = MACH_PEEK_READY;
1143 }
1144 
/*
 *	Routine:	ipc_mqueue_select_on_thread_locked
 *	Purpose:
 *		A receiver discovered that there was a message on the queue
 *		before he had to block.  Pick the message off the queue and
 *		"post" it to thread.
 *	Conditions:
 *		port locked.
 *              thread not locked.
 *		There is a message.
 *		No need to reserve prepost objects - it will never prepost
 *
 *	Returns: (on thread->ith_state)
 *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
 *		MACH_RCV_TOO_LARGE	May or may not have pulled it, but it is
 *					large (without MACH_RCV_LARGE the
 *					oversized message is still dequeued).
 */
void
ipc_mqueue_select_on_thread_locked(
	ipc_mqueue_t            port_mq,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	thread_t                thread)
{
	ipc_kmsg_t kmsg;
	mach_msg_return_t mr = MACH_MSG_SUCCESS;
	mach_msg_size_t msize;

	/*
	 * Do some sanity checking of our ability to receive
	 * before pulling the message off the queue.
	 */
	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
	assert(kmsg != IKM_NULL);

	/*
	 * If we really can't receive it, but we had the
	 * MACH_RCV_LARGE option set, then don't take it off
	 * the queue, instead return the appropriate error
	 * (and size needed).  The size check includes the trailer
	 * the receiver requested via 'option'.
	 */
	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
	if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
		mr = MACH_RCV_TOO_LARGE;
		if (option & MACH_RCV_LARGE) {
			/* message stays queued; report the size needed */
			thread->ith_receiver_name = port_mq->imq_receiver_name;
			thread->ith_kmsg = IKM_NULL;
			thread->ith_msize = msize;
			thread->ith_seqno = 0;
			thread->ith_state = mr;
			return;
		}
	}

	/* dequeue the message (even if too large without MACH_RCV_LARGE) */
	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
#if MACH_FLIPC
	/* acknowledge remote-node messages back to the FLIPC layer */
	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
	}
#endif
	/* give back the queue slot this message held — see ipc_mqueue_release_msgcount */
	ipc_mqueue_release_msgcount(port_mq);
	/* stamp and consume the next sequence number for this queue */
	thread->ith_seqno = port_mq->imq_seqno++;
	thread->ith_kmsg = kmsg;
	thread->ith_state = mr;

	counter_inc(&current_task()->messages_received);
	return;
}
1212 
1213 /*
1214  *	Routine:	ipc_mqueue_peek_locked
1215  *	Purpose:
1216  *		Peek at a (non-set) message queue to see if it has a message
1217  *		matching the sequence number provided (if zero, then the
1218  *		first message in the queue) and return vital info about the
1219  *		message.
1220  *
1221  *	Conditions:
1222  *		The io object corresponding to mq is locked by callers.
1223  *		Other locks may be held by callers, so this routine cannot block.
1224  *		Caller holds reference on the message queue.
1225  */
1226 unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1227 ipc_mqueue_peek_locked(ipc_mqueue_t mq,
1228     mach_port_seqno_t * seqnop,
1229     mach_msg_size_t * msg_sizep,
1230     mach_msg_id_t * msg_idp,
1231     mach_msg_max_trailer_t * msg_trailerp,
1232     ipc_kmsg_t *kmsgp)
1233 {
1234 	ipc_kmsg_queue_t kmsgq;
1235 	ipc_kmsg_t kmsg;
1236 	mach_port_seqno_t seqno, msgoff;
1237 	unsigned res = 0;
1238 
1239 	seqno = 0;
1240 	if (seqnop != NULL) {
1241 		seqno = *seqnop;
1242 	}
1243 
1244 	if (seqno == 0) {
1245 		seqno = mq->imq_seqno;
1246 		msgoff = 0;
1247 	} else if (seqno >= mq->imq_seqno &&
1248 	    seqno < mq->imq_seqno + mq->imq_msgcount) {
1249 		msgoff = seqno - mq->imq_seqno;
1250 	} else {
1251 		goto out;
1252 	}
1253 
1254 	/* look for the message that would match that seqno */
1255 	kmsgq = &mq->imq_messages;
1256 	kmsg = ipc_kmsg_queue_first(kmsgq);
1257 	while (msgoff-- && kmsg != IKM_NULL) {
1258 		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1259 	}
1260 	if (kmsg == IKM_NULL) {
1261 		goto out;
1262 	}
1263 
1264 	/* found one - return the requested info */
1265 	if (seqnop != NULL) {
1266 		*seqnop = seqno;
1267 	}
1268 	if (msg_sizep != NULL) {
1269 		*msg_sizep = kmsg->ikm_header->msgh_size;
1270 	}
1271 	if (msg_idp != NULL) {
1272 		*msg_idp = kmsg->ikm_header->msgh_id;
1273 	}
1274 	if (msg_trailerp != NULL) {
1275 		memcpy(msg_trailerp,
1276 		    (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
1277 		    mach_round_msg(kmsg->ikm_header->msgh_size)),
1278 		    sizeof(mach_msg_max_trailer_t));
1279 	}
1280 	if (kmsgp != NULL) {
1281 		*kmsgp = kmsg;
1282 	}
1283 
1284 	res = 1;
1285 
1286 out:
1287 	return res;
1288 }
1289 
1290 
1291 /*
1292  *	Routine:	ipc_mqueue_peek
1293  *	Purpose:
1294  *		Peek at a (non-set) message queue to see if it has a message
1295  *		matching the sequence number provided (if zero, then the
1296  *		first message in the queue) and return vital info about the
1297  *		message.
1298  *
1299  *	Conditions:
1300  *		The ipc_mqueue_t is unlocked.
1301  *		Locks may be held by callers, so this routine cannot block.
1302  *		Caller holds reference on the message queue.
1303  */
1304 unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1305 ipc_mqueue_peek(ipc_mqueue_t mq,
1306     mach_port_seqno_t * seqnop,
1307     mach_msg_size_t * msg_sizep,
1308     mach_msg_id_t * msg_idp,
1309     mach_msg_max_trailer_t * msg_trailerp,
1310     ipc_kmsg_t *kmsgp)
1311 {
1312 	ipc_port_t port = ip_from_mq(mq);
1313 	unsigned res;
1314 
1315 	ip_mq_lock(port);
1316 
1317 	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1318 	    msg_trailerp, kmsgp);
1319 
1320 	ip_mq_unlock(port);
1321 	return res;
1322 }
1323 
1324 /*
1325  *	Routine:	ipc_mqueue_release_peek_ref
1326  *	Purpose:
1327  *		Release the reference on an mqueue's associated port which was
1328  *		granted to a thread in ipc_mqueue_peek_on_thread (on the
1329  *		MACH_PEEK_MSG thread wakeup path).
1330  *
1331  *	Conditions:
1332  *		The ipc_mqueue_t should be locked on entry.
1333  *		The ipc_mqueue_t will be _unlocked_ on return
1334  *			(and potentially invalid!)
1335  *
1336  */
1337 void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)1338 ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
1339 {
1340 	ipc_port_t port = ip_from_mq(mqueue);
1341 
1342 	ip_mq_lock_held(port);
1343 
1344 	/*
1345 	 * clear any preposts this mq may have generated
1346 	 * (which would cause subsequent immediate wakeups)
1347 	 */
1348 	waitq_clear_prepost_locked(&port->ip_waitq);
1349 
1350 	ip_mq_unlock(port);
1351 
1352 	/*
1353 	 * release the port reference: we need to do this outside the lock
1354 	 * because we might be holding the last port reference!
1355 	 **/
1356 	ip_release(port);
1357 }
1358 
1359 /*
1360  *	Routine:	ipc_mqueue_set_gather_member_names
1361  *	Purpose:
1362  *		Discover all ports which are members of a given port set.
1363  *		Because the waitq linkage mechanism was redesigned to save
1364  *		significan amounts of memory, it no longer keeps back-pointers
1365  *		from a port set to a port. Therefore, we must iterate over all
1366  *		ports within a given IPC space and individually query them to
1367  *		see if they are members of the given set. Port names of ports
1368  *		found to be members of the given set will be gathered into the
1369  *		provided 'names' array.  Actual returned names are limited to
1370  *		maxnames entries, but we keep counting the actual number of
1371  *		members to let the caller decide to retry if necessary.
1372  *
1373  *	Conditions:
1374  *		Locks may be held by callers, so this routine cannot block.
1375  *		Caller holds reference on the message queue (via port set).
1376  */
1377 void
ipc_mqueue_set_gather_member_names(ipc_space_t space,ipc_pset_t pset,ipc_entry_num_t maxnames,mach_port_name_t * names,ipc_entry_num_t * actualp)1378 ipc_mqueue_set_gather_member_names(
1379 	ipc_space_t space,
1380 	ipc_pset_t pset,
1381 	ipc_entry_num_t maxnames,
1382 	mach_port_name_t *names,
1383 	ipc_entry_num_t *actualp)
1384 {
1385 	ipc_entry_t table;
1386 	ipc_entry_num_t tsize;
1387 	struct waitq_set *wqset;
1388 	ipc_entry_num_t actual = 0;
1389 
1390 	assert(pset != IPS_NULL);
1391 	wqset = &pset->ips_wqset;
1392 
1393 	assert(space != IS_NULL);
1394 	is_read_lock(space);
1395 	if (!is_active(space)) {
1396 		is_read_unlock(space);
1397 		goto out;
1398 	}
1399 
1400 	if (!waitq_set_is_valid(wqset)) {
1401 		is_read_unlock(space);
1402 		goto out;
1403 	}
1404 
1405 	table = is_active_table(space);
1406 	tsize = table->ie_size;
1407 	for (ipc_entry_num_t idx = 1; idx < tsize; idx++) {
1408 		ipc_entry_t entry = &table[idx];
1409 
1410 		/* only receive rights can be members of port sets */
1411 		if ((entry->ie_bits & MACH_PORT_TYPE_RECEIVE) != MACH_PORT_TYPE_NONE) {
1412 			ipc_port_t port = ip_object_to_port(entry->ie_object);
1413 
1414 			assert(IP_VALID(port));
1415 			ip_mq_lock(port);
1416 			if (waitq_member_locked(&port->ip_waitq, wqset)) {
1417 				if (actual < maxnames) {
1418 					names[actual] = ip_get_receiver_name(port);
1419 				}
1420 				actual++;
1421 			}
1422 			ip_mq_unlock(port);
1423 		}
1424 	}
1425 
1426 	is_read_unlock(space);
1427 
1428 out:
1429 	*actualp = actual;
1430 }
1431 
1432 
/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 *	Returns:
 *		TRUE if the caller has messages to reap (moved to the
 *		per-thread destroy queue by ipc_kmsg_delayed_destroy_queue).
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 *	rouse all blocked senders
	 *	(don't boost anyone - we're tearing this queue down)
	 *	(never preposts)
	 */
	port->ip_fullwaiters = false;

	if (send_turnstile != TURNSTILE_NULL) {
		/* THREAD_RESTART tells senders the queue is going away */
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART,
		    WAITQ_ALL_PRIORITIES);
	}

#if MACH_FLIPC
	/* walk the circular kmsg list and ack remote-node messages */
	ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
	if (first) {
		ipc_kmsg_t kmsg = first;
		do {
			if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
			}
			kmsg = kmsg->ikm_next;
		} while (kmsg != first);
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 */
	waitq_invalidate(&port->ip_waitq);

	/* clear out any preposting we may have done, drops and reacquires port lock */
	waitq_clear_prepost_locked(&port->ip_waitq);

	/*
	 * assert that we are destroying / invalidating a queue that's
	 * not a member of any other queue.
	 */
	assert(port->ip_preposts == 0);
	assert(!ip_in_pset(port));

	return reap;
}
1509 
/*
 *	Routine:	ipc_mqueue_set_qlimit_locked
 *	Purpose:
 *		Changes a message queue limit; the maximum number
 *		of messages which may be queued.  When the limit grows,
 *		wakes up as many blocked senders as there are newly
 *		available slots, reserving a slot (imq_msgcount) for each
 *		thread actually awakened.
 *	Conditions:
 *		Port locked.
 */

void
ipc_mqueue_set_qlimit_locked(
	ipc_mqueue_t           mqueue,
	mach_port_msgcount_t   qlimit)
{
	ipc_port_t port = ip_from_mq(mqueue);

	assert(qlimit <= MACH_PORT_QLIMIT_MAX);

	/* wake up senders allowed by the new qlimit */
	if (qlimit > mqueue->imq_qlimit) {
		mach_port_msgcount_t i, wakeup;
		struct turnstile *send_turnstile = port_send_turnstile(port);

		/* caution: wakeup, qlimit are unsigned */
		wakeup = qlimit - mqueue->imq_qlimit;

		for (i = 0; i < wakeup; i++) {
			/*
			 * boost the priority of the awoken thread
			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
			 * the message queue slot we've just reserved.
			 *
			 * NOTE: this will never prepost
			 */
			if (send_turnstile == TURNSTILE_NULL ||
			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
			    IPC_MQUEUE_FULL,
			    THREAD_AWAKENED,
			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
				/* no more waiters: stop early, nothing reserved */
				port->ip_fullwaiters = false;
				break;
			}
			mqueue->imq_msgcount++;  /* give it to the awakened thread */
		}
	}
	/* imq_qlimit is a uint16_t; bounded by MACH_PORT_QLIMIT_MAX above */
	mqueue->imq_qlimit = (uint16_t)qlimit;
}
1557 
/*
 *	Routine:	ipc_mqueue_set_seqno_locked
 *	Purpose:
 *		Changes an mqueue's sequence number (the number that will be
 *		stamped on the next message received from this queue).
 *	Conditions:
 *		Caller holds a reference to the queue's containing object.
 *		NOTE(review): the "_locked" suffix suggests the caller also
 *		holds the port lock - confirm against callers.
 */
void
ipc_mqueue_set_seqno_locked(
	ipc_mqueue_t            mqueue,
	mach_port_seqno_t       seqno)
{
	mqueue->imq_seqno = seqno;
}
1572 
1573 
1574 /*
1575  *	Routine:	ipc_mqueue_copyin
1576  *	Purpose:
1577  *		Convert a name in a space to a message queue.
1578  *	Conditions:
1579  *		Nothing locked.  If successful, the caller gets a ref for
1580  *		for the object.	This ref ensures the continued existence of
1581  *		the queue.
1582  *	Returns:
1583  *		MACH_MSG_SUCCESS	Found a message queue.
1584  *		MACH_RCV_INVALID_NAME	The space is dead.
1585  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1586  *		MACH_RCV_INVALID_NAME
1587  *			The denoted right is not receive or port set.
1588  *		MACH_RCV_IN_SET		Receive right is a member of a set.
1589  */
1590 
1591 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_object_t * objectp)1592 ipc_mqueue_copyin(
1593 	ipc_space_t             space,
1594 	mach_port_name_t        name,
1595 	ipc_object_t            *objectp)
1596 {
1597 	ipc_entry_bits_t bits;
1598 	ipc_object_t object;
1599 	kern_return_t kr;
1600 
1601 	kr = ipc_right_lookup_read(space, name, &bits, &object);
1602 	if (kr != KERN_SUCCESS) {
1603 		return MACH_RCV_INVALID_NAME;
1604 	}
1605 	/* object is locked and active */
1606 
1607 	if (bits & MACH_PORT_TYPE_RECEIVE) {
1608 		__assert_only ipc_port_t port = ip_object_to_port(object);
1609 		assert(ip_get_receiver_name(port) == name);
1610 		assert(ip_in_space(port, space));
1611 	}
1612 	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1613 		io_reference(object);
1614 		io_unlock(object);
1615 	} else {
1616 		io_unlock(object);
1617 		/* guard exception if we never held the receive right in this entry */
1618 		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
1619 			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
1620 		}
1621 		return MACH_RCV_INVALID_NAME;
1622 	}
1623 
1624 	*objectp = object;
1625 	return MACH_MSG_SUCCESS;
1626 }
1627