xref: /xnu-11417.121.6/osfmk/ipc/ipc_mqueue.c (revision a1e26a70f38d1d7daa7b49b258e2f8538ad81650)
1 /*
2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /*
29  * @OSF_FREE_COPYRIGHT@
30  */
31 /*
32  * Mach Operating System
33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34  * All Rights Reserved.
35  *
36  * Permission to use, copy, modify and distribute this software and its
37  * documentation is hereby granted, provided that both the copyright
38  * notice and this permission notice appear in all copies of the
39  * software, derivative works or modified versions, and any portions
40  * thereof, and that both notices appear in supporting documentation.
41  *
42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45  *
46  * Carnegie Mellon requests users of this software to return to
47  *
48  *  Software Distribution Coordinator  or  [email protected]
49  *  School of Computer Science
50  *  Carnegie Mellon University
51  *  Pittsburgh PA 15213-3890
52  *
53  * any improvements or extensions that they make and grant Carnegie Mellon
54  * the rights to redistribute these changes.
55  */
56 /*
57  */
58 /*
59  *	File:	ipc/ipc_mqueue.c
60  *	Author:	Rich Draves
61  *	Date:	1989
62  *
63  *	Functions to manipulate IPC message queues.
64  */
65 /*
66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67  * support for mandatory and extensible security protections.  This notice
68  * is included in support of clause 2.2 (b) of the Apple Public License,
69  * Version 2.0.
70  */
71 
72 
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76 
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86 
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_policy.h>
92 #include <ipc/ipc_port.h>
93 #include <ipc/ipc_pset.h>
94 #include <ipc/ipc_space.h>
95 
96 #if MACH_FLIPC
97 #include <ipc/flipc.h>
98 #endif
99 
100 #ifdef __LP64__
101 #include <vm/vm_map.h>
102 #endif
103 
104 #include <sys/event.h>
105 
106 extern char     *proc_name_address(void *p);
107 
108 int ipc_mqueue_full;            /* address is event for queue space */
109 int ipc_mqueue_rcv;             /* address is event for message arrival */
110 
111 /* forward declarations */
112 static void ipc_mqueue_receive_results(wait_result_t result);
113 
114 #if MACH_FLIPC
115 static void ipc_mqueue_peek_on_thread_locked(
116 	ipc_mqueue_t        port_mq,
117 	mach_msg_option64_t option,
118 	thread_t            thread);
119 #endif /* MACH_FLIPC */
120 
121 /* Deliver message to message queue or waiting receiver */
122 static void ipc_mqueue_post(
123 	ipc_mqueue_t            mqueue,
124 	ipc_kmsg_t              kmsg,
125 	mach_msg_option64_t     option);
126 
127 /*
128  *	Routine:	ipc_mqueue_init
129  *	Purpose:
130  *		Initialize a newly-allocated message queue.
131  */
132 void
133 ipc_mqueue_init(
134 	ipc_mqueue_t            mqueue)
135 {
136 	ipc_kmsg_queue_init(&mqueue->imq_messages);
137 	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
138 	klist_init(&mqueue->imq_klist);
139 }
140 
141 /*
142  *	Routine:	ipc_mqueue_msg_too_large
143  *	Purpose:
144  *		Return true if kmsg is too large to be received:
145  *
146  *      If MACH64_RCV_LINEAR_VECTOR:
147  *          - combined message buffer is not large enough
148  *            to fit both the message (plus trailer) and
149  *            auxiliary data.
150  *      Otherwise:
151  *          - message buffer is not large enough
152  *          - auxiliary buffer is not large enough:
153  *	      (1) kmsg is a vector with aux, but user expects
154  *                a scalar kmsg (ith_max_asize is 0)
155  *            (2) kmsg is a vector with aux, but user aux
156  *                buffer is not large enough.
157  */
158 static bool
159 ipc_mqueue_msg_too_large(
160 	mach_msg_size_t         msg_size,
161 	mach_msg_size_t         trailer_size,
162 	mach_msg_size_t         aux_size,
163 	mach_msg_option64_t     options,
164 	mach_msg_recv_bufs_t   *recv_bufs)
165 {
166 	mach_msg_size_t max_msg_size = recv_bufs->recv_msg_size;
167 	mach_msg_size_t max_aux_size = recv_bufs->recv_aux_size;
168 
169 	if (max_aux_size != 0) {
170 		assert(options & MACH64_MSG_VECTOR);
171 	}
172 
173 	if (options & MACH64_RCV_LINEAR_VECTOR) {
174 		assert(max_aux_size == 0);
175 		assert(options & MACH64_MSG_VECTOR);
176 
177 		if (max_msg_size < msg_size + trailer_size + aux_size) {
178 			return true;
179 		}
180 	} else {
181 		if (max_msg_size < msg_size + trailer_size) {
182 			return true;
183 		}
184 
185 		/*
186 		 * only return too large if MACH64_MSG_VECTOR.
187 		 *
188 		 * silently drop aux data when receiver is not expecting it for compat
189 		 * reasons.
190 		 */
191 		if ((options & MACH64_MSG_VECTOR) && max_aux_size < aux_size) {
192 			return true;
193 		}
194 	}
195 
196 	return false;
197 }
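/*
 * A worked illustration of the checks above (illustrative sketch with
 * made-up sizes, not part of the build): a scalar receive into a
 * 2048-byte buffer can take a 1024-byte message plus a 72-byte trailer,
 * but a 1024-byte buffer cannot, since 1024 < 1024 + 72.  With
 * MACH64_RCV_LINEAR_VECTOR the single buffer must also absorb any
 * auxiliary data, so the comparison becomes
 * recv_msg_size < msg_size + trailer_size + aux_size.
 */
#if 0 /* illustrative only */
	mach_msg_recv_bufs_t bufs = {
		.recv_msg_size = 2048,  /* example message buffer size */
		.recv_aux_size = 0,     /* no auxiliary buffer supplied */
	};

	/* false: 2048 >= 1024 + 72 and the kmsg carries no aux data */
	bool too_large = ipc_mqueue_msg_too_large(1024, 72, 0,
	    MACH64_MSG_VECTOR, &bufs);
#endif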
198 
199 /*
200  *	Routine:	ipc_mqueue_add_locked.
201  *	Purpose:
202  *		Associate the portset's mqueue with the port's mqueue.
203  *		This has to be done so that posting the port will wake up
204  *		a portset waiter.  If there are waiters on the portset
205  *		mqueue and messages on the port mqueue, try to match them
206  *		up now.
207  *	Conditions:
208  *		Port and Pset both locked.
209  */
210 kern_return_t
211 ipc_mqueue_add_locked(
212 	ipc_mqueue_t    port_mqueue,
213 	ipc_pset_t      pset,
214 	waitq_link_t   *linkp)
215 {
216 	ipc_port_t       port = ip_from_mq(port_mqueue);
217 	struct waitq_set *wqset = &pset->ips_wqset;
218 	circle_queue_t   kmsgq = &port_mqueue->imq_messages;
219 	kern_return_t    kr = KERN_SUCCESS;
220 	ipc_kmsg_t       kmsg;
221 
222 	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
223 	if (kr != KERN_SUCCESS) {
224 		return kr;
225 	}
226 
227 	/*
228 	 * Now that the set has been added to the port, there may be
229 	 * messages queued on the port and threads waiting on the set
230 	 * waitq.  Let's get them together.
231 	 *
232 	 * Only consider this set however, as the other ones have been
233 	 * posted to already.
234 	 */
235 	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
236 		mach_msg_size_t msize, tsize, asize;
237 		thread_t th;
238 
239 		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
240 		    THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
241 		/* port and pset still locked, thread not runnable */
242 
243 		if (th == THREAD_NULL) {
244 			/*
245 			 * Didn't find a thread to wake up but messages
246 			 * are enqueued, prepost the set instead,
247 			 * as calling waitq_wakeup64_identify_locked()
248 			 * on the set directly will not take care of it.
249 			 */
250 			waitq_link_prepost_locked(&port->ip_waitq, wqset);
251 			break;
252 		}
253 
254 		/*
255 		 * Because we hold the thread off the runqueue at this point,
256 		 * it's safe to modify ith_ fields on the thread, as
257 		 * until it is resumed, it must be off core or in between
258 		 * the assert wait and returning from the continuation.
259 		 */
260 
261 		/*
262 		 * If the receiver waited with a facility not directly
263 		 * related to Mach messaging, then it isn't prepared to get
264 		 * handed the message directly.  Just set it running, and
265 		 * go look for another thread that can.
266 		 */
267 		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
268 #if MACH_FLIPC
269 			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
270 				/*
271 				 * wakeup the peeking thread, but
272 				 * continue to loop over the threads
273 				 * waiting on the port's mqueue to see
274 				 * if there are any actual receivers
275 				 */
276 				ipc_mqueue_peek_on_thread_locked(port_mqueue,
277 				    th->ith_option, th);
278 			}
279 #endif /* MACH_FLIPC */
280 
281 			waitq_resume_identified_thread(wqset, th,
282 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
283 			continue;
284 		}
285 
286 		/*
287 		 * Found a receiver. See if they can handle the message
288 		 * correctly (the message is not too large for them, or
289 		 * they didn't care to be informed that the message was
290 		 * too large).  If they can't handle it, take them off
291 		 * the list and let them go back and figure it out and
292 		 * just move onto the next.
293 		 */
294 		msize = ipc_kmsg_copyout_size(kmsg, th->map);
295 		tsize = ipc_kmsg_trailer_size(th->ith_option, th->map);
296 		asize = kmsg->ikm_aux_size;
297 
298 		if (ipc_mqueue_msg_too_large(msize, tsize, asize, th->ith_option,
299 		    &th->ith_recv_bufs)) {
300 			th->ith_state = MACH_RCV_TOO_LARGE;
301 			th->ith_msize = msize;
302 			th->ith_asize = asize;
303 			if (th->ith_option & MACH_RCV_LARGE) {
304 				/*
305 				 * let it go without the message
306 				 */
307 				th->ith_receiver_name = port_mqueue->imq_receiver_name;
308 				th->ith_kmsg = IKM_NULL;
309 				th->ith_seqno = 0;
310 
311 				waitq_resume_identified_thread(wqset, th,
312 				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
313 
314 				continue; /* find another thread */
315 			}
316 		} else {
317 			th->ith_state = MACH_MSG_SUCCESS;
318 		}
319 
320 		/*
321 		 * This thread is going to take this message,
322 		 * so give it the message.
323 		 */
324 		ipc_kmsg_rmqueue(kmsgq, kmsg);
325 
326 #if MACH_FLIPC
327 		mach_node_t  node = kmsg->ikm_node;
328 #endif
329 
330 		ipc_mqueue_release_msgcount(port_mqueue);
331 
332 		th->ith_kmsg = kmsg;
333 		th->ith_seqno = port_mqueue->imq_seqno++;
334 
335 		waitq_resume_identified_thread(wqset, th,
336 		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
337 
338 #if MACH_FLIPC
339 		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
340 			flipc_msg_ack(node, port_mqueue, TRUE);
341 		}
342 #endif
343 	}
344 
345 	return KERN_SUCCESS;
346 }
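/*
 * A minimal user-space sketch of the operation that reaches
 * ipc_mqueue_add_locked(): moving a receive right into a port set with
 * mach_port_insert_member().  Illustrative only; not part of this file.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

static kern_return_t
example_add_port_to_set(mach_port_t *port_out, mach_port_t *pset_out)
{
	mach_port_t port = MACH_PORT_NULL, pset = MACH_PORT_NULL;
	kern_return_t kr;

	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);
	if (kr != KERN_SUCCESS) {
		return kr;
	}
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &pset);
	if (kr != KERN_SUCCESS) {
		return kr;
	}
	/*
	 * Links the port's mqueue to the set's waitq; any messages already
	 * queued on the port can now wake threads receiving on the set.
	 */
	kr = mach_port_insert_member(mach_task_self(), port, pset);
	if (kr == KERN_SUCCESS) {
		*port_out = port;
		*pset_out = pset;
	}
	return kr;
}
#endif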
347 
348 
349 /*
350  *	Routine:	ipc_port_has_klist
351  *	Purpose:
352  *		Returns whether the given port imq_klist field can be used as a klist.
353  */
354 bool
355 ipc_port_has_klist(ipc_port_t port)
356 {
357 	return !port->ip_specialreply &&
358 	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
359 }
360 
361 static inline struct klist *
362 ipc_object_klist(ipc_object_t object)
363 {
364 	if (io_otype(object) == IOT_PORT) {
365 		ipc_port_t port = ip_object_to_port(object);
366 
367 		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
368 	}
369 	return &ips_object_to_pset(object)->ips_klist;
370 }
371 
372 /*
373  *	Routine:	ipc_mqueue_changed
374  *	Purpose:
375  *		Wake up receivers waiting in a message queue.
376  *	Conditions:
377  *		The object containing the message queue is locked.
378  */
379 void
380 ipc_mqueue_changed(
381 	ipc_space_t         space,
382 	struct waitq       *waitq)
383 {
384 	ipc_object_t object = io_from_waitq(waitq);
385 	struct klist *klist = ipc_object_klist(object);
386 
387 	if (klist && SLIST_FIRST(klist)) {
388 		/*
389 		 * Indicate that this message queue is vanishing
390 		 *
391 		 * When this is called, the associated receive right may be in flight
392 		 * between two tasks: the one it used to live in, and the one that armed
393 		 * a port destroyed notification for it.
394 		 *
395 		 * The new process may want to register the port it gets back with an
396 		 * EVFILT_MACHPORT filter again, and may already have sync IPC pending
397 		 * on this port, in which case we want the imq_klist field to be
398 		 * reusable for nefarious purposes.
399 		 *
400 		 * Fortunately, we really don't need this linkage anymore after this
401 		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
402 		 *
403 		 * Note: we don't have the space lock here, however, this covers the
404 		 *       case of when a task is terminating the space, triggering
405 		 *       several knote_vanish() calls.
406 		 *
407 		 *       We don't need the lock to observe that the space is inactive as
408 		 *       we just deactivated it on the same thread.
409 		 *
410 		 *       We still need to call knote_vanish() so that the knote is
411 		 *       marked with EV_VANISHED or EV_EOF so that the detach step
412 		 *       in filt_machportdetach is skipped correctly.
413 		 */
414 		assert(space);
415 		knote_vanish(klist, is_active(space));
416 	}
417 
418 	if (io_otype(object) == IOT_PORT) {
419 		ipc_port_t port = ip_object_to_port(object);
420 		if (!port->ip_specialreply) {
421 			ipc_port_adjust_sync_link_state_locked(port,
422 			    PORT_SYNC_LINK_ANY, NULL);
423 		}
424 	} else {
425 		klist_init(klist);
426 	}
427 
428 	/*
429 	 * do not pass WAITQ_UPDATE_INHERITOR, ipc_port_destroy()
430 	 * needs to handle this manually, and the port lock
431 	 * is the waitq lock, so there's really no inefficiency there.
432 	 */
433 	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
434 	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
435 }
436 
437 
438 
439 
440 /*
441  *	Routine:	ipc_mqueue_send
442  *	Purpose:
443  *		Send a message to a message queue.  The message holds a reference
444  *		for the destination port for this message queue in the
445  *		msgh_remote_port field.
446  *
447  *		If unsuccessful, the caller still has possession of
448  *		the message and must do something with it.  If successful,
449  *		the message is queued, given to a receiver, or destroyed.
450  *	Conditions:
451  *		port is locked.
452  *	Returns:
453  *		MACH_MSG_SUCCESS	The message was accepted.
454  *		MACH_SEND_TIMED_OUT	Caller still has message.
455  *		MACH_SEND_INTERRUPTED	Caller still has message.
456  */
457 mach_msg_return_t
458 ipc_mqueue_send_locked(
459 	ipc_mqueue_t            mqueue,
460 	ipc_kmsg_t              kmsg,
461 	mach_msg_option64_t     option,
462 	mach_msg_timeout_t      send_timeout)
463 {
464 	ipc_port_t port = ip_from_mq(mqueue);
465 	int wresult;
466 
467 	/*
468 	 *  Don't block if:
469 	 *	1) We're under the queue limit.
470 	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
471 	 *	3) Message is sent to a send-once right.
472 	 */
473 	if (!imq_full(mqueue) ||
474 	    (!imq_full_kernel(mqueue) &&
475 	    ((option & MACH_SEND_ALWAYS) ||
476 	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
477 	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
478 		mqueue->imq_msgcount++;
479 		assert(mqueue->imq_msgcount > 0);
480 		ip_mq_unlock(port);
481 	} else {
482 		thread_t cur_thread = current_thread();
483 		struct turnstile *send_turnstile = TURNSTILE_NULL;
484 		uint64_t deadline;
485 
486 		/*
487 		 * We have to wait for space to be granted to us.
488 		 */
489 		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
490 			ip_mq_unlock(port);
491 			return MACH_SEND_TIMED_OUT;
492 		}
493 		if (imq_full_kernel(mqueue)) {
494 			ip_mq_unlock(port);
495 			return MACH_SEND_NO_BUFFER;
496 		}
497 		port->ip_fullwaiters = true;
498 
499 		if (option & MACH_SEND_TIMEOUT) {
500 			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
501 		} else {
502 			deadline = 0;
503 		}
504 
505 		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
506 
507 		send_turnstile = turnstile_prepare((uintptr_t)port,
508 		    port_send_turnstile_address(port),
509 		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
510 
511 		ipc_port_send_update_inheritor(port, send_turnstile,
512 		    TURNSTILE_DELAYED_UPDATE);
513 
514 		wresult = waitq_assert_wait64_leeway(
515 			&send_turnstile->ts_waitq,
516 			IPC_MQUEUE_FULL,
517 			THREAD_ABORTSAFE,
518 			TIMEOUT_URGENCY_USER_NORMAL,
519 			deadline,
520 			TIMEOUT_NO_LEEWAY);
521 
522 		ip_mq_unlock(port);
523 		turnstile_update_inheritor_complete(send_turnstile,
524 		    TURNSTILE_INTERLOCK_NOT_HELD);
525 
526 		if (wresult == THREAD_WAITING) {
527 			wresult = thread_block(THREAD_CONTINUE_NULL);
528 		}
529 
530 		/* Call turnstile complete with interlock held */
531 		ip_mq_lock(port);
532 		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
533 		ip_mq_unlock(port);
534 
535 		/* Call cleanup after dropping the interlock */
536 		turnstile_cleanup();
537 
538 		switch (wresult) {
539 		case THREAD_AWAKENED:
540 			/*
541 			 * we can proceed - inherited msgcount from waker
542 			 * or the message queue has been destroyed and the msgcount
543 			 * has been reset to zero (will detect in ipc_mqueue_post()).
544 			 */
545 			break;
546 
547 		case THREAD_TIMED_OUT:
548 			assert(option & MACH_SEND_TIMEOUT);
549 			return MACH_SEND_TIMED_OUT;
550 
551 		case THREAD_INTERRUPTED:
552 			return MACH_SEND_INTERRUPTED;
553 
554 		case THREAD_RESTART:
555 			/* mqueue is being destroyed */
556 			return MACH_SEND_INVALID_DEST;
557 		default:
558 			panic("ipc_mqueue_send");
559 		}
560 	}
561 
562 	ipc_mqueue_post(mqueue, kmsg, option);
563 	return MACH_MSG_SUCCESS;
564 }
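/*
 * A user-space sketch of the send-side behavior above (illustrative
 * only, not part of this file): with MACH_SEND_TIMEOUT and a zero
 * timeout, a full queue returns MACH_SEND_TIMED_OUT and the caller
 * still owns the message.  "dest" is a hypothetical send right.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

static mach_msg_return_t
example_try_send(mach_port_t dest)
{
	mach_msg_header_t msg = {
		.msgh_bits        = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0),
		.msgh_size        = (mach_msg_size_t)sizeof(msg),
		.msgh_remote_port = dest,
		.msgh_local_port  = MACH_PORT_NULL,
		.msgh_id          = 0,
	};

	/* zero-millisecond timeout: do not block waiting for queue space */
	return mach_msg(&msg, MACH_SEND_MSG | MACH_SEND_TIMEOUT,
	           (mach_msg_size_t)sizeof(msg), 0, MACH_PORT_NULL,
	           0, MACH_PORT_NULL);
}
#endif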
565 
566 /*
567  *	Routine:	ipc_mqueue_override_send_locked
568  *	Purpose:
569  *		Set an override qos on the first message in the queue
570  *		(if the queue is full). This is a send-possible override
571  *		that will go away as soon as we drain a message from the
572  *		queue.
573  *
574  *	Conditions:
575  *		The port corresponding to mqueue is locked.
576  *		The caller holds a reference on the message queue.
577  */
578 void
579 ipc_mqueue_override_send_locked(
580 	ipc_mqueue_t        mqueue,
581 	mach_msg_qos_t      qos_ovr)
582 {
583 	ipc_port_t port = ip_from_mq(mqueue);
584 
585 	assert(waitq_is_valid(&port->ip_waitq));
586 
587 	if (imq_full(mqueue)) {
588 		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
589 
590 		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
591 			if (ip_in_a_space(port) &&
592 			    is_active(ip_get_receiver(port)) &&
593 			    ipc_port_has_klist(port)) {
594 				KNOTE(&port->ip_klist, 0);
595 			}
596 		}
597 	}
598 }
599 
600 /*
601  *	Routine:	ipc_mqueue_release_msgcount
602  *	Purpose:
603  *		Release a message queue reference in the case where we
604  *		found a waiter.
605  *
606  *	Conditions:
607  *		The port corresponding to message queue is locked.
608  *		The message corresponding to this reference is off the queue.
609  *		There is no need to pass reserved preposts because this will
610  *		never prepost to anyone
611  */
612 void
613 ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
614 {
615 	ipc_port_t port = ip_from_mq(port_mq);
616 	struct turnstile *send_turnstile = port_send_turnstile(port);
617 
618 	ip_mq_lock_held(port);
619 	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
620 
621 	port_mq->imq_msgcount--;
622 
623 	if (!imq_full(port_mq) && port->ip_fullwaiters &&
624 	    send_turnstile != TURNSTILE_NULL) {
625 		/*
626 		 * boost the priority of the awoken thread
627 		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
628 		 * the message queue slot we've just reserved.
629 		 *
630 		 * NOTE: this will never prepost
631 		 *
632 		 * The wakeup happens on a turnstile waitq
633 		 * which will wakeup the highest priority waiter.
634 		 * A potential downside of this would be starving low
635 		 * priority senders if there is a constant churn of
636 		 * high priority threads trying to send to this port.
637 		 */
638 		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
639 		    IPC_MQUEUE_FULL,
640 		    THREAD_AWAKENED,
641 		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
642 			port->ip_fullwaiters = false;
643 		} else {
644 			/* gave away our slot - add reference back */
645 			port_mq->imq_msgcount++;
646 		}
647 	}
648 
649 	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
650 		waitq_clear_prepost_locked(&port->ip_waitq);
651 	}
652 }
653 
654 /*
655  *	Routine:	ipc_mqueue_post
656  *	Purpose:
657  *		Post a message to a waiting receiver or enqueue it.  If a
658  *		receiver is waiting, we can release our reserved space in
659  *		the message queue.
660  *
661  *	Conditions:
662  *		port is unlocked
663  *		If we need to queue, our space in the message queue is reserved.
664  */
665 static void
666 ipc_mqueue_post(
667 	ipc_mqueue_t               mqueue,
668 	ipc_kmsg_t                 kmsg,
669 	mach_msg_option64_t        option __unused)
670 {
671 	ipc_port_t port = ip_from_mq(mqueue);
672 	struct waitq *waitq = &port->ip_waitq;
673 	boolean_t destroy_msg = FALSE;
674 
675 	ipc_kmsg_trace_send(kmsg, option);
676 
677 	/*
678 	 *	While the msg queue is locked, we have control of the
679 	 *	kmsg, so the ref in it for the port is still good.
680 	 *
681 	 *	Check for a receiver for the message.
682 	 */
683 	ip_mq_lock(port);
684 
685 	/* we may have raced with port destruction! */
686 	if (!waitq_is_valid(&port->ip_waitq)) {
687 		destroy_msg = TRUE;
688 		goto out_unlock;
689 	}
690 
691 	for (;;) {
692 		mach_msg_size_t msize, tsize, asize;
693 		thread_t receiver;
694 
695 		receiver = waitq_wakeup64_identify_locked(waitq,
696 		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED, WAITQ_KEEP_LOCKED);
697 		/* waitq still locked, thread not runnable */
698 
699 		if (receiver == THREAD_NULL) {
700 			/*
701 			 * no receivers; queue kmsg if space still reserved
702 			 * Reservations are cancelled when the port goes inactive.
703 			 * note that this will enqueue the message for any
704 			 * "peeking" receivers.
705 			 *
706 			 * Also, post the knote to wake up any threads waiting
707 			 * on that style of interface if this insertion is of
708 			 * note (first insertion, or adjusted override qos all
709 			 * the way to the head of the queue).
710 			 *
711 			 * This is just for ports. port-sets knotes are being
712 			 * posted to by the waitq_wakeup64_identify_locked()
713 			 * above already.
714 			 */
715 			if (mqueue->imq_msgcount == 0) {
716 				/*
717 				 * The message queue must belong
718 				 * to an inactive port, so just destroy
719 				 * the message and pretend it was posted.
720 				 */
721 				destroy_msg = TRUE;
722 			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
723 				/*
724 				 * queue was not empty and qos
725 				 * didn't change, nothing to do.
726 				 */
727 			} else if (ip_in_a_space(port) &&
728 			    is_active(ip_get_receiver(port)) &&
729 			    ipc_port_has_klist(port)) {
730 				/*
731 				 * queue was empty or qos changed
732 				 * we need to tell kqueue, unless
733 				 * the space is getting torn down
734 				 */
735 				KNOTE(&port->ip_klist, 0);
736 			}
737 			break;
738 		}
739 
740 #if MACH_FLIPC
741 		/*
742 		 * If a thread is attempting a "peek" into the message queue
743 		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
744 		 * thread running.  A successful peek is essentially the same as
745 		 * message delivery since the peeking thread takes responsibility
746 		 * for delivering the message and (eventually) removing it from
747 		 * the mqueue.  Only one thread can successfully use the peek
748 		 * facility on any given port, so we exit the waitq loop after
749 		 * encountering such a thread.
750 		 */
751 		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
752 			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
753 			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
754 			waitq_resume_identified_thread(waitq, receiver,
755 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
756 			break; /* Message was posted, so break out of loop */
757 		}
758 #endif /* MACH_FLIPC */
759 
760 		/*
761 		 * If the receiver waited with a facility not directly related
762 		 * to Mach messaging, then it isn't prepared to get handed the
763 		 * message directly. Just set it running, and go look for
764 		 * another thread that can.
765 		 */
766 		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
767 			waitq_resume_identified_thread(waitq, receiver,
768 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
769 
770 			continue;
771 		}
772 
773 
774 		/*
775 		 * We found a waiting thread.
776 		 * If the message is too large or the scatter list is too small
777 		 * the thread we wake up will get that as its status.
778 		 */
779 		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
780 		tsize = ipc_kmsg_trailer_size(receiver->ith_option, receiver->map);
781 		asize = kmsg->ikm_aux_size;
782 
783 		if (ipc_mqueue_msg_too_large(msize, tsize, asize, receiver->ith_option,
784 		    &receiver->ith_recv_bufs)) {
785 			receiver->ith_msize = msize;
786 			receiver->ith_asize = asize;
787 			receiver->ith_state = MACH_RCV_TOO_LARGE;
788 		} else {
789 			receiver->ith_state = MACH_MSG_SUCCESS;
790 		}
791 
792 		/*
793 		 * If there is no problem with the upcoming receive, or the
794 		 * receiver thread didn't specifically ask for special too
795 		 * large error condition, go ahead and select it anyway.
796 		 */
797 		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
798 		    !(receiver->ith_option & MACH_RCV_LARGE)) {
799 			receiver->ith_kmsg = kmsg;
800 			receiver->ith_seqno = mqueue->imq_seqno++;
801 #if MACH_FLIPC
802 			mach_node_t node = kmsg->ikm_node;
803 #endif
804 			waitq_resume_identified_thread(waitq, receiver,
805 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
806 
807 			/* we didn't need our reserved spot in the queue */
808 			ipc_mqueue_release_msgcount(mqueue);
809 
810 #if MACH_FLIPC
811 			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
812 				flipc_msg_ack(node, mqueue, TRUE);
813 			}
814 #endif
815 			break;
816 		}
817 
818 		/*
819 		 * Otherwise, this thread needs to be released to run
820 		 * and handle its error without getting the message.  We
821 		 * need to go back and pick another one.
822 		 */
823 		receiver->ith_receiver_name = mqueue->imq_receiver_name;
824 		receiver->ith_kmsg = IKM_NULL;
825 		receiver->ith_seqno = 0;
826 
827 		waitq_resume_identified_thread(waitq, receiver,
828 		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
829 	}
830 
831 out_unlock:
832 	/* clear the waitq boost we may have been given */
833 	waitq_clear_promotion_locked(waitq, current_thread());
834 	waitq_unlock(waitq);
835 
836 	if (destroy_msg) {
837 		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
838 	}
839 
840 	counter_inc(&current_task()->messages_sent);
841 	return;
842 }
843 
844 
845 static void
846 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
847 {
848 	thread_t                self = current_thread();
849 	mach_msg_option64_t     option64 = self->ith_option;
850 
851 	/*
852 	 * why did we wake up?
853 	 */
854 	switch (saved_wait_result) {
855 	case THREAD_TIMED_OUT:
856 		self->ith_state = MACH_RCV_TIMED_OUT;
857 		return;
858 
859 	case THREAD_INTERRUPTED:
860 		self->ith_state = MACH_RCV_INTERRUPTED;
861 		return;
862 
863 	case THREAD_RESTART:
864 		/* something bad happened to the port/set */
865 		self->ith_state = MACH_RCV_PORT_CHANGED;
866 		return;
867 
868 	case THREAD_AWAKENED:
869 		/*
870 		 * We do not need to go select a message, somebody
871 		 * handed us one (or a too-large indication).
872 		 */
873 		switch (self->ith_state) {
874 		case MACH_RCV_SCATTER_SMALL:
875 		case MACH_RCV_TOO_LARGE:
876 			/*
877 			 * Somebody tried to give us a too large
878 			 * message. If we indicated that we cared,
879 			 * then they only gave us the indication,
880 			 * otherwise they gave us the indication
881 			 * AND the message anyway.
882 			 */
883 			if (option64 & MACH_RCV_LARGE) {
884 				return;
885 			}
886 			return;
887 		case MACH_MSG_SUCCESS:
888 			return;
889 #if MACH_FLIPC
890 		case MACH_PEEK_READY:
891 			return;
892 #endif /* MACH_FLIPC */
893 
894 		default:
895 			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
896 		}
897 
898 	default:
899 		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
900 	}
901 }
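/*
 * A user-space sketch of the MACH_RCV_TOO_LARGE handling described in
 * the switch above (illustrative only, not part of this file): with
 * MACH_RCV_LARGE the kernel leaves the message queued and reports the
 * required size in msgh_size, so the caller can grow its buffer and
 * retry; without MACH_RCV_LARGE the message is handed over along with
 * the error and cannot be retried.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>
#include <stdlib.h>

static mach_msg_return_t
example_receive_resizing(mach_port_t rcv_port)
{
	mach_msg_size_t size = 128;
	mach_msg_header_t *buf = calloc(1, size);
	mach_msg_return_t mr;

	mr = mach_msg(buf, MACH_RCV_MSG | MACH_RCV_LARGE, 0, size,
	        rcv_port, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
	if (mr == MACH_RCV_TOO_LARGE) {
		/* msgh_size now holds the message size we actually need */
		size = buf->msgh_size + (mach_msg_size_t)sizeof(mach_msg_max_trailer_t);
		buf = realloc(buf, size);
		mr = mach_msg(buf, MACH_RCV_MSG, 0, size, rcv_port,
		        MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
	}
	free(buf);
	return mr;
}
#endif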
902 
903 void
904 ipc_mqueue_receive_continue(
905 	__unused void *param,
906 	wait_result_t wresult)
907 {
908 	ipc_mqueue_receive_results(wresult);
909 	mach_msg_receive_continue();  /* hard-coded for now */
910 }
911 
912 /*
913  *	Routine:	ipc_mqueue_receive
914  *	Purpose:
915  *		Receive a message from a message queue.
916  *
917  *	Conditions:
918  *		Our caller must hold a reference for the port or port set
919  *		to which this queue belongs, to keep the queue
920  *		from being deallocated.
921  *
922  *		The kmsg is returned with clean header fields
923  *		and with the circular bit turned off through the ith_kmsg
924  *		field of the thread's receive continuation state.
925  *	Returns:
926  *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
927  *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
928  *                          Auxiliary data size returned in ith_asize
929  *		MACH_RCV_TIMED_OUT	No message obtained.
930  *		MACH_RCV_INTERRUPTED	No message obtained.
931  *		MACH_RCV_PORT_DIED	Port/set died; no message.
932  *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
933  *
934  */
935 
936 void
937 ipc_mqueue_receive(
938 	struct waitq           *waitq,
939 	mach_msg_timeout_t      rcv_timeout,
940 	int                     interruptible,
941 	thread_t                thread,
942 	bool                    has_continuation)
943 {
944 	wait_result_t           wresult;
945 
946 	assert(thread == current_thread());
947 	waitq_lock(waitq);
948 
949 	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, rcv_timeout,
950 	    interruptible, thread);
951 	/* object unlocked */
952 	if (wresult == THREAD_NOT_WAITING) {
953 		return;
954 	}
955 
956 	if (wresult == THREAD_WAITING) {
957 		if (has_continuation) {
958 			wresult = thread_block(ipc_mqueue_receive_continue);
959 			/* NOTREACHED */
960 		}
961 		wresult = thread_block(THREAD_CONTINUE_NULL);
962 	}
963 	ipc_mqueue_receive_results(wresult);
964 }
965 
966 /*
967  *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
968  *	Purpose:
969  *		Receive a message from a message queue using a specified thread.
970  *		If no message available, assert_wait on the appropriate waitq.
971  *
972  *	Conditions:
973  *		Assumes thread is self.
974  *		The port/port-set waitq is locked on entry, unlocked on return.
975  *		May have assert-waited. Caller must block in those cases.
976  */
977 wait_result_t
978 ipc_mqueue_receive_on_thread_and_unlock(
979 	struct waitq           *waitq,
980 	mach_msg_timeout_t      rcv_timeout,
981 	int                     interruptible,
982 	thread_t                thread)
983 {
984 	mach_msg_option64_t     option64 = thread->ith_option;
985 	ipc_port_t              port = IP_NULL;
986 	wait_result_t           wresult;
987 	uint64_t                deadline;
988 	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;
989 
990 	assert(thread == current_thread());
991 
992 	if (waitq_type(waitq) == WQT_PORT_SET) {
993 		ipc_pset_t pset = ips_from_waitq(waitq);
994 		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
995 		struct waitq *port_wq;
996 
997 		/*
998 		 * Put the message at the back of the prepost list
999 		 * if it's not a PEEK.
1000 		 *
1001 		 * Might drop the pset lock temporarily.
1002 		 */
1003 #if MACH_FLIPC
1004 		if (option64 & MACH64_PEEK_MSG) {
1005 			wqs_flags |= WQS_PREPOST_PEEK;
1006 		}
1007 #endif /* MACH_FLIPC */
1008 		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);
1009 
1010 		/* Returns with port locked */
1011 
1012 		if (port_wq != NULL) {
1013 			/*
1014 			 * We get here if there is at least one message
1015 			 * waiting on port_wq. We have instructed the prepost
1016 			 * iteration logic to leave both the port_wq and the
1017 			 * set waitq locked.
1018 			 *
1019 			 * Continue on to handling the message with just
1020 			 * the port waitq locked.
1021 			 */
1022 			waitq_unlock(waitq);
1023 			port = ip_from_waitq(port_wq);
1024 		}
1025 	} else if (waitq_type(waitq) == WQT_PORT) {
1026 		port = ip_from_waitq(waitq);
1027 		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
1028 			port = IP_NULL;
1029 		}
1030 	} else {
1031 		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
1032 	}
1033 
1034 	if (port) {
1035 #if MACH_FLIPC
1036 		if (option64 & MACH64_PEEK_MSG) {
1037 			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
1038 			    option64, thread);
1039 		} else
1040 #endif /* MACH_FLIPC */
1041 		{
1042 			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
1043 			    option64, thread);
1044 		}
1045 		ip_mq_unlock(port);
1046 		return THREAD_NOT_WAITING;
1047 	}
1048 
1049 	if (!waitq_is_valid(waitq)) {
1050 		/* someone raced us to destroy this mqueue/port! */
1051 		waitq_unlock(waitq);
1052 		/*
1053 		 * ipc_mqueue_receive_results updates the thread's ith_state
1054 		 * TODO: differentiate between rights being moved and
1055 		 * rights/ports being destroyed (21885327)
1056 		 */
1057 		return THREAD_RESTART;
1058 	}
1059 
1060 	/*
1061 	 * Looks like we'll have to block.  The waitq we will
1062 	 * block on (whether the set's or the local port's) is
1063 	 * still locked.
1064 	 */
1065 	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
1066 		waitq_unlock(waitq);
1067 		thread->ith_state = MACH_RCV_TIMED_OUT;
1068 		return THREAD_NOT_WAITING;
1069 	}
1070 
1071 	thread->ith_state = MACH_RCV_IN_PROGRESS;
1072 #if MACH_FLIPC
1073 	if (option64 & MACH64_PEEK_MSG) {
1074 		thread->ith_state = MACH_PEEK_IN_PROGRESS;
1075 	}
1076 #endif /* MACH_FLIPC */
1077 
1078 	if (option64 & MACH_RCV_TIMEOUT) {
1079 		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
1080 	} else {
1081 		deadline = 0;
1082 	}
1083 
1084 	/*
1085 	 * Threads waiting on a reply port (not portset)
1086 	 * will wait on its receive turnstile.
1087 	 *
1088 	 * Donate waiting thread's turnstile and
1089 	 * setup inheritor for special reply port.
1090 	 * Based on the state of the special reply
1091 	 * port, the inheritor would be the send
1092 	 * turnstile of the connection port on which
1093 	 * the send of sync ipc would happen or
1094 	 * workloop's turnstile who would reply to
1095 	 * the sync ipc message.
1096 	 *
1097 	 * Pass in mqueue wait in waitq_assert_wait to
1098 	 * support port set wakeup. The mqueue waitq of port
1099 	 * will be converted to a turnstile waitq
1100 	 * in waitq_assert_wait instead of global waitqs.
1101 	 */
1102 	if (waitq_type(waitq) == WQT_PORT) {
1103 		port = ip_from_waitq(waitq);
1104 		rcv_turnstile = turnstile_prepare((uintptr_t)port,
1105 		    port_rcv_turnstile_address(port),
1106 		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
1107 
1108 		ipc_port_recv_update_inheritor(port, rcv_turnstile,
1109 		    TURNSTILE_DELAYED_UPDATE);
1110 	}
1111 
1112 	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
1113 	wresult = waitq_assert_wait64_locked(waitq,
1114 	    IPC_MQUEUE_RECEIVE,
1115 	    interruptible,
1116 	    TIMEOUT_URGENCY_USER_NORMAL,
1117 	    deadline,
1118 	    TIMEOUT_NO_LEEWAY,
1119 	    thread);
1120 	if (wresult == THREAD_AWAKENED) {
1121 		/*
1122 		 * The first thing we did was to look for preposts
1123 		 * (using waitq_set_first_prepost() for sets, or looking
1124 		 * at the port's queue for ports).
1125 		 *
1126 		 * Since we found none, we kept the waitq locked.
1127 		 *
1128 		 * It ensures that waitq_assert_wait64_locked() can't
1129 		 * find pre-posts either, won't drop the waitq lock
1130 		 * either (even for a set), and can't return THREAD_AWAKENED.
1131 		 */
1132 		panic("ipc_mqueue_receive_on_thread: sleep walking");
1133 	}
1134 
1135 	waitq_unlock(waitq);
1136 
1137 	/*
1138 	 * After this point, a waiting thread could be found by the wakeup
1139 	 * identify path, and the other side now owns the ith_ fields until
1140 	 * this thread blocks and resumes in the continuation
1141 	 */
1142 
1143 	/* Check if it's a port mqueue and if it needs to call turnstile_update_inheritor_complete */
1144 	if (rcv_turnstile != TURNSTILE_NULL) {
1145 		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
1146 	}
1147 	/* It's the caller's responsibility to call turnstile_complete to get the turnstile back */
1148 
1149 	return wresult;
1150 }
1151 
1152 #if MACH_FLIPC
1153 /*
1154  *	Routine:	ipc_mqueue_peek_on_thread_locked
1155  *	Purpose:
1156  *		A receiver discovered that there was a message on the queue
1157  *		before it had to block. Tell a thread about the message queue,
1158  *		but don't pick off any messages.
1159  *	Conditions:
1160  *		port_mq locked
1161  *		at least one message on port_mq's message queue
1162  *
1163  *	Returns: (on thread->ith_state)
1164  *		MACH_PEEK_READY		ith_peekq contains a message queue
1165  */
1166 void
1167 ipc_mqueue_peek_on_thread_locked(
1168 	ipc_mqueue_t        port_mq,
1169 	__assert_only mach_msg_option64_t option64,
1170 	thread_t            thread)
1171 {
1172 	assert(option64 & MACH64_PEEK_MSG);
1173 	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
1174 
1175 	/*
1176 	 * Take a reference on the mqueue's associated port:
1177 	 * the peeking thread will be responsible to release this reference
1178 	 */
1179 	ip_validate(ip_from_mq(port_mq));
1180 	ip_reference(ip_from_mq(port_mq));
1181 	thread->ith_peekq = port_mq;
1182 	thread->ith_state = MACH_PEEK_READY;
1183 }
1184 #endif /* MACH_FLIPC */
1185 
1186 /*
1187  *	Routine:	ipc_mqueue_select_on_thread_locked
1188  *	Purpose:
1189  *		A receiver discovered that there was a message on the queue
1190  *		before it had to block.  Pick the message off the queue and
1191  *		"post" it to thread.
1192  *	Conditions:
1193  *		port locked.
1194  *              thread not locked.
1195  *		There is a message.
1196  *		No need to reserve prepost objects - it will never prepost
1197  *
1198  *	Returns:
1199  *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
1200  *		MACH_RCV_TOO_LARGE  May or may not have pulled it, but it is too large
1201  */
1202 void
1203 ipc_mqueue_select_on_thread_locked(
1204 	ipc_mqueue_t            port_mq,
1205 	mach_msg_option64_t     options,
1206 	thread_t                thread)
1207 {
1208 	mach_msg_size_t msize, tsize, asize;
1209 	ipc_kmsg_t kmsg;
1210 
1211 	mach_msg_return_t mr = MACH_MSG_SUCCESS;
1212 
1213 	/*
1214 	 * Do some sanity checking of our ability to receive
1215 	 * before pulling the message off the queue.
1216 	 */
1217 	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1218 	assert(kmsg != IKM_NULL);
1219 
1220 	/*
1221 	 * If we really can't receive it, but we had the
1222 	 * MACH_RCV_LARGE option set, then don't take it off
1223 	 * the queue, instead return the appropriate error
1224 	 * (and size needed).
1225 	 */
1226 	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
1227 	tsize = ipc_kmsg_trailer_size(options, thread->map);
1228 	asize = kmsg->ikm_aux_size;
1229 
1230 	if (ipc_mqueue_msg_too_large(msize, tsize, asize, options,
1231 	    &thread->ith_recv_bufs)) {
1232 		mr = MACH_RCV_TOO_LARGE;
1233 		if (options & MACH_RCV_LARGE) {
1234 			(void)ipc_kmsg_validate_signature(kmsg);
1235 			thread->ith_receiver_name = port_mq->imq_receiver_name;
1236 			thread->ith_kmsg = IKM_NULL;
1237 			thread->ith_msize = msize;
1238 			thread->ith_asize = asize;
1239 			thread->ith_seqno = 0;
1240 			thread->ith_state = mr;
1241 			return;
1242 		}
1243 	}
1244 
1245 	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1246 #if MACH_FLIPC
1247 	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
1248 		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
1249 	}
1250 #endif
1251 	ipc_mqueue_release_msgcount(port_mq);
1252 	thread->ith_seqno = port_mq->imq_seqno++;
1253 	thread->ith_kmsg = kmsg;
1254 	thread->ith_state = mr;
1255 
1256 	counter_inc(&current_task()->messages_received);
1257 	return;
1258 }
1259 
1260 /*
1261  *	Routine:	ipc_mqueue_peek_locked
1262  *	Purpose:
1263  *		Peek at a (non-set) message queue to see if it has a message
1264  *		matching the sequence number provided (if zero, then the
1265  *		first message in the queue) and return vital info about the
1266  *		message.
1267  *
1268  *	Conditions:
1269  *		The io object corresponding to mq is locked by callers.
1270  *		Other locks may be held by callers, so this routine cannot block.
1271  *		Caller holds reference on the message queue.
1272  */
1273 unsigned
1274 ipc_mqueue_peek_locked(
1275 	ipc_mqueue_t            mq,
1276 	mach_port_seqno_t      *seqnop,
1277 	mach_msg_size_t        *msg_sizep,
1278 	mach_msg_id_t          *msg_idp,
1279 	mach_msg_max_trailer_t *msg_trailerp,
1280 	ipc_kmsg_t             *kmsgp)
1281 {
1282 	ipc_kmsg_queue_t kmsgq;
1283 	ipc_kmsg_t kmsg;
1284 	mach_port_seqno_t seqno, msgoff;
1285 	unsigned res = 0;
1286 	mach_msg_header_t *hdr;
1287 
1288 	seqno = 0;
1289 	if (seqnop != NULL) {
1290 		seqno = *seqnop;
1291 	}
1292 
1293 	if (seqno == 0) {
1294 		seqno = mq->imq_seqno;
1295 		msgoff = 0;
1296 	} else if (seqno >= mq->imq_seqno &&
1297 	    seqno < mq->imq_seqno + mq->imq_msgcount) {
1298 		msgoff = seqno - mq->imq_seqno;
1299 	} else {
1300 		goto out;
1301 	}
1302 
1303 	/* look for the message that would match that seqno */
1304 	kmsgq = &mq->imq_messages;
1305 	kmsg = ipc_kmsg_queue_first(kmsgq);
1306 	while (msgoff-- && kmsg != IKM_NULL) {
1307 		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1308 	}
1309 	if (kmsg == IKM_NULL) {
1310 		goto out;
1311 	}
1312 
1313 	/*
1314 	 * Validate kmsg signature before doing anything with it. Since we are holding
1315 	 * the mqueue lock here, and only header + trailer will be peeked on, just
1316 	 * do a partial validation to finish quickly.
1317 	 *
1318 	 * Partial kmsg signature is only supported on PAC devices.
1319 	 */
1320 	(void)ipc_kmsg_validate_signature(kmsg);
1321 
1322 	hdr = ikm_header(kmsg);
1323 	/* found one - return the requested info */
1324 	if (seqnop != NULL) {
1325 		*seqnop = seqno;
1326 	}
1327 	if (msg_sizep != NULL) {
1328 		*msg_sizep = hdr->msgh_size;
1329 	}
1330 	if (msg_idp != NULL) {
1331 		*msg_idp = hdr->msgh_id;
1332 	}
1333 	if (msg_trailerp != NULL) {
1334 		*msg_trailerp = *ipc_kmsg_get_trailer(kmsg);
1335 	}
1336 	if (kmsgp != NULL) {
1337 		*kmsgp = kmsg;
1338 	}
1339 
1340 	res = 1;
1341 
1342 out:
1343 	return res;
1344 }
1345 
1346 
1347 /*
1348  *	Routine:	ipc_mqueue_peek
1349  *	Purpose:
1350  *		Peek at a (non-set) message queue to see if it has a message
1351  *		matching the sequence number provided (if zero, then the
1352  *		first message in the queue) and return vital info about the
1353  *		message.
1354  *
1355  *	Conditions:
1356  *		The ipc_mqueue_t is unlocked.
1357  *		Locks may be held by callers, so this routine cannot block.
1358  *		Caller holds reference on the message queue.
1359  */
1360 unsigned
1361 ipc_mqueue_peek(ipc_mqueue_t mq,
1362     mach_port_seqno_t * seqnop,
1363     mach_msg_size_t * msg_sizep,
1364     mach_msg_id_t * msg_idp,
1365     mach_msg_max_trailer_t * msg_trailerp,
1366     ipc_kmsg_t *kmsgp)
1367 {
1368 	ipc_port_t port = ip_from_mq(mq);
1369 	unsigned res;
1370 
1371 	ip_mq_lock(port);
1372 
1373 	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1374 	    msg_trailerp, kmsgp);
1375 
1376 	ip_mq_unlock(port);
1377 	return res;
1378 }
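/*
 * A user-space sketch of the peek interface that lands here
 * (illustrative only, assuming the mach_port_peek() MIG routine): it
 * reports the sequence number, size and id of a queued message without
 * dequeueing it.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

static kern_return_t
example_peek(mach_port_t rcv_port)
{
	mach_port_seqno_t seqno = 0;    /* 0 means "first message in the queue" */
	mach_msg_size_t msg_size = 0;
	mach_msg_id_t msg_id = 0;
	mach_msg_audit_trailer_t trailer;
	mach_msg_type_number_t trailer_cnt = sizeof(trailer);

	return mach_port_peek(mach_task_self(), rcv_port,
	           MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) |
	           MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_AUDIT),
	           &seqno, &msg_size, &msg_id,
	           (mach_msg_trailer_info_t)&trailer, &trailer_cnt);
}
#endif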
1379 
1380 #if MACH_FLIPC
1381 /*
1382  *	Routine:	ipc_mqueue_release_peek_ref
1383  *	Purpose:
1384  *		Release the reference on an mqueue's associated port which was
1385  *		granted to a thread in ipc_mqueue_peek_on_thread (on the
1386  *		MACH64_PEEK_MSG thread wakeup path).
1387  *
1388  *	Conditions:
1389  *		The ipc_mqueue_t should be locked on entry.
1390  *		The ipc_mqueue_t will be _unlocked_ on return
1391  *			(and potentially invalid!)
1392  *
1393  */
1394 void
1395 ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
1396 {
1397 	ipc_port_t port = ip_from_mq(mqueue);
1398 
1399 	ip_mq_lock_held(port);
1400 
1401 	/*
1402 	 * clear any preposts this mq may have generated
1403 	 * (which would cause subsequent immediate wakeups)
1404 	 */
1405 	waitq_clear_prepost_locked(&port->ip_waitq);
1406 
1407 	ip_mq_unlock(port);
1408 
1409 	/*
1410 	 * release the port reference: we need to do this outside the lock
1411 	 * because we might be holding the last port reference!
1412 	 **/
1413 	ip_release(port);
1414 }
1415 #endif /* MACH_FLIPC */
1416 
1417 /*
1418  *	Routine:	ipc_mqueue_destroy_locked
1419  *	Purpose:
1420  *		Destroy a message queue.
1421  *		Set any blocked senders running.
1422  *		Destroy the kmsgs in the queue.
1423  *	Conditions:
1424  *		port locked
1425  *		Receivers were removed when the receive right was "changed"
1426  */
1427 boolean_t
1428 ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
1429 {
1430 	ipc_port_t port = ip_from_mq(mqueue);
1431 	boolean_t reap = FALSE;
1432 	struct turnstile *send_turnstile = port_send_turnstile(port);
1433 
1434 	/*
1435 	 *	rouse all blocked senders
1436 	 *	(don't boost anyone - we're tearing this queue down)
1437 	 *	(never preposts)
1438 	 */
1439 	port->ip_fullwaiters = false;
1440 
1441 	if (send_turnstile != TURNSTILE_NULL) {
1442 		waitq_wakeup64_all(&send_turnstile->ts_waitq,
1443 		    IPC_MQUEUE_FULL,
1444 		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
1445 	}
1446 
1447 #if MACH_FLIPC
1448 	ipc_kmsg_t kmsg;
1449 
1450 	cqe_foreach_element_safe(kmsg, &mqueue->imq_messages, ikm_link) {
1451 		if (MACH_NODE_VALID(kmsg->ikm_node) &&
1452 		    FPORT_VALID(mqueue->imq_fport)) {
1453 			flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
1454 		}
1455 	}
1456 #endif
1457 
1458 	/*
1459 	 * Move messages from the specified queue to the per-thread
1460 	 * clean/drain queue while we have the mqueue lock.
1461 	 */
1462 	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);
1463 
1464 	/*
1465 	 * Wipe out message count, both for messages about to be
1466 	 * reaped and for reserved space for (previously) woken senders.
1467 	 * This is the indication to them that their reserved space is gone
1468 	 * (the mqueue was destroyed).
1469 	 */
1470 	mqueue->imq_msgcount = 0;
1471 
1472 	/*
1473 	 * invalidate the waitq for subsequent mqueue operations,
1474 	 * the port lock could be dropped after invalidating the mqueue.
1475 	 */
1476 
1477 	waitq_invalidate(&port->ip_waitq);
1478 
1479 	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);
1480 
1481 	return reap;
1482 }
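/*
 * A user-space sketch of what triggers this teardown (illustrative
 * only): dropping the receive right destroys the port's message queue,
 * so senders blocked for queue space are set running (they see
 * MACH_SEND_INVALID_DEST) and queued messages are reaped.  "port" is a
 * hypothetical receive right owned by the calling task.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

	mach_port_mod_refs(mach_task_self(), port, MACH_PORT_RIGHT_RECEIVE, -1);
#endif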
1483 
1484 /*
1485  *	Routine:	ipc_mqueue_set_qlimit_locked
1486  *	Purpose:
1487  *		Changes a message queue limit; the maximum number
1488  *		of messages which may be queued.
1489  *	Conditions:
1490  *		Port locked.
1491  */
1492 
1493 void
1494 ipc_mqueue_set_qlimit_locked(
1495 	ipc_mqueue_t           mqueue,
1496 	mach_port_msgcount_t   qlimit)
1497 {
1498 	ipc_port_t port = ip_from_mq(mqueue);
1499 
1500 	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1501 
1502 	/* wake up senders allowed by the new qlimit */
1503 	if (qlimit > mqueue->imq_qlimit) {
1504 		mach_port_msgcount_t i, wakeup;
1505 		struct turnstile *send_turnstile = port_send_turnstile(port);
1506 
1507 		/* caution: wakeup, qlimit are unsigned */
1508 		wakeup = qlimit - mqueue->imq_qlimit;
1509 
1510 		for (i = 0; i < wakeup; i++) {
1511 			/*
1512 			 * boost the priority of the awoken thread
1513 			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1514 			 * the message queue slot we've just reserved.
1515 			 *
1516 			 * NOTE: this will never prepost
1517 			 */
1518 			if (send_turnstile == TURNSTILE_NULL ||
1519 			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
1520 			    IPC_MQUEUE_FULL,
1521 			    THREAD_AWAKENED,
1522 			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
1523 				port->ip_fullwaiters = false;
1524 				break;
1525 			}
1526 			mqueue->imq_msgcount++;  /* give it to the awakened thread */
1527 		}
1528 	}
1529 	mqueue->imq_qlimit = (uint16_t)qlimit;
1530 }
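/*
 * A user-space sketch of raising a port's queue limit (illustrative
 * only), the operation that ultimately reaches
 * ipc_mqueue_set_qlimit_locked().  "port" is a hypothetical receive
 * right and "kr" a kern_return_t.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

	mach_port_limits_t limits = { .mpl_qlimit = MACH_PORT_QLIMIT_LARGE };

	kr = mach_port_set_attributes(mach_task_self(), port,
	        MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
	        MACH_PORT_LIMITS_INFO_COUNT);
#endif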
1531 
1532 /*
1533  *	Routine:	ipc_mqueue_set_seqno_locked
1534  *	Purpose:
1535  *		Changes an mqueue's sequence number.
1536  *	Conditions:
1537  *		Caller holds a reference to the queue's containing object.
1538  */
1539 void
1540 ipc_mqueue_set_seqno_locked(
1541 	ipc_mqueue_t            mqueue,
1542 	mach_port_seqno_t       seqno)
1543 {
1544 	mqueue->imq_seqno = seqno;
1545 }
1546 
1547 
1548 /*
1549  *	Routine:	ipc_mqueue_copyin
1550  *	Purpose:
1551  *		Convert a name in a space to a message queue.
1552  *	Conditions:
1553  *		Nothing locked.  If successful, the caller gets a ref
1554  *		for the object.  This ref ensures the continued existence of
1555  *		the queue.
1556  *	Returns:
1557  *		MACH_MSG_SUCCESS	Found a message queue.
1558  *		MACH_RCV_INVALID_NAME	The space is dead.
1559  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1560  *		MACH_RCV_INVALID_NAME
1561  *			The denoted right is not receive or port set.
1562  *		MACH_RCV_IN_SET		Receive right is a member of a set.
1563  */
1564 
1565 mach_msg_return_t
1566 ipc_mqueue_copyin(
1567 	ipc_space_t             space,
1568 	mach_port_name_t        name,
1569 	ipc_object_t            *objectp)
1570 {
1571 	ipc_entry_bits_t bits;
1572 	ipc_object_t object;
1573 	kern_return_t kr;
1574 
1575 	kr = ipc_right_lookup_read(space, name, &bits, &object);
1576 	if (kr != KERN_SUCCESS) {
1577 		return MACH_RCV_INVALID_NAME;
1578 	}
1579 	/* object is locked and active */
1580 
1581 	if (bits & MACH_PORT_TYPE_RECEIVE) {
1582 		__assert_only ipc_port_t port = ip_object_to_port(object);
1583 		assert(ip_get_receiver_name(port) == name);
1584 		assert(ip_in_space(port, space));
1585 	}
1586 	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1587 		io_reference(object);
1588 		io_unlock(object);
1589 	} else {
1590 		io_unlock(object);
1591 		/* guard exception if we never held the receive right in this entry */
1592 		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
1593 			mach_port_guard_exception(name, 0, kGUARD_EXC_RCV_INVALID_NAME);
1594 		}
1595 		return MACH_RCV_INVALID_NAME;
1596 	}
1597 
1598 	*objectp = object;
1599 	return MACH_MSG_SUCCESS;
1600 }
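/*
 * A user-space sketch of the names this lookup accepts (illustrative
 * only): a receive right or a port-set name may be passed as the
 * rcv_name of mach_msg(); a name denoting only a send right fails the
 * receive with MACH_RCV_INVALID_NAME.
 */
#if 0 /* user-space sketch */
#include <mach/mach.h>

	mach_port_t port = MACH_PORT_NULL;

	/* allocate a receive right: a name ipc_mqueue_copyin() accepts */
	mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &port);
#endif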
1601