xref: /xnu-10063.121.3/osfmk/ipc/ipc_mqueue.c (revision 2c2f96dc2b9a4408a43d3150ae9c105355ca3daa)
1 /*
2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /*
29  * @OSF_FREE_COPYRIGHT@
30  */
31 /*
32  * Mach Operating System
33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34  * All Rights Reserved.
35  *
36  * Permission to use, copy, modify and distribute this software and its
37  * documentation is hereby granted, provided that both the copyright
38  * notice and this permission notice appear in all copies of the
39  * software, derivative works or modified versions, and any portions
40  * thereof, and that both notices appear in supporting documentation.
41  *
42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45  *
46  * Carnegie Mellon requests users of this software to return to
47  *
48  *  Software Distribution Coordinator  or  [email protected]
49  *  School of Computer Science
50  *  Carnegie Mellon University
51  *  Pittsburgh PA 15213-3890
52  *
53  * any improvements or extensions that they make and grant Carnegie Mellon
54  * the rights to redistribute these changes.
55  */
56 /*
57  */
58 /*
59  *	File:	ipc/ipc_mqueue.c
60  *	Author:	Rich Draves
61  *	Date:	1989
62  *
63  *	Functions to manipulate IPC message queues.
64  */
65 /*
66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67  * support for mandatory and extensible security protections.  This notice
68  * is included in support of clause 2.2 (b) of the Apple Public License,
69  * Version 2.0.
70  */
71 
72 
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76 
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86 
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_port.h>
92 #include <ipc/ipc_pset.h>
93 #include <ipc/ipc_space.h>
94 
95 #if MACH_FLIPC
96 #include <ipc/flipc.h>
97 #endif
98 
99 #ifdef __LP64__
100 #include <vm/vm_map.h>
101 #endif
102 
103 #include <sys/event.h>
104 
105 extern char     *proc_name_address(void *p);
106 
107 int ipc_mqueue_full;            /* address is event for queue space */
108 int ipc_mqueue_rcv;             /* address is event for message arrival */
109 
110 /* forward declarations */
111 static void ipc_mqueue_receive_results(wait_result_t result);
112 
113 #if MACH_FLIPC
114 static void ipc_mqueue_peek_on_thread_locked(
115 	ipc_mqueue_t        port_mq,
116 	mach_msg_option64_t option,
117 	thread_t            thread);
118 #endif /* MACH_FLIPC */
119 
120 /* Deliver message to message queue or waiting receiver */
121 static void ipc_mqueue_post(
122 	ipc_mqueue_t            mqueue,
123 	ipc_kmsg_t              kmsg,
124 	mach_msg_option_t       option);
125 
126 /*
127  *	Routine:	ipc_mqueue_init
128  *	Purpose:
129  *		Initialize a newly-allocated message queue.
130  */
131 void
ipc_mqueue_init(ipc_mqueue_t mqueue)132 ipc_mqueue_init(
133 	ipc_mqueue_t            mqueue)
134 {
135 	ipc_kmsg_queue_init(&mqueue->imq_messages);
136 	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
137 	klist_init(&mqueue->imq_klist);
138 }
139 
140 /*
141  *	Routine:	ipc_mqueue_add_locked.
142  *	Purpose:
143  *		Associate the portset's mqueue with the port's mqueue.
144  *		This has to be done so that posting the port will wakeup
145  *		a portset waiter.  If there are waiters on the portset
146  *		mqueue and messages on the port mqueue, try to match them
147  *		up now.
148  *	Conditions:
149  *		Port and Pset both locked.
150  */
kern_return_t
ipc_mqueue_add_locked(
	ipc_mqueue_t    port_mqueue,
	ipc_pset_t      pset,
	waitq_link_t   *linkp)
{
	ipc_port_t       port = ip_from_mq(port_mqueue);
	struct waitq_set *wqset = &pset->ips_wqset;
	circle_queue_t   kmsgq = &port_mqueue->imq_messages;
	kern_return_t    kr = KERN_SUCCESS;
	ipc_kmsg_t       kmsg;

	/* Link the port's waitq into the set's waitq-set. */
	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
	if (kr != KERN_SUCCESS) {
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 *
	 * Only consider this set however, as the other ones have been
	 * posted to already.
	 */
	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
		thread_t th;
		mach_msg_size_t msize, asize;

		/* Pull a waiter off the set's waitq, keeping it off the runqueue. */
		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
		    THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* port and pset still locked, thread not runnable */

		if (th == THREAD_NULL) {
			/*
			 * Didn't find a thread to wake up but messages
			 * are enqueued, prepost the set instead,
			 * as calling waitq_wakeup64_identify_locked()
			 * on the set directly will not take care of it.
			 */
			waitq_link_prepost_locked(&port->ip_waitq, wqset);
			break;
		}

		/*
		 * Because we hold the thread off the runqueue at this point,
		 * it's safe to modify ith_ fields on the thread, as
		 * until it is resumed, it must be off core or in between
		 * the assert wait and returning from the continuation.
		 */

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
#if MACH_FLIPC
			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
				/*
				 * wakeup the peeking thread, but
				 * continue to loop over the threads
				 * waiting on the port's mqueue to see
				 * if there are any actual receivers
				 */
				ipc_mqueue_peek_on_thread_locked(port_mqueue,
				    th->ith_option, th);
			}
#endif /* MACH_FLIPC */

			waitq_resume_identified_thread(wqset, th,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			continue;
		}

		/*
		 * Found a receiver. see if they can handle the message
		 * correctly (the message is not too large for them, or
		 * they didn't care to be informed that the message was
		 * too large).  If they can't handle it, take them off
		 * the list and let them go back and figure it out and
		 * just move onto the next.
		 */
		/* sizes this kmsg would need in the receiver's map, plus aux data */
		msize = ipc_kmsg_copyout_size(kmsg, th->map);
		asize = ipc_kmsg_aux_data_size(kmsg);

		if (ipc_kmsg_too_large(msize, asize, th->ith_option,
		    th->ith_max_msize, th->ith_max_asize, th)) {
			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = msize;
			th->ith_asize = asize;
			if (th->ith_option & MACH_RCV_LARGE) {
				/*
				 * let him go without message
				 */
				th->ith_receiver_name = port_mqueue->imq_receiver_name;
				th->ith_kmsg = IKM_NULL;
				th->ith_seqno = 0;

				waitq_resume_identified_thread(wqset, th,
				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

				continue; /* find another thread */
			}
		} else {
			th->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * This thread is going to take this message,
		 * so give it the message.
		 */
		ipc_kmsg_rmqueue(kmsgq, kmsg);

#if MACH_FLIPC
		/* capture the node before handing the kmsg to the receiver */
		mach_node_t  node = kmsg->ikm_node;
#endif

		/* the message left the queue: release the reserved slot */
		ipc_mqueue_release_msgcount(port_mqueue);

		th->ith_kmsg = kmsg;
		th->ith_seqno = port_mqueue->imq_seqno++;

		waitq_resume_identified_thread(wqset, th,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

#if MACH_FLIPC
		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
			flipc_msg_ack(node, port_mqueue, TRUE);
		}
#endif
	}

	return KERN_SUCCESS;
}
287 
288 
289 /*
290  *	Routine:	ipc_port_has_klist
291  *	Purpose:
292  *		Returns whether the given port imq_klist field can be used as a klist.
293  */
294 bool
ipc_port_has_klist(ipc_port_t port)295 ipc_port_has_klist(ipc_port_t port)
296 {
297 	return !port->ip_specialreply &&
298 	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
299 }
300 
301 static inline struct klist *
ipc_object_klist(ipc_object_t object)302 ipc_object_klist(ipc_object_t object)
303 {
304 	if (io_otype(object) == IOT_PORT) {
305 		ipc_port_t port = ip_object_to_port(object);
306 
307 		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
308 	}
309 	return &ips_object_to_pset(object)->ips_klist;
310 }
311 
312 /*
313  *	Routine:	ipc_mqueue_changed
314  *	Purpose:
315  *		Wake up receivers waiting in a message queue.
316  *	Conditions:
317  *		The object containing the message queue is locked.
318  */
void
ipc_mqueue_changed(
	ipc_space_t         space,
	struct waitq       *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
		 * port pending already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		ipc_port_t port = ip_object_to_port(object);
		if (!port->ip_specialreply) {
			/* reset sync-link state so imq_klist can be reused as a klist */
			ipc_port_adjust_sync_link_state_locked(port,
			    PORT_SYNC_LINK_ANY, NULL);
		}
	} else {
		/* port set: just reset the (now fully vanished) klist head */
		klist_init(klist);
	}

	/*
	 * do not pass WAITQ_UPDATE_INHERITOR, ipc_port_destroy()
	 * needs to handle this manually, and the port lock
	 * is the waitq lock, so there's really no inefficiency there.
	 */
	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
}
376 
377 
378 
379 
380 /*
381  *	Routine:	ipc_mqueue_send
382  *	Purpose:
383  *		Send a message to a message queue.  The message holds a reference
384  *		for the destination port for this message queue in the
385  *		msgh_remote_port field.
386  *
387  *		If unsuccessful, the caller still has possession of
388  *		the message and must do something with it.  If successful,
389  *		the message is queued, given to a receiver, or destroyed.
390  *	Conditions:
391  *		port is locked.
392  *	Returns:
393  *		MACH_MSG_SUCCESS	The message was accepted.
394  *		MACH_SEND_TIMED_OUT	Caller still has message.
395  *		MACH_SEND_INTERRUPTED	Caller still has message.
396  */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option,
	mach_msg_timeout_t  send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		/* reserve our queue slot; consumed by ipc_mqueue_post() below */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			/* zero timeout: fail immediately rather than block */
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		/* block on the port's send turnstile until a slot frees or timeout */
		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	/* we hold a reserved slot (or inherited one); deliver the message */
	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
505 
506 /*
507  *	Routine:	ipc_mqueue_override_send_locked
508  *	Purpose:
509  *		Set an override qos on the first message in the queue
510  *		(if the queue is full). This is a send-possible override
511  *		that will go away as soon as we drain a message from the
512  *		queue.
513  *
514  *	Conditions:
515  *		The port corresponding to mqueue is locked.
516  *		The caller holds a reference on the message queue.
517  */
518 void
ipc_mqueue_override_send_locked(ipc_mqueue_t mqueue,mach_msg_qos_t qos_ovr)519 ipc_mqueue_override_send_locked(
520 	ipc_mqueue_t        mqueue,
521 	mach_msg_qos_t      qos_ovr)
522 {
523 	ipc_port_t port = ip_from_mq(mqueue);
524 
525 	assert(waitq_is_valid(&port->ip_waitq));
526 
527 	if (imq_full(mqueue)) {
528 		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
529 
530 		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
531 			if (ip_in_a_space(port) &&
532 			    is_active(ip_get_receiver(port)) &&
533 			    ipc_port_has_klist(port)) {
534 				KNOTE(&port->ip_klist, 0);
535 			}
536 		}
537 	}
538 }
539 
540 /*
541  *	Routine:	ipc_mqueue_release_msgcount
542  *	Purpose:
543  *		Release a message queue reference in the case where we
544  *		found a waiter.
545  *
546  *	Conditions:
547  *		The port corresponding to message queue is locked.
548  *		The message corresponding to this reference is off the queue.
549  *		There is no need to pass reserved preposts because this will
550  *		never prepost to anyone
551  */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	/* the message for this reference must already be off the queue */
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			/* nobody was waiting after all */
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		/* queue is now empty: clear any prepost state on the port's waitq */
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
593 
594 /*
595  *	Routine:	ipc_mqueue_post
596  *	Purpose:
597  *		Post a message to a waiting receiver or enqueue it.  If a
598  *		receiver is waiting, we can release our reserved space in
599  *		the message queue.
600  *
601  *	Conditions:
602  *		port is unlocked
603  *		If we need to queue, our space in the message queue is reserved.
604  */
static void
ipc_mqueue_post(
	ipc_mqueue_t               mqueue,
	ipc_kmsg_t                 kmsg,
	mach_msg_option_t __unused option)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 *	While the msg queue is locked, we have control of the
	 *	kmsg, so the ref in it for the port is still good.
	 *
	 *	Check for a receiver for the message.
	 */
	ip_mq_lock(port);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		thread_t receiver;
		mach_msg_size_t msize, asize;

		/* pick a waiter; it stays off the runqueue until resumed below */
		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread not runnable */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports. port-sets knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount == 0) {
				/*
				 * The message queue must belong
				 * to an inactive port, so just destroy
				 * the message and pretend it was posted.
				 */
				destroy_msg = TRUE;
			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
				/*
				 * queue was not empty and qos
				 * didn't change, nothing to do.
				 */
			} else if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				/*
				 * queue was empty or qos changed
				 * we need to tell kqueue, unless
				 * the space is getting torn down
				 */
				KNOTE(&port->ip_klist, 0);
			}
			break;
		}

#if MACH_FLIPC
		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
			break; /* Message was posted, so break out of loop */
		}
#endif /* MACH_FLIPC */

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly. Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		asize = ipc_kmsg_aux_data_size(kmsg);

		if (ipc_kmsg_too_large(msize, asize, receiver->ith_option,
		    receiver->ith_max_msize, receiver->ith_max_asize, receiver)) {
			receiver->ith_msize = msize;
			receiver->ith_asize = asize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			/* capture the node before the receiver owns the kmsg */
			mach_node_t node = kmsg->ikm_node;
#endif
			waitq_resume_identified_thread(waitq, receiver,
			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;

		waitq_resume_identified_thread(waitq, receiver,
		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
	}

	/* count the send for the current task, even if the msg was destroyed */
	counter_inc(&current_task()->messages_sent);
	return;
}
782 
783 
/*
 *	Routine:	ipc_mqueue_receive_results
 *	Purpose:
 *		Translate the wait result from a blocked receive into the
 *		current thread's ith_state receive disposition.
 */
static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t                self = current_thread();
	mach_msg_option64_t     option64 = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			/*
			 * NOTE(review): both branches return with ith_state
			 * already set by the poster; the explicit split only
			 * documents the two delivery variants above.
			 */
			if (option64 & MACH_RCV_LARGE) {
				return;
			}
			return;
		case MACH_MSG_SUCCESS:
			return;
#if MACH_FLIPC
		case MACH_PEEK_READY:
			return;
#endif /* MACH_FLIPC */

		default:
			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
	}
}
841 
/*
 *	Routine:	ipc_mqueue_receive_continue
 *	Purpose:
 *		Continuation entry after blocking in a receive: record the
 *		wakeup disposition, then resume the message receive path.
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
850 
851 /*
852  *	Routine:	ipc_mqueue_receive
853  *	Purpose:
854  *		Receive a message from a message queue.
855  *
856  *	Conditions:
857  *		Our caller must hold a reference for the port or port set
858  *		to which this queue belongs, to keep the queue
859  *		from being deallocated.
860  *
861  *		The kmsg is returned with clean header fields
862  *		and with the circular bit turned off through the ith_kmsg
863  *		field of the thread's receive continuation state.
864  *	Returns:
865  *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
866  *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
867  *                          Auxiliary data size returned in ith_asize
868  *		MACH_RCV_TIMED_OUT	No message obtained.
869  *		MACH_RCV_INTERRUPTED	No message obtained.
870  *		MACH_RCV_PORT_DIED	Port/set died; no message.
871  *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
872  *
873  */
874 
void
ipc_mqueue_receive(
	struct waitq            *waitq,
	mach_msg_option64_t     option64,
	mach_msg_size_t         max_size,
	mach_msg_size_t         max_aux_size,   /* 0 if no aux buffer */
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	bool                    has_continuation)
{
	wait_result_t           wresult;
	thread_t                self = current_thread();

	waitq_lock(waitq);

	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, option64, max_size,
	    max_aux_size, rcv_timeout, interruptible, self);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		/* a message (or error) was handed to us directly via ith_ fields */
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (has_continuation) {
			/* resumes in ipc_mqueue_receive_continue(); never returns here */
			wresult = thread_block(ipc_mqueue_receive_continue);
			/* NOTREACHED */
		}
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	/* record the wakeup disposition in the thread's ith_state */
	ipc_mqueue_receive_results(wresult);
}
906 
/*
 *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
 *	Purpose:
 *		Receive a message from a message queue using a specified thread.
 *		If no message available, assert_wait on the appropriate waitq.
 *
 *	Conditions:
 *		Assumes thread is self.
 *		The port/port-set waitq is locked on entry, unlocked on return.
 *		May have assert-waited. Caller must block in those cases.
 *
 *	Returns:
 *		THREAD_NOT_WAITING	a message (or error) was posted to the
 *					thread's ith_* fields; no block needed.
 *		THREAD_RESTART		the port/pset was destroyed underneath us.
 *		other wait_result_t	from waitq_assert_wait64_locked(); the
 *					caller must block to complete the receive.
 */
wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(
	struct waitq            *waitq,
	mach_msg_option64_t     option64,
	mach_msg_size_t         max_msg_size,
	mach_msg_size_t         max_aux_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible,
	thread_t                thread)
{
	ipc_object_t            object = io_from_waitq(waitq);
	ipc_port_t              port = IP_NULL;
	wait_result_t           wresult;
	uint64_t                deadline;
	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;

	if (waitq_type(waitq) == WQT_PORT_SET) {
		ipc_pset_t pset = ips_object_to_pset(object);
		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
		struct waitq *port_wq;

		/*
		 * Put the message at the back of the prepost list
		 * if it's not a PEEK.
		 *
		 * Might drop the pset lock temporarily.
		 */
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			wqs_flags |= WQS_PREPOST_PEEK;
		}
#endif /* MACH_FLIPC */
		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);

		/* Returns with port locked */

		if (port_wq != NULL) {
			/*
			 * We get here if there is at least one message
			 * waiting on port_wq. We have instructed the prepost
			 * iteration logic to leave both the port_wq and the
			 * set waitq locked.
			 *
			 * Continue on to handling the message with just
			 * the port waitq locked.
			 */
			io_unlock(object);
			port = ip_from_waitq(port_wq);
		}
	} else if (waitq_type(waitq) == WQT_PORT) {
		/* receiving from a single port: message available iff queue non-empty */
		port = ip_from_waitq(waitq);
		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
			port = IP_NULL;
		}
	} else {
		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
	}

	if (port) {
		/* a message is available: post it to the thread, no blocking */
#if MACH_FLIPC
		if (option64 & MACH64_PEEK_MSG) {
			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
			    option64, thread);
		} else
#endif /* MACH_FLIPC */
		{
			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
			    option64, max_msg_size, max_aux_size, thread);
		}
		ip_mq_unlock(port);
		return THREAD_NOT_WAITING;
	}

	if (!waitq_is_valid(waitq)) {
		/* someone raced us to destroy this mqueue/port! */
		io_unlock(object);
		/*
		 * ipc_mqueue_receive_results updates the thread's ith_state
		 * TODO: differentiate between rights being moved and
		 * rights/ports being destroyed (21885327)
		 */
		return THREAD_RESTART;
	}

	/*
	 * Looks like we'll have to block.  The waitq we will
	 * block on (whether the set's or the local port's) is
	 * still locked.
	 */
	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
		/* zero timeout: report MACH_RCV_TIMED_OUT without ever waiting */
		io_unlock(object);
		thread->ith_state = MACH_RCV_TIMED_OUT;
		return THREAD_NOT_WAITING;
	}

	/* stash the receive parameters on the thread for the wakeup side */
	thread->ith_option = option64;
	thread->ith_max_msize = max_msg_size;
	thread->ith_msize = 0;

	thread->ith_max_asize = max_aux_size;
	thread->ith_asize = 0;

	thread->ith_state = MACH_RCV_IN_PROGRESS;
#if MACH_FLIPC
	if (option64 & MACH64_PEEK_MSG) {
		thread->ith_state = MACH_PEEK_IN_PROGRESS;
	}
#endif /* MACH_FLIPC */

	if (option64 & MACH_RCV_TIMEOUT) {
		/* rcv_timeout is in milliseconds; convert to an absolute deadline */
		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
	} else {
		deadline = 0;
	}

	/*
	 * Threads waiting on a reply port (not portset)
	 * will wait on its receive turnstile.
	 *
	 * Donate waiting thread's turnstile and
	 * setup inheritor for special reply port.
	 * Based on the state of the special reply
	 * port, the inheritor would be the send
	 * turnstile of the connection port on which
	 * the send of sync ipc would happen or
	 * workloop's turnstile who would reply to
	 * the sync ipc message.
	 *
	 * Pass in mqueue wait in waitq_assert_wait to
	 * support port set wakeup. The mqueue waitq of port
	 * will be converted to turnstile waitq
	 * in waitq_assert_wait instead of global waitqs.
	 */
	if (waitq_type(waitq) == WQT_PORT) {
		port = ip_from_waitq(waitq);
		rcv_turnstile = turnstile_prepare((uintptr_t)port,
		    port_rcv_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_recv_update_inheritor(port, rcv_turnstile,
		    TURNSTILE_DELAYED_UPDATE);
	}

	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
	wresult = waitq_assert_wait64_locked(waitq,
	    IPC_MQUEUE_RECEIVE,
	    interruptible,
	    TIMEOUT_URGENCY_USER_NORMAL,
	    deadline,
	    TIMEOUT_NO_LEEWAY,
	    thread);
	if (wresult == THREAD_AWAKENED) {
		/*
		 * The first thing we did was to look for preposts
		 * (using waitq_set_first_prepost() for sets, or looking
		 * at the port's queue for ports).
		 *
		 * Since we found none, we kept the waitq locked.
		 *
		 * It ensures that waitq_assert_wait64_locked() can't
		 * find pre-posts either, won't drop the waitq lock
		 * either (even for a set), and can't return THREAD_AWAKENED.
		 */
		panic("ipc_mqueue_receive_on_thread: sleep walking");
	}

	io_unlock(object);

	/*
	 * After this point, a waiting thread could be found by the wakeup
	 * identify path, and the other side now owns the ith_ fields until
	 * this thread blocks and resumes in the continuation
	 */

	/* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
	if (rcv_turnstile != TURNSTILE_NULL) {
		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
	}
	/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */

	return wresult;
}
1100 
#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_peek_on_thread_locked
 *	Purpose:
 *		A receiver found a queued message before it had to block.
 *		Hand the thread a reference to the message queue so it can
 *		peek at the message later; no message is dequeued here.
 *	Conditions:
 *		port_mq locked.
 *		At least one message is queued on port_mq.
 *
 *	Returns: (on thread->ith_state)
 *		MACH_PEEK_READY		ith_peekq contains a message queue
 */
void
ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	__assert_only mach_msg_option64_t option64,
	thread_t            thread)
{
	ipc_port_t peek_port;

	assert(option64 & MACH64_PEEK_MSG);
	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);

	/*
	 * Grab a reference on the port behind this mqueue; the peeking
	 * thread owns that reference and must drop it when done.
	 */
	peek_port = ip_from_mq(port_mq);
	ip_validate(peek_port);
	ip_reference(peek_port);

	thread->ith_peekq = port_mq;
	thread->ith_state = MACH_PEEK_READY;
}
#endif /* MACH_FLIPC */
1134 
1135 /*
1136  *	Routine:	ipc_mqueue_select_on_thread_locked
1137  *	Purpose:
1138  *		A receiver discovered that there was a message on the queue
1139  *		before he had to block.  Pick the message off the queue and
1140  *		"post" it to thread.
1141  *	Conditions:
1142  *		port locked.
1143  *              thread not locked.
1144  *		There is a message.
1145  *		No need to reserve prepost objects - it will never prepost
1146  *
1147  *	Returns:
1148  *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
1149  *		MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
1150  */
1151 void
ipc_mqueue_select_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option64_t option64,mach_msg_size_t max_msg_size,mach_msg_size_t max_aux_size,thread_t thread)1152 ipc_mqueue_select_on_thread_locked(
1153 	ipc_mqueue_t            port_mq,
1154 	mach_msg_option64_t     option64,
1155 	mach_msg_size_t         max_msg_size,
1156 	mach_msg_size_t         max_aux_size,
1157 	thread_t                thread)
1158 {
1159 	ipc_kmsg_t kmsg;
1160 	mach_msg_size_t msize, asize;
1161 
1162 	mach_msg_return_t mr = MACH_MSG_SUCCESS;
1163 
1164 	/*
1165 	 * Do some sanity checking of our ability to receive
1166 	 * before pulling the message off the queue.
1167 	 */
1168 	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1169 	assert(kmsg != IKM_NULL);
1170 
1171 	/*
1172 	 * If we really can't receive it, but we had the
1173 	 * MACH_RCV_LARGE option set, then don't take it off
1174 	 * the queue, instead return the appropriate error
1175 	 * (and size needed).
1176 	 */
1177 	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
1178 	asize = ipc_kmsg_aux_data_size(kmsg);
1179 
1180 	if (ipc_kmsg_too_large(msize, asize, option64,
1181 	    max_msg_size, max_aux_size, thread)) {
1182 		mr = MACH_RCV_TOO_LARGE;
1183 		if (option64 & MACH_RCV_LARGE) {
1184 			ipc_kmsg_validate_partial_sig(kmsg);
1185 			thread->ith_receiver_name = port_mq->imq_receiver_name;
1186 			thread->ith_kmsg = IKM_NULL;
1187 			thread->ith_msize = msize;
1188 			thread->ith_asize = asize;
1189 			thread->ith_seqno = 0;
1190 			thread->ith_state = mr;
1191 			return;
1192 		}
1193 	}
1194 
1195 	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1196 #if MACH_FLIPC
1197 	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
1198 		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
1199 	}
1200 #endif
1201 	ipc_mqueue_release_msgcount(port_mq);
1202 	thread->ith_seqno = port_mq->imq_seqno++;
1203 	thread->ith_kmsg = kmsg;
1204 	thread->ith_state = mr;
1205 
1206 	counter_inc(&current_task()->messages_received);
1207 	return;
1208 }
1209 
1210 /*
1211  *	Routine:	ipc_mqueue_peek_locked
1212  *	Purpose:
1213  *		Peek at a (non-set) message queue to see if it has a message
1214  *		matching the sequence number provided (if zero, then the
1215  *		first message in the queue) and return vital info about the
1216  *		message.
1217  *
1218  *	Conditions:
1219  *		The io object corresponding to mq is locked by callers.
1220  *		Other locks may be held by callers, so this routine cannot block.
1221  *		Caller holds reference on the message queue.
1222  */
1223 unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1224 ipc_mqueue_peek_locked(ipc_mqueue_t mq,
1225     mach_port_seqno_t * seqnop,
1226     mach_msg_size_t * msg_sizep,
1227     mach_msg_id_t * msg_idp,
1228     mach_msg_max_trailer_t * msg_trailerp,
1229     ipc_kmsg_t *kmsgp)
1230 {
1231 	ipc_kmsg_queue_t kmsgq;
1232 	ipc_kmsg_t kmsg;
1233 	mach_port_seqno_t seqno, msgoff;
1234 	unsigned res = 0;
1235 	mach_msg_header_t *hdr;
1236 
1237 	seqno = 0;
1238 	if (seqnop != NULL) {
1239 		seqno = *seqnop;
1240 	}
1241 
1242 	if (seqno == 0) {
1243 		seqno = mq->imq_seqno;
1244 		msgoff = 0;
1245 	} else if (seqno >= mq->imq_seqno &&
1246 	    seqno < mq->imq_seqno + mq->imq_msgcount) {
1247 		msgoff = seqno - mq->imq_seqno;
1248 	} else {
1249 		goto out;
1250 	}
1251 
1252 	/* look for the message that would match that seqno */
1253 	kmsgq = &mq->imq_messages;
1254 	kmsg = ipc_kmsg_queue_first(kmsgq);
1255 	while (msgoff-- && kmsg != IKM_NULL) {
1256 		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1257 	}
1258 	if (kmsg == IKM_NULL) {
1259 		goto out;
1260 	}
1261 
1262 	/*
1263 	 * Validate kmsg signature before doing anything with it. Since we are holding
1264 	 * the mqueue lock here, and only header + trailer will be peeked on, just
1265 	 * do a partial validation to finish quickly.
1266 	 *
1267 	 * Partial kmsg signature is only supported on PAC devices.
1268 	 */
1269 	ipc_kmsg_validate_partial_sig(kmsg);
1270 
1271 	hdr = ikm_header(kmsg);
1272 	/* found one - return the requested info */
1273 	if (seqnop != NULL) {
1274 		*seqnop = seqno;
1275 	}
1276 	if (msg_sizep != NULL) {
1277 		*msg_sizep = hdr->msgh_size;
1278 	}
1279 	if (msg_idp != NULL) {
1280 		*msg_idp = hdr->msgh_id;
1281 	}
1282 	if (msg_trailerp != NULL) {
1283 		memcpy(msg_trailerp, ipc_kmsg_get_trailer(kmsg, false), sizeof(mach_msg_max_trailer_t));
1284 	}
1285 	if (kmsgp != NULL) {
1286 		*kmsgp = kmsg;
1287 	}
1288 
1289 	res = 1;
1290 
1291 out:
1292 	return res;
1293 }
1294 
1295 
1296 /*
1297  *	Routine:	ipc_mqueue_peek
1298  *	Purpose:
1299  *		Peek at a (non-set) message queue to see if it has a message
1300  *		matching the sequence number provided (if zero, then the
1301  *		first message in the queue) and return vital info about the
1302  *		message.
1303  *
1304  *	Conditions:
1305  *		The ipc_mqueue_t is unlocked.
1306  *		Locks may be held by callers, so this routine cannot block.
1307  *		Caller holds reference on the message queue.
1308  */
1309 unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1310 ipc_mqueue_peek(ipc_mqueue_t mq,
1311     mach_port_seqno_t * seqnop,
1312     mach_msg_size_t * msg_sizep,
1313     mach_msg_id_t * msg_idp,
1314     mach_msg_max_trailer_t * msg_trailerp,
1315     ipc_kmsg_t *kmsgp)
1316 {
1317 	ipc_port_t port = ip_from_mq(mq);
1318 	unsigned res;
1319 
1320 	ip_mq_lock(port);
1321 
1322 	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1323 	    msg_trailerp, kmsgp);
1324 
1325 	ip_mq_unlock(port);
1326 	return res;
1327 }
1328 
#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_release_peek_ref
 *	Purpose:
 *		Release the reference on an mqueue's associated port which was
 *		granted to a thread in ipc_mqueue_peek_on_thread (on the
 *		MACH64_PEEK_MSG thread wakeup path).
 *
 *	Conditions:
 *		The ipc_mqueue_t should be locked on entry.
 *		The ipc_mqueue_t will be _unlocked_ on return
 *			(and potentially invalid!)
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
{
	ipc_port_t owner = ip_from_mq(mqueue);

	ip_mq_lock_held(owner);

	/*
	 * Clear any preposts this mqueue may have generated, which would
	 * otherwise cause subsequent immediate wakeups.
	 */
	waitq_clear_prepost_locked(&owner->ip_waitq);

	ip_mq_unlock(owner);

	/*
	 * Drop the port reference only after unlocking: this may be the
	 * last reference, and releasing it could free the port.
	 */
	ip_release(owner);
}
#endif /* MACH_FLIPC */
1365 
/*
 *	Routine:	ipc_mqueue_destroy_locked
 *	Purpose:
 *		Destroy a message queue.
 *		Set any blocked senders running.
 *		Destroy the kmsgs in the queue.
 *	Conditions:
 *		port locked
 *		Receivers were removed when the receive right was "changed"
 *	Returns:
 *		TRUE if the caller must reap the delayed-destroy kmsg queue
 *		(via ipc_kmsg_reap_delayed or equivalent), FALSE otherwise.
 */
boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t reap = FALSE;
	struct turnstile *send_turnstile = port_send_turnstile(port);

	/*
	 *	rouse all blocked senders
	 *	(don't boost anyone - we're tearing this queue down)
	 *	(never preposts)
	 */
	port->ip_fullwaiters = false;

	/* THREAD_RESTART tells the senders the queue is being destroyed */
	if (send_turnstile != TURNSTILE_NULL) {
		waitq_wakeup64_all(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
	}

#if MACH_FLIPC
	ipc_kmsg_t kmsg;

	/* ack any remote (FLIPC) messages before they are destroyed */
	cqe_foreach_element_safe(kmsg, &mqueue->imq_messages, ikm_link) {
		if (MACH_NODE_VALID(kmsg->ikm_node) &&
		    FPORT_VALID(mqueue->imq_fport)) {
			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
		}
	}
#endif

	/*
	 * Move messages from the specified queue to the per-thread
	 * clean/drain queue while we have the mqueue lock.
	 */
	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);

	/*
	 * Wipe out message count, both for messages about to be
	 * reaped and for reserved space for (previously) woken senders.
	 * This is the indication to them that their reserved space is gone
	 * (the mqueue was destroyed).
	 */
	mqueue->imq_msgcount = 0;

	/*
	 * invalidate the waitq for subsequent mqueue operations,
	 * the port lock could be dropped after invalidating the mqueue.
	 */

	waitq_invalidate(&port->ip_waitq);

	/* detach from any port sets; links are collected on free_l for the caller */
	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);

	return reap;
}
1432 
1433 /*
1434  *	Routine:	ipc_mqueue_set_qlimit_locked
1435  *	Purpose:
1436  *		Changes a message queue limit; the maximum number
1437  *		of messages which may be queued.
1438  *	Conditions:
1439  *		Port locked.
1440  */
1441 
1442 void
ipc_mqueue_set_qlimit_locked(ipc_mqueue_t mqueue,mach_port_msgcount_t qlimit)1443 ipc_mqueue_set_qlimit_locked(
1444 	ipc_mqueue_t           mqueue,
1445 	mach_port_msgcount_t   qlimit)
1446 {
1447 	ipc_port_t port = ip_from_mq(mqueue);
1448 
1449 	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1450 
1451 	/* wake up senders allowed by the new qlimit */
1452 	if (qlimit > mqueue->imq_qlimit) {
1453 		mach_port_msgcount_t i, wakeup;
1454 		struct turnstile *send_turnstile = port_send_turnstile(port);
1455 
1456 		/* caution: wakeup, qlimit are unsigned */
1457 		wakeup = qlimit - mqueue->imq_qlimit;
1458 
1459 		for (i = 0; i < wakeup; i++) {
1460 			/*
1461 			 * boost the priority of the awoken thread
1462 			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1463 			 * the message queue slot we've just reserved.
1464 			 *
1465 			 * NOTE: this will never prepost
1466 			 */
1467 			if (send_turnstile == TURNSTILE_NULL ||
1468 			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
1469 			    IPC_MQUEUE_FULL,
1470 			    THREAD_AWAKENED,
1471 			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
1472 				port->ip_fullwaiters = false;
1473 				break;
1474 			}
1475 			mqueue->imq_msgcount++;  /* give it to the awakened thread */
1476 		}
1477 	}
1478 	mqueue->imq_qlimit = (uint16_t)qlimit;
1479 }
1480 
1481 /*
1482  *	Routine:	ipc_mqueue_set_seqno_locked
1483  *	Purpose:
1484  *		Changes an mqueue's sequence number.
1485  *	Conditions:
1486  *		Caller holds a reference to the queue's containing object.
1487  */
1488 void
ipc_mqueue_set_seqno_locked(ipc_mqueue_t mqueue,mach_port_seqno_t seqno)1489 ipc_mqueue_set_seqno_locked(
1490 	ipc_mqueue_t            mqueue,
1491 	mach_port_seqno_t       seqno)
1492 {
1493 	mqueue->imq_seqno = seqno;
1494 }
1495 
1496 
1497 /*
1498  *	Routine:	ipc_mqueue_copyin
1499  *	Purpose:
1500  *		Convert a name in a space to a message queue.
1501  *	Conditions:
1502  *		Nothing locked.  If successful, the caller gets a ref for
1503  *		for the object.	This ref ensures the continued existence of
1504  *		the queue.
1505  *	Returns:
1506  *		MACH_MSG_SUCCESS	Found a message queue.
1507  *		MACH_RCV_INVALID_NAME	The space is dead.
1508  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1509  *		MACH_RCV_INVALID_NAME
1510  *			The denoted right is not receive or port set.
1511  *		MACH_RCV_IN_SET		Receive right is a member of a set.
1512  */
1513 
1514 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_object_t * objectp)1515 ipc_mqueue_copyin(
1516 	ipc_space_t             space,
1517 	mach_port_name_t        name,
1518 	ipc_object_t            *objectp)
1519 {
1520 	ipc_entry_bits_t bits;
1521 	ipc_object_t object;
1522 	kern_return_t kr;
1523 
1524 	kr = ipc_right_lookup_read(space, name, &bits, &object);
1525 	if (kr != KERN_SUCCESS) {
1526 		return MACH_RCV_INVALID_NAME;
1527 	}
1528 	/* object is locked and active */
1529 
1530 	if (bits & MACH_PORT_TYPE_RECEIVE) {
1531 		__assert_only ipc_port_t port = ip_object_to_port(object);
1532 		assert(ip_get_receiver_name(port) == name);
1533 		assert(ip_in_space(port, space));
1534 	}
1535 	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1536 		io_reference(object);
1537 		io_unlock(object);
1538 	} else {
1539 		io_unlock(object);
1540 		/* guard exception if we never held the receive right in this entry */
1541 		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
1542 			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
1543 		}
1544 		return MACH_RCV_INVALID_NAME;
1545 	}
1546 
1547 	*objectp = object;
1548 	return MACH_MSG_SUCCESS;
1549 }
1550