xref: /xnu-12377.81.4/osfmk/ipc/ipc_mqueue.c (revision 043036a2b3718f7f0be807e2870f8f47d3fa0796)
1 /*
2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /*
29  * @OSF_FREE_COPYRIGHT@
30  */
31 /*
32  * Mach Operating System
33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34  * All Rights Reserved.
35  *
36  * Permission to use, copy, modify and distribute this software and its
37  * documentation is hereby granted, provided that both the copyright
38  * notice and this permission notice appear in all copies of the
39  * software, derivative works or modified versions, and any portions
40  * thereof, and that both notices appear in supporting documentation.
41  *
42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45  *
46  * Carnegie Mellon requests users of this software to return to
47  *
48  *  Software Distribution Coordinator  or  [email protected]
49  *  School of Computer Science
50  *  Carnegie Mellon University
51  *  Pittsburgh PA 15213-3890
52  *
53  * any improvements or extensions that they make and grant Carnegie Mellon
54  * the rights to redistribute these changes.
55  */
56 /*
57  */
58 /*
59  *	File:	ipc/ipc_mqueue.c
60  *	Author:	Rich Draves
61  *	Date:	1989
62  *
63  *	Functions to manipulate IPC message queues.
64  */
65 /*
66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67  * support for mandatory and extensible security protections.  This notice
68  * is included in support of clause 2.2 (b) of the Apple Public License,
69  * Version 2.0.
70  */
71 
72 
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76 
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86 
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_policy.h>
92 #include <ipc/ipc_port.h>
93 #include <ipc/ipc_pset.h>
94 #include <ipc/ipc_space.h>
95 
96 #ifdef __LP64__
97 #include <vm/vm_map.h>
98 #endif
99 
100 #include <sys/event.h>
101 
102 const bool ipc_mqueue_full;            /* address is event for queue space */
103 
104 KALLOC_TYPE_DEFINE(mqueue_zone, struct ipc_mqueue, KT_DEFAULT);
105 
106 /* forward declarations */
107 static void ipc_mqueue_receive_results(wait_result_t result);
108 
109 static void ipc_mqueue_select_on_thread_locked(
110 	ipc_mqueue_t            mqueue,
111 	mach_msg_option64_t     option64,
112 	thread_t                thread);
113 
114 /* Clear a message count reservation */
115 static void ipc_mqueue_release_msgcount(
116 	ipc_mqueue_t            mqueue);
117 
118 /* Deliver message to message queue or waiting receiver */
119 static void ipc_mqueue_post(
120 	ipc_mqueue_t            mqueue,
121 	ipc_kmsg_t              kmsg,
122 	mach_msg_option64_t     option);
123 
124 /*
125  *	Routine:	ipc_mqueue_init
126  *	Purpose:
127  *		Initialize a newly-allocated message queue.
128  */
129 void
ipc_mqueue_init(ipc_mqueue_t mqueue)130 ipc_mqueue_init(
131 	ipc_mqueue_t            mqueue)
132 {
133 	ipc_kmsg_queue_init(&mqueue->imq_messages);
134 	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
135 	klist_init(&mqueue->imq_klist);
136 }
137 
138 /*
139  *	Routine:	ipc_mqueue_msg_too_large
140  *	Purpose:
141  *		Return true if kmsg is too large to be received:
142  *
143  *      If MACH64_RCV_LINEAR_VECTOR:
144  *          - combined message buffer is not large enough
145  *            to fit both the message (plus trailer) and
146  *            auxiliary data.
147  *      Otherwise:
148  *          - message buffer is not large enough
149  *          - auxiliary buffer is not large enough:
150  *	      (1) kmsg is a vector with aux, but user expects
151  *                a scalar kmsg (ith_max_asize is 0)
152  *            (2) kmsg is a vector with aux, but user aux
153  *                buffer is not large enough.
154  */
155 static bool
ipc_mqueue_msg_too_large(mach_msg_size_t msg_size,mach_msg_size_t trailer_size,mach_msg_size_t aux_size,mach_msg_option64_t options,mach_msg_recv_bufs_t * recv_bufs)156 ipc_mqueue_msg_too_large(
157 	mach_msg_size_t         msg_size,
158 	mach_msg_size_t         trailer_size,
159 	mach_msg_size_t         aux_size,
160 	mach_msg_option64_t     options,
161 	mach_msg_recv_bufs_t   *recv_bufs)
162 {
163 	mach_msg_size_t max_msg_size = recv_bufs->recv_msg_size;
164 	mach_msg_size_t max_aux_size = recv_bufs->recv_aux_size;
165 
166 	if (max_aux_size != 0) {
167 		assert(options & MACH64_MSG_VECTOR);
168 	}
169 
170 	if (options & MACH64_RCV_LINEAR_VECTOR) {
171 		assert(max_aux_size == 0);
172 		assert(options & MACH64_MSG_VECTOR);
173 
174 		if (max_msg_size < msg_size + trailer_size + aux_size) {
175 			return true;
176 		}
177 	} else {
178 		if (max_msg_size < msg_size + trailer_size) {
179 			return true;
180 		}
181 
182 		/*
183 		 * only return too large if MACH64_MSG_VECTOR.
184 		 *
185 		 * silently drop aux data when receiver is not expecting it for compat
186 		 * reasons.
187 		 */
188 		if ((options & MACH64_MSG_VECTOR) && max_aux_size < aux_size) {
189 			return true;
190 		}
191 	}
192 
193 	return false;
194 }
195 
196 /*
197  *	Routine:	ipc_mqueue_add_locked.
198  *	Purpose:
199  *		Associate the portset's mqueue with the port's mqueue.
200  *		This has to be done so that posting the port will wakeup
201  *		a portset waiter.  If there are waiters on the portset
202  *		mqueue and messages on the port mqueue, try to match them
203  *		up now.
204  *	Conditions:
205  *		Port and Pset both locked.
206  */
207 kern_return_t
ipc_mqueue_add_locked(ipc_mqueue_t port_mqueue,ipc_pset_t pset,waitq_link_t * linkp)208 ipc_mqueue_add_locked(
209 	ipc_mqueue_t    port_mqueue,
210 	ipc_pset_t      pset,
211 	waitq_link_t   *linkp)
212 {
213 	ipc_port_t       port = ip_from_mq(port_mqueue);
214 	struct waitq_set *wqset = &pset->ips_wqset;
215 	circle_queue_t   kmsgq = &port_mqueue->imq_messages;
216 	kern_return_t    kr = KERN_SUCCESS;
217 	ipc_kmsg_t       kmsg;
218 
219 	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
220 	if (kr != KERN_SUCCESS) {
221 		return kr;
222 	}
223 
224 	/*
225 	 * Now that the set has been added to the port, there may be
226 	 * messages queued on the port and threads waiting on the set
227 	 * waitq.  Lets get them together.
228 	 *
229 	 * Only consider this set however, as the other ones have been
230 	 * posted to already.
231 	 */
232 	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
233 		mach_msg_size_t msize, tsize, asize;
234 		thread_t th;
235 
236 		th = waitq_wakeup64_identify_locked(wqset, IPC_MQUEUE_RECEIVE,
237 		    WAITQ_KEEP_LOCKED);
238 		/* port and pset still locked, thread not runnable */
239 
240 		if (th == THREAD_NULL) {
241 			/*
242 			 * Didn't find a thread to wake up but messages
243 			 * are enqueued, prepost the set instead,
244 			 * as calling waitq_wakeup64_identify_locked()
245 			 * on the set directly will not take care of it.
246 			 */
247 			waitq_link_prepost_locked(&port->ip_waitq, wqset);
248 			break;
249 		}
250 
251 		/*
252 		 * Because we hold the thread off the runqueue at this point,
253 		 * it's safe to modify ith_ fields on the thread, as
254 		 * until it is resumed, it must be off core or in between
255 		 * the assert wait and returning from the continuation.
256 		 */
257 
258 		/*
259 		 * If the receiver waited with a facility not directly
260 		 * related to Mach messaging, then it isn't prepared to get
261 		 * handed the message directly.  Just set it running, and
262 		 * go look for another thread that can.
263 		 */
264 		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
265 			waitq_resume_identified_thread(wqset, th,
266 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
267 			continue;
268 		}
269 
270 		/*
271 		 * Found a receiver. see if they can handle the message
272 		 * correctly (the message is not too large for them, or
273 		 * they didn't care to be informed that the message was
274 		 * too large).  If they can't handle it, take them off
275 		 * the list and let them go back and figure it out and
276 		 * just move onto the next.
277 		 */
278 		msize = ipc_kmsg_copyout_size(kmsg, th->map);
279 		tsize = ipc_kmsg_trailer_size(th->ith_option, th->map);
280 		asize = kmsg->ikm_aux_size;
281 
282 		if (ipc_mqueue_msg_too_large(msize, tsize, asize, th->ith_option,
283 		    &th->ith_recv_bufs)) {
284 			th->ith_state = MACH_RCV_TOO_LARGE;
285 			th->ith_msize = msize;
286 			th->ith_asize = asize;
287 			if (th->ith_option & MACH_RCV_LARGE) {
288 				/*
289 				 * let him go without message
290 				 */
291 				th->ith_receiver_name = port_mqueue->imq_receiver_name;
292 				th->ith_kmsg = IKM_NULL;
293 				th->ith_seqno = 0;
294 
295 				waitq_resume_identified_thread(wqset, th,
296 				    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
297 
298 				continue; /* find another thread */
299 			}
300 		} else {
301 			th->ith_state = MACH_MSG_SUCCESS;
302 		}
303 
304 		/*
305 		 * This thread is going to take this message,
306 		 * so give it the message.
307 		 */
308 		ipc_kmsg_rmqueue(kmsgq, kmsg);
309 
310 		ipc_mqueue_release_msgcount(port_mqueue);
311 
312 		th->ith_kmsg = kmsg;
313 		th->ith_seqno = port_mqueue->imq_seqno++;
314 
315 		waitq_resume_identified_thread(wqset, th,
316 		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
317 	}
318 
319 	return KERN_SUCCESS;
320 }
321 
322 
323 /*
324  *	Routine:	ipc_port_has_klist
325  *	Purpose:
326  *		Returns whether the given port imq_klist field can be used as a klist.
327  */
328 bool
ipc_port_has_klist(ipc_port_t port)329 ipc_port_has_klist(ipc_port_t port)
330 {
331 	return !ip_is_special_reply_port(port) &&
332 	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
333 }
334 
335 static inline struct klist *
ipc_object_klist(ipc_object_t object)336 ipc_object_klist(ipc_object_t object)
337 {
338 	if (io_is_any_port(object)) {
339 		ipc_port_t port = ip_object_to_port(object);
340 
341 		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
342 	}
343 	return &ips_object_to_pset(object)->ips_klist;
344 }
345 
346 /*
347  *	Routine:	ipc_mqueue_changed
348  *	Purpose:
349  *		Wake up receivers waiting in a message queue.
350  *	Conditions:
351  *		The object containing the message queue is locked.
352  */
353 void
ipc_mqueue_changed(ipc_space_t space,struct waitq * waitq)354 ipc_mqueue_changed(
355 	ipc_space_t         space,
356 	struct waitq       *waitq)
357 {
358 	ipc_object_t object = io_from_waitq(waitq);
359 	struct klist *klist = ipc_object_klist(object);
360 
361 	if (klist && SLIST_FIRST(klist)) {
362 		/*
363 		 * Indicate that this message queue is vanishing
364 		 *
365 		 * When this is called, the associated receive right may be in flight
366 		 * between two tasks: the one it used to live in, and the one that armed
367 		 * a port destroyed notification for it.
368 		 *
369 		 * The new process may want to register the port it gets back with an
370 		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
371 		 * port pending already, in which case we want the imq_klist field to be
372 		 * reusable for nefarious purposes.
373 		 *
374 		 * Fortunately, we really don't need this linkage anymore after this
375 		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
376 		 *
377 		 * Note: we don't have the space lock here, however, this covers the
378 		 *       case of when a task is terminating the space, triggering
379 		 *       several knote_vanish() calls.
380 		 *
381 		 *       We don't need the lock to observe that the space is inactive as
382 		 *       we just deactivated it on the same thread.
383 		 *
384 		 *       We still need to call knote_vanish() so that the knote is
385 		 *       marked with EV_VANISHED or EV_EOF so that the detach step
386 		 *       in filt_machportdetach is skipped correctly.
387 		 */
388 		assert(space);
389 		knote_vanish(klist, is_active(space));
390 	}
391 
392 	if (io_is_any_port(object)) {
393 		ipc_port_t port = ip_object_to_port(object);
394 		if (!ip_is_special_reply_port(port)) {
395 			ipc_port_adjust_sync_link_state_locked(port,
396 			    PORT_SYNC_LINK_ANY, NULL);
397 		}
398 	} else {
399 		klist_init(klist);
400 	}
401 
402 	/*
403 	 * do not pass WAITQ_UPDATE_INHERITOR, ipc_port_destroy()
404 	 * needs to handle this manually, and the port lock
405 	 * is the waitq lock, so there's really no inefficiency there.
406 	 */
407 	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
408 	    THREAD_RESTART, WAITQ_KEEP_LOCKED);
409 }
410 
411 
412 
413 
414 /*
415  *	Routine:	ipc_mqueue_send
416  *	Purpose:
417  *		Send a message to a message queue.  The message holds a reference
418  *		for the destination port for this message queue in the
419  *		msgh_remote_port field.
420  *
421  *		If unsuccessful, the caller still has possession of
422  *		the message and must do something with it.  If successful,
423  *		the message is queued, given to a receiver, or destroyed.
424  *	Conditions:
425  *		port is locked.
426  *	Returns:
427  *		MACH_MSG_SUCCESS	The message was accepted.
428  *		MACH_SEND_TIMED_OUT	Caller still has message.
429  *		MACH_SEND_INTERRUPTED	Caller still has message.
430  */
431 mach_msg_return_t
ipc_mqueue_send_locked(ipc_mqueue_t mqueue,ipc_kmsg_t kmsg,mach_msg_option64_t option,mach_msg_timeout_t send_timeout)432 ipc_mqueue_send_locked(
433 	ipc_mqueue_t            mqueue,
434 	ipc_kmsg_t              kmsg,
435 	mach_msg_option64_t     option,
436 	mach_msg_timeout_t      send_timeout)
437 {
438 	ipc_port_t port = ip_from_mq(mqueue);
439 	int wresult;
440 
441 	/*
442 	 *  Don't block if:
443 	 *	1) We're under the queue limit.
444 	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
445 	 *	3) Message is sent to a send-once right.
446 	 */
447 	if (!imq_full(mqueue) ||
448 	    (!imq_full_kernel(mqueue) &&
449 	    ((option & MACH_SEND_ALWAYS) ||
450 	    (MACH_MSGH_BITS_REMOTE(ikm_header(kmsg)->msgh_bits) ==
451 	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
452 		mqueue->imq_msgcount++;
453 		assert(mqueue->imq_msgcount > 0);
454 		ip_mq_unlock(port);
455 	} else {
456 		thread_t cur_thread = current_thread();
457 		struct turnstile *send_turnstile = TURNSTILE_NULL;
458 		uint64_t deadline;
459 
460 		/*
461 		 * We have to wait for space to be granted to us.
462 		 */
463 		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
464 			ip_mq_unlock(port);
465 			return MACH_SEND_TIMED_OUT;
466 		}
467 		if (imq_full_kernel(mqueue)) {
468 			ip_mq_unlock(port);
469 			return MACH_SEND_NO_BUFFER;
470 		}
471 		port->ip_fullwaiters = true;
472 
473 		if (option & MACH_SEND_TIMEOUT) {
474 			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
475 		} else {
476 			deadline = 0;
477 		}
478 
479 		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);
480 
481 		send_turnstile = turnstile_prepare((uintptr_t)port,
482 		    port_send_turnstile_address(port),
483 		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
484 
485 		ipc_port_send_update_inheritor(port, send_turnstile,
486 		    TURNSTILE_DELAYED_UPDATE);
487 
488 		wresult = waitq_assert_wait64_leeway(
489 			&send_turnstile->ts_waitq,
490 			IPC_MQUEUE_FULL,
491 			THREAD_ABORTSAFE,
492 			TIMEOUT_URGENCY_USER_NORMAL,
493 			deadline,
494 			TIMEOUT_NO_LEEWAY);
495 
496 		ip_mq_unlock(port);
497 		turnstile_update_inheritor_complete(send_turnstile,
498 		    TURNSTILE_INTERLOCK_NOT_HELD);
499 
500 		if (wresult == THREAD_WAITING) {
501 			wresult = thread_block(THREAD_CONTINUE_NULL);
502 		}
503 
504 		/* Call turnstile complete with interlock held */
505 		ip_mq_lock(port);
506 		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
507 		ip_mq_unlock(port);
508 
509 		/* Call cleanup after dropping the interlock */
510 		turnstile_cleanup();
511 
512 		switch (wresult) {
513 		case THREAD_AWAKENED:
514 			/*
515 			 * we can proceed - inherited msgcount from waker
516 			 * or the message queue has been destroyed and the msgcount
517 			 * has been reset to zero (will detect in ipc_mqueue_post()).
518 			 */
519 			break;
520 
521 		case THREAD_TIMED_OUT:
522 			assert(option & MACH_SEND_TIMEOUT);
523 			return MACH_SEND_TIMED_OUT;
524 
525 		case THREAD_INTERRUPTED:
526 			return MACH_SEND_INTERRUPTED;
527 
528 		case THREAD_RESTART:
529 			/* mqueue is being destroyed */
530 			return MACH_SEND_INVALID_DEST;
531 		default:
532 			panic("ipc_mqueue_send");
533 		}
534 	}
535 
536 	ipc_mqueue_post(mqueue, kmsg, option);
537 	return MACH_MSG_SUCCESS;
538 }
539 
540 /*
541  *	Routine:	ipc_mqueue_override_send_locked
542  *	Purpose:
543  *		Set an override qos on the first message in the queue
544  *		(if the queue is full). This is a send-possible override
545  *		that will go away as soon as we drain a message from the
546  *		queue.
547  *
548  *	Conditions:
549  *		The port corresponding to mqueue is locked.
550  *		The caller holds a reference on the message queue.
551  */
552 void
ipc_mqueue_override_send_locked(ipc_mqueue_t mqueue,mach_msg_qos_t qos_ovr)553 ipc_mqueue_override_send_locked(
554 	ipc_mqueue_t        mqueue,
555 	mach_msg_qos_t      qos_ovr)
556 {
557 	ipc_port_t port = ip_from_mq(mqueue);
558 
559 	assert(waitq_is_valid(&port->ip_waitq));
560 
561 	if (imq_full(mqueue)) {
562 		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
563 
564 		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
565 			if (ip_in_a_space(port) &&
566 			    is_active(ip_get_receiver(port)) &&
567 			    ipc_port_has_klist(port)) {
568 				KNOTE(&port->ip_klist, 0);
569 			}
570 		}
571 	}
572 }
573 
574 /*
575  *	Routine:	ipc_mqueue_release_msgcount
576  *	Purpose:
577  *		Release a message queue reference in the case where we
578  *		found a waiter.
579  *
580  *	Conditions:
581  *		The port corresponding to message queue is locked.
582  *		The message corresponding to this reference is off the queue.
583  *		There is no need to pass reserved preposts because this will
584  *		never prepost to anyone
585  */
586 static void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)587 ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
588 {
589 	ipc_port_t port = ip_from_mq(port_mq);
590 	struct turnstile *send_turnstile = port_send_turnstile(port);
591 
592 	ip_mq_lock_held(port);
593 	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));
594 
595 	port_mq->imq_msgcount--;
596 
597 	if (!imq_full(port_mq) && port->ip_fullwaiters &&
598 	    send_turnstile != TURNSTILE_NULL) {
599 		/*
600 		 * boost the priority of the awoken thread
601 		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
602 		 * the message queue slot we've just reserved.
603 		 *
604 		 * NOTE: this will never prepost
605 		 *
606 		 * The wakeup happens on a turnstile waitq
607 		 * which will wakeup the highest priority waiter.
608 		 * A potential downside of this would be starving low
609 		 * priority senders if there is a constant churn of
610 		 * high priority threads trying to send to this port.
611 		 */
612 		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
613 		    IPC_MQUEUE_FULL,
614 		    THREAD_AWAKENED,
615 		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
616 			port->ip_fullwaiters = false;
617 		} else {
618 			/* gave away our slot - add reference back */
619 			port_mq->imq_msgcount++;
620 		}
621 	}
622 
623 	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
624 		waitq_clear_prepost_locked(&port->ip_waitq);
625 	}
626 }
627 
628 /*
629  *	Routine:	ipc_mqueue_post
630  *	Purpose:
631  *		Post a message to a waiting receiver or enqueue it.  If a
632  *		receiver is waiting, we can release our reserved space in
633  *		the message queue.
634  *
635  *	Conditions:
636  *		port is unlocked
637  *		If we need to queue, our space in the message queue is reserved.
638  */
639 static void
ipc_mqueue_post(ipc_mqueue_t mqueue,ipc_kmsg_t kmsg,mach_msg_option64_t option __unused)640 ipc_mqueue_post(
641 	ipc_mqueue_t               mqueue,
642 	ipc_kmsg_t                 kmsg,
643 	mach_msg_option64_t        option __unused)
644 {
645 	ipc_port_t port = ip_from_mq(mqueue);
646 	struct waitq *waitq = &port->ip_waitq;
647 	boolean_t destroy_msg = FALSE;
648 
649 	ipc_kmsg_trace_send(kmsg, option);
650 
651 	/*
652 	 *	While the msg queue is locked, we have control of the
653 	 *	kmsg, so the ref in it for the port is still good.
654 	 *
655 	 *	Check for a receiver for the message.
656 	 */
657 	ip_mq_lock(port);
658 
659 	/* we may have raced with port destruction! */
660 	if (!waitq_is_valid(&port->ip_waitq)) {
661 		destroy_msg = TRUE;
662 		goto out_unlock;
663 	}
664 
665 	for (;;) {
666 		mach_msg_size_t msize, tsize, asize;
667 		thread_t receiver;
668 
669 		receiver = waitq_wakeup64_identify_locked(waitq,
670 		    IPC_MQUEUE_RECEIVE, WAITQ_KEEP_LOCKED);
671 		/* waitq still locked, thread not runnable */
672 
673 		if (receiver == THREAD_NULL) {
674 			/*
675 			 * no receivers; queue kmsg if space still reserved
676 			 * Reservations are cancelled when the port goes inactive.
677 			 * note that this will enqueue the message for any
678 			 * "peeking" receivers.
679 			 *
680 			 * Also, post the knote to wake up any threads waiting
681 			 * on that style of interface if this insertion is of
682 			 * note (first insertion, or adjusted override qos all
683 			 * the way to the head of the queue).
684 			 *
685 			 * This is just for ports. port-sets knotes are being
686 			 * posted to by the waitq_wakeup64_identify_locked()
687 			 * above already.
688 			 */
689 			if (mqueue->imq_msgcount == 0) {
690 				/*
691 				 * The message queue must belong
692 				 * to an inactive port, so just destroy
693 				 * the message and pretend it was posted.
694 				 */
695 				destroy_msg = TRUE;
696 			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
697 				/*
698 				 * queue was not empty and qos
699 				 * didn't change, nothing to do.
700 				 */
701 			} else if (ip_in_a_space(port) &&
702 			    is_active(ip_get_receiver(port)) &&
703 			    ipc_port_has_klist(port)) {
704 				/*
705 				 * queue was empty or qos changed
706 				 * we need to tell kqueue, unless
707 				 * the space is getting torn down
708 				 */
709 				KNOTE(&port->ip_klist, 0);
710 			}
711 			break;
712 		}
713 
714 		/*
715 		 * If the receiver waited with a facility not directly related
716 		 * to Mach messaging, then it isn't prepared to get handed the
717 		 * message directly. Just set it running, and go look for
718 		 * another thread that can.
719 		 */
720 		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
721 			waitq_resume_identified_thread(waitq, receiver,
722 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
723 
724 			continue;
725 		}
726 
727 
728 		/*
729 		 * We found a waiting thread.
730 		 * If the message is too large or the scatter list is too small
731 		 * the thread we wake up will get that as its status.
732 		 */
733 		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
734 		tsize = ipc_kmsg_trailer_size(receiver->ith_option, receiver->map);
735 		asize = kmsg->ikm_aux_size;
736 
737 		if (ipc_mqueue_msg_too_large(msize, tsize, asize, receiver->ith_option,
738 		    &receiver->ith_recv_bufs)) {
739 			receiver->ith_msize = msize;
740 			receiver->ith_asize = asize;
741 			receiver->ith_state = MACH_RCV_TOO_LARGE;
742 		} else {
743 			receiver->ith_state = MACH_MSG_SUCCESS;
744 		}
745 
746 		/*
747 		 * If there is no problem with the upcoming receive, or the
748 		 * receiver thread didn't specifically ask for special too
749 		 * large error condition, go ahead and select it anyway.
750 		 */
751 		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
752 		    !(receiver->ith_option & MACH_RCV_LARGE)) {
753 			receiver->ith_kmsg = kmsg;
754 			receiver->ith_seqno = mqueue->imq_seqno++;
755 
756 			waitq_resume_identified_thread(waitq, receiver,
757 			    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
758 
759 			/* we didn't need our reserved spot in the queue */
760 			ipc_mqueue_release_msgcount(mqueue);
761 			break;
762 		}
763 
764 		/*
765 		 * Otherwise, this thread needs to be released to run
766 		 * and handle its error without getting the message.  We
767 		 * need to go back and pick another one.
768 		 */
769 		receiver->ith_receiver_name = mqueue->imq_receiver_name;
770 		receiver->ith_kmsg = IKM_NULL;
771 		receiver->ith_seqno = 0;
772 
773 		waitq_resume_identified_thread(waitq, receiver,
774 		    THREAD_AWAKENED, WAITQ_WAKEUP_DEFAULT);
775 	}
776 
777 out_unlock:
778 	/* clear the waitq boost we may have been given */
779 	waitq_clear_promotion_locked(waitq, current_thread());
780 	waitq_unlock(waitq);
781 
782 	if (destroy_msg) {
783 		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
784 	}
785 
786 	counter_inc(&current_task()->messages_sent);
787 	return;
788 }
789 
790 
791 static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)792 ipc_mqueue_receive_results(wait_result_t saved_wait_result)
793 {
794 	thread_t                self = current_thread();
795 	mach_msg_option64_t     option64 = self->ith_option;
796 
797 	/*
798 	 * why did we wake up?
799 	 */
800 	switch (saved_wait_result) {
801 	case THREAD_TIMED_OUT:
802 		self->ith_state = MACH_RCV_TIMED_OUT;
803 		return;
804 
805 	case THREAD_INTERRUPTED:
806 		self->ith_state = MACH_RCV_INTERRUPTED;
807 		return;
808 
809 	case THREAD_RESTART:
810 		/* something bad happened to the port/set */
811 		self->ith_state = MACH_RCV_PORT_CHANGED;
812 		return;
813 
814 	case THREAD_AWAKENED:
815 		/*
816 		 * We do not need to go select a message, somebody
817 		 * handed us one (or a too-large indication).
818 		 */
819 		switch (self->ith_state) {
820 		case MACH_RCV_SCATTER_SMALL:
821 		case MACH_RCV_TOO_LARGE:
822 			/*
823 			 * Somebody tried to give us a too large
824 			 * message. If we indicated that we cared,
825 			 * then they only gave us the indication,
826 			 * otherwise they gave us the indication
827 			 * AND the message anyway.
828 			 */
829 			if (option64 & MACH_RCV_LARGE) {
830 				return;
831 			}
832 			return;
833 		case MACH_MSG_SUCCESS:
834 			return;
835 		default:
836 			panic("ipc_mqueue_receive_results: strange ith_state %d", self->ith_state);
837 		}
838 
839 	default:
840 		panic("ipc_mqueue_receive_results: strange wait_result %d", saved_wait_result);
841 	}
842 }
843 
844 static void
ipc_mqueue_receive_continue(__unused void * param,wait_result_t wresult)845 ipc_mqueue_receive_continue(__unused void *param, wait_result_t wresult)
846 {
847 	ipc_mqueue_receive_results(wresult);
848 	mach_msg_receive_continue();  /* hard-coded for now */
849 }
850 
851 /*
852  *	Routine:	ipc_mqueue_receive
853  *	Purpose:
854  *		Receive a message from a message queue.
855  *
856  *	Conditions:
857  *		Our caller must hold a reference for the port or port set
858  *		to which this queue belongs, to keep the queue
859  *		from being deallocated.
860  *
861  *		The kmsg is returned with clean header fields
862  *		and with the circular bit turned off through the ith_kmsg
863  *		field of the thread's receive continuation state.
864  *	Returns:
865  *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
866  *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize,
867  *                          Auxiliary data size returned in ith_asize
868  *		MACH_RCV_TIMED_OUT	No message obtained.
869  *		MACH_RCV_INTERRUPTED	No message obtained.
870  *		MACH_RCV_PORT_DIED	Port/set died; no message.
871  *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
872  *
873  */
874 
875 void
ipc_mqueue_receive(struct waitq * waitq,mach_msg_timeout_t rcv_timeout,int interruptible,thread_t thread,bool has_continuation)876 ipc_mqueue_receive(
877 	struct waitq           *waitq,
878 	mach_msg_timeout_t      rcv_timeout,
879 	int                     interruptible,
880 	thread_t                thread,
881 	bool                    has_continuation)
882 {
883 	wait_result_t           wresult;
884 
885 	assert(thread == current_thread());
886 	waitq_lock(waitq);
887 
888 	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, rcv_timeout,
889 	    interruptible, thread);
890 	/* object unlocked */
891 	if (wresult == THREAD_NOT_WAITING) {
892 		return;
893 	}
894 
895 	if (wresult == THREAD_WAITING) {
896 		if (has_continuation) {
897 			wresult = thread_block(ipc_mqueue_receive_continue);
898 			/* NOTREACHED */
899 		}
900 		wresult = thread_block(THREAD_CONTINUE_NULL);
901 	}
902 	ipc_mqueue_receive_results(wresult);
903 }
904 
905 /*
906  *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
907  *	Purpose:
908  *		Receive a message from a message queue using a specified thread.
909  *		If no message available, assert_wait on the appropriate waitq.
910  *
911  *	Conditions:
912  *		Assumes thread is self.
913  *		The port/port-set waitq is locked on entry, unlocked on return.
914  *		May have assert-waited. Caller must block in those cases.
915  */
916 wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(struct waitq * waitq,mach_msg_timeout_t rcv_timeout,int interruptible,thread_t thread)917 ipc_mqueue_receive_on_thread_and_unlock(
918 	struct waitq           *waitq,
919 	mach_msg_timeout_t      rcv_timeout,
920 	int                     interruptible,
921 	thread_t                thread)
922 {
923 	mach_msg_option64_t     option64 = thread->ith_option;
924 	ipc_port_t              port = IP_NULL;
925 	wait_result_t           wresult;
926 	uint64_t                deadline;
927 	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;
928 
929 	assert(thread == current_thread());
930 
931 	if (waitq_type(waitq) == WQT_PORT_SET) {
932 		ipc_pset_t pset = ips_from_waitq(waitq);
933 		wqs_prepost_flags_t wqs_flags = WQS_PREPOST_LOCK;
934 		struct waitq *port_wq;
935 
936 		/*
937 		 * Put the message at the back of the prepost list
938 		 * if it's not a PEEK.
939 		 *
940 		 * Might drop the pset lock temporarily.
941 		 */
942 		port_wq = waitq_set_first_prepost(&pset->ips_wqset, wqs_flags);
943 
944 		/* Returns with port locked */
945 
946 		if (port_wq != NULL) {
947 			/*
948 			 * We get here if there is at least one message
949 			 * waiting on port_wq. We have instructed the prepost
950 			 * iteration logic to leave both the port_wq and the
951 			 * set waitq locked.
952 			 *
953 			 * Continue on to handling the message with just
954 			 * the port waitq locked.
955 			 */
956 			waitq_unlock(waitq);
957 			port = ip_from_waitq(port_wq);
958 		}
959 	} else if (waitq_type(waitq) == WQT_PORT) {
960 		port = ip_from_waitq(waitq);
961 		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
962 			port = IP_NULL;
963 		}
964 	} else {
965 		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
966 	}
967 
968 	if (port) {
969 		ipc_mqueue_select_on_thread_locked(&port->ip_messages,
970 		    option64, thread);
971 		ip_mq_unlock(port);
972 		return THREAD_NOT_WAITING;
973 	}
974 
975 	if (!waitq_is_valid(waitq)) {
976 		/* someone raced us to destroy this mqueue/port! */
977 		waitq_unlock(waitq);
978 		/*
979 		 * ipc_mqueue_receive_results updates the thread's ith_state
980 		 * TODO: differentiate between rights being moved and
981 		 * rights/ports being destroyed (21885327)
982 		 */
983 		return THREAD_RESTART;
984 	}
985 
986 	/*
987 	 * Looks like we'll have to block.  The waitq we will
988 	 * block on (whether the set's or the local port's) is
989 	 * still locked.
990 	 */
991 	if ((option64 & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
992 		waitq_unlock(waitq);
993 		thread->ith_state = MACH_RCV_TIMED_OUT;
994 		return THREAD_NOT_WAITING;
995 	}
996 
997 	thread->ith_state = MACH_RCV_IN_PROGRESS;
998 
999 	if (option64 & MACH_RCV_TIMEOUT) {
1000 		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
1001 	} else {
1002 		deadline = 0;
1003 	}
1004 
1005 	/*
1006 	 * Threads waiting on a reply port (not portset)
1007 	 * will wait on its receive turnstile.
1008 	 *
1009 	 * Donate waiting thread's turnstile and
1010 	 * setup inheritor for special reply port.
1011 	 * Based on the state of the special reply
1012 	 * port, the inheritor would be the send
1013 	 * turnstile of the connection port on which
1014 	 * the send of sync ipc would happen or
1015 	 * workloop's turnstile who would reply to
1016 	 * the sync ipc message.
1017 	 *
1018 	 * Pass in mqueue wait in waitq_assert_wait to
1019 	 * support port set wakeup. The mqueue waitq of port
1020 	 * will be converted to to turnstile waitq
1021 	 * in waitq_assert_wait instead of global waitqs.
1022 	 */
1023 	if (waitq_type(waitq) == WQT_PORT) {
1024 		port = ip_from_waitq(waitq);
1025 		rcv_turnstile = turnstile_prepare((uintptr_t)port,
1026 		    port_rcv_turnstile_address(port),
1027 		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
1028 
1029 		ipc_port_recv_update_inheritor(port, rcv_turnstile,
1030 		    TURNSTILE_DELAYED_UPDATE);
1031 	}
1032 
1033 	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
1034 	wresult = waitq_assert_wait64_locked(waitq,
1035 	    IPC_MQUEUE_RECEIVE,
1036 	    interruptible,
1037 	    TIMEOUT_URGENCY_USER_NORMAL,
1038 	    deadline,
1039 	    TIMEOUT_NO_LEEWAY,
1040 	    thread);
1041 	if (wresult == THREAD_AWAKENED) {
1042 		/*
1043 		 * The first thing we did was to look for preposts
1044 		 * (using waitq_set_first_prepost() for sets, or looking
1045 		 * at the port's queue for ports).
1046 		 *
1047 		 * Since we found none, we kept the waitq locked.
1048 		 *
1049 		 * It ensures that waitq_assert_wait64_locked() can't
1050 		 * find pre-posts either, won't drop the waitq lock
1051 		 * either (even for a set), and can't return THREAD_AWAKENED.
1052 		 */
1053 		panic("ipc_mqueue_receive_on_thread: sleep walking");
1054 	}
1055 
1056 	waitq_unlock(waitq);
1057 
1058 	/*
1059 	 * After this point, a waiting thread could be found by the wakeup
1060 	 * identify path, and the other side now owns the ith_ fields until
1061 	 * this thread blocks and resumes in the continuation
1062 	 */
1063 
1064 	/* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
1065 	if (rcv_turnstile != TURNSTILE_NULL) {
1066 		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
1067 	}
1068 	/* Its callers responsibility to call turnstile_complete to get the turnstile back */
1069 
1070 	return wresult;
1071 }
1072 
1073 /*
1074  *	Routine:	ipc_mqueue_select_on_thread_locked
1075  *	Purpose:
1076  *		A receiver discovered that there was a message on the queue
1077  *		before he had to block.  Pick the message off the queue and
1078  *		"post" it to thread.
1079  *	Conditions:
1080  *		port locked.
1081  *              thread not locked.
1082  *		There is a message.
1083  *		No need to reserve prepost objects - it will never prepost
1084  *
1085  *	Returns:
1086  *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
1087  *		MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
1088  */
1089 static void
ipc_mqueue_select_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option64_t options,thread_t thread)1090 ipc_mqueue_select_on_thread_locked(
1091 	ipc_mqueue_t            port_mq,
1092 	mach_msg_option64_t     options,
1093 	thread_t                thread)
1094 {
1095 	mach_msg_size_t msize, tsize, asize;
1096 	ipc_kmsg_t kmsg;
1097 
1098 	mach_msg_return_t mr = MACH_MSG_SUCCESS;
1099 
1100 	/*
1101 	 * Do some sanity checking of our ability to receive
1102 	 * before pulling the message off the queue.
1103 	 */
1104 	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1105 	assert(kmsg != IKM_NULL);
1106 
1107 	/*
1108 	 * If we really can't receive it, but we had the
1109 	 * MACH_RCV_LARGE option set, then don't take it off
1110 	 * the queue, instead return the appropriate error
1111 	 * (and size needed).
1112 	 */
1113 	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
1114 	tsize = ipc_kmsg_trailer_size(options, thread->map);
1115 	asize = kmsg->ikm_aux_size;
1116 
1117 	if (ipc_mqueue_msg_too_large(msize, tsize, asize, options,
1118 	    &thread->ith_recv_bufs)) {
1119 		mr = MACH_RCV_TOO_LARGE;
1120 		if (options & MACH_RCV_LARGE) {
1121 			(void)ipc_kmsg_validate_signature(kmsg);
1122 			thread->ith_receiver_name = port_mq->imq_receiver_name;
1123 			thread->ith_kmsg = IKM_NULL;
1124 			thread->ith_msize = msize;
1125 			thread->ith_asize = asize;
1126 			thread->ith_seqno = 0;
1127 			thread->ith_state = mr;
1128 			return;
1129 		}
1130 	}
1131 
1132 	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1133 	ipc_mqueue_release_msgcount(port_mq);
1134 	thread->ith_seqno = port_mq->imq_seqno++;
1135 	thread->ith_kmsg = kmsg;
1136 	thread->ith_state = mr;
1137 
1138 	counter_inc(&current_task()->messages_received);
1139 	return;
1140 }
1141 
1142 /*
1143  *	Routine:	ipc_mqueue_peek_locked
1144  *	Purpose:
1145  *		Peek at a (non-set) message queue to see if it has a message
1146  *		matching the sequence number provided (if zero, then the
1147  *		first message in the queue) and return vital info about the
1148  *		message.
1149  *
1150  *	Conditions:
1151  *		The io object corresponding to mq is locked by callers.
1152  *		Other locks may be held by callers, so this routine cannot block.
1153  *		Caller holds reference on the message queue.
1154  */
1155 unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1156 ipc_mqueue_peek_locked(
1157 	ipc_mqueue_t            mq,
1158 	mach_port_seqno_t      *seqnop,
1159 	mach_msg_size_t        *msg_sizep,
1160 	mach_msg_id_t          *msg_idp,
1161 	mach_msg_max_trailer_t *msg_trailerp,
1162 	ipc_kmsg_t             *kmsgp)
1163 {
1164 	ipc_kmsg_queue_t kmsgq;
1165 	ipc_kmsg_t kmsg;
1166 	mach_port_seqno_t seqno, msgoff;
1167 	unsigned res = 0;
1168 	mach_msg_header_t *hdr;
1169 
1170 	seqno = 0;
1171 	if (seqnop != NULL) {
1172 		seqno = *seqnop;
1173 	}
1174 
1175 	if (seqno == 0) {
1176 		seqno = mq->imq_seqno;
1177 		msgoff = 0;
1178 	} else if (seqno >= mq->imq_seqno &&
1179 	    seqno < mq->imq_seqno + mq->imq_msgcount) {
1180 		msgoff = seqno - mq->imq_seqno;
1181 	} else {
1182 		goto out;
1183 	}
1184 
1185 	/* look for the message that would match that seqno */
1186 	kmsgq = &mq->imq_messages;
1187 	kmsg = ipc_kmsg_queue_first(kmsgq);
1188 	while (msgoff-- && kmsg != IKM_NULL) {
1189 		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1190 	}
1191 	if (kmsg == IKM_NULL) {
1192 		goto out;
1193 	}
1194 
1195 	/*
1196 	 * Validate kmsg signature before doing anything with it. Since we are holding
1197 	 * the mqueue lock here, and only header + trailer will be peeked on, just
1198 	 * do a partial validation to finish quickly.
1199 	 *
1200 	 * Partial kmsg signature is only supported on PAC devices.
1201 	 */
1202 	(void)ipc_kmsg_validate_signature(kmsg);
1203 
1204 	hdr = ikm_header(kmsg);
1205 	/* found one - return the requested info */
1206 	if (seqnop != NULL) {
1207 		*seqnop = seqno;
1208 	}
1209 	if (msg_sizep != NULL) {
1210 		*msg_sizep = hdr->msgh_size;
1211 	}
1212 	if (msg_idp != NULL) {
1213 		*msg_idp = hdr->msgh_id;
1214 	}
1215 	if (msg_trailerp != NULL) {
1216 		*msg_trailerp = *ipc_kmsg_get_trailer(kmsg);
1217 	}
1218 	if (kmsgp != NULL) {
1219 		*kmsgp = kmsg;
1220 	}
1221 
1222 	res = 1;
1223 
1224 out:
1225 	return res;
1226 }
1227 
1228 
1229 /*
1230  *	Routine:	ipc_mqueue_destroy_locked
1231  *	Purpose:
1232  *		Destroy a message queue.
1233  *		Set any blocked senders running.
1234  *		Destroy the kmsgs in the queue.
1235  *	Conditions:
1236  *		port locked
1237  *		Receivers were removed when the receive right was "changed"
1238  */
1239 boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue,waitq_link_list_t * free_l)1240 ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
1241 {
1242 	ipc_port_t port = ip_from_mq(mqueue);
1243 	boolean_t reap = FALSE;
1244 	struct turnstile *send_turnstile = port_send_turnstile(port);
1245 
1246 	/*
1247 	 *	rouse all blocked senders
1248 	 *	(don't boost anyone - we're tearing this queue down)
1249 	 *	(never preposts)
1250 	 */
1251 	port->ip_fullwaiters = false;
1252 
1253 	if (send_turnstile != TURNSTILE_NULL) {
1254 		waitq_wakeup64_all(&send_turnstile->ts_waitq,
1255 		    IPC_MQUEUE_FULL,
1256 		    THREAD_RESTART, WAITQ_WAKEUP_DEFAULT);
1257 	}
1258 
1259 	/*
1260 	 * Move messages from the specified queue to the per-thread
1261 	 * clean/drain queue while we have the mqueue lock.
1262 	 */
1263 	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);
1264 
1265 	/*
1266 	 * Wipe out message count, both for messages about to be
1267 	 * reaped and for reserved space for (previously) woken senders.
1268 	 * This is the indication to them that their reserved space is gone
1269 	 * (the mqueue was destroyed).
1270 	 */
1271 	mqueue->imq_msgcount = 0;
1272 
1273 	/*
1274 	 * invalidate the waitq for subsequent mqueue operations,
1275 	 * the port lock could be dropped after invalidating the mqueue.
1276 	 */
1277 
1278 	waitq_invalidate(&port->ip_waitq);
1279 
1280 	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);
1281 
1282 	return reap;
1283 }
1284 
1285 /*
1286  *	Routine:	ipc_mqueue_set_qlimit_locked
1287  *	Purpose:
1288  *		Changes a message queue limit; the maximum number
1289  *		of messages which may be queued.
1290  *	Conditions:
1291  *		Port locked.
1292  */
1293 
1294 void
ipc_mqueue_set_qlimit_locked(ipc_mqueue_t mqueue,mach_port_msgcount_t qlimit)1295 ipc_mqueue_set_qlimit_locked(
1296 	ipc_mqueue_t           mqueue,
1297 	mach_port_msgcount_t   qlimit)
1298 {
1299 	ipc_port_t port = ip_from_mq(mqueue);
1300 
1301 	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1302 
1303 	/* wake up senders allowed by the new qlimit */
1304 	if (qlimit > mqueue->imq_qlimit) {
1305 		mach_port_msgcount_t i, wakeup;
1306 		struct turnstile *send_turnstile = port_send_turnstile(port);
1307 
1308 		/* caution: wakeup, qlimit are unsigned */
1309 		wakeup = qlimit - mqueue->imq_qlimit;
1310 
1311 		for (i = 0; i < wakeup; i++) {
1312 			/*
1313 			 * boost the priority of the awoken thread
1314 			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1315 			 * the message queue slot we've just reserved.
1316 			 *
1317 			 * NOTE: this will never prepost
1318 			 */
1319 			if (send_turnstile == TURNSTILE_NULL ||
1320 			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
1321 			    IPC_MQUEUE_FULL,
1322 			    THREAD_AWAKENED,
1323 			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
1324 				port->ip_fullwaiters = false;
1325 				break;
1326 			}
1327 			mqueue->imq_msgcount++;  /* give it to the awakened thread */
1328 		}
1329 	}
1330 	mqueue->imq_qlimit = (uint16_t)qlimit;
1331 }
1332 
1333 /*
1334  *	Routine:	ipc_mqueue_set_seqno_locked
1335  *	Purpose:
1336  *		Changes an mqueue's sequence number.
1337  *	Conditions:
1338  *		Caller holds a reference to the queue's containing object.
1339  */
1340 void
ipc_mqueue_set_seqno_locked(ipc_mqueue_t mqueue,mach_port_seqno_t seqno)1341 ipc_mqueue_set_seqno_locked(
1342 	ipc_mqueue_t            mqueue,
1343 	mach_port_seqno_t       seqno)
1344 {
1345 	mqueue->imq_seqno = seqno;
1346 }
1347 
1348 
1349 /*
1350  *	Routine:	ipc_mqueue_copyin
1351  *	Purpose:
1352  *		Convert a name in a space to a message queue.
1353  *	Conditions:
1354  *		Nothing locked.  If successful, the caller gets a ref for
1355  *		for the object.	This ref ensures the continued existence of
1356  *		the queue.
1357  *	Returns:
1358  *		MACH_MSG_SUCCESS	Found a message queue.
1359  *		MACH_RCV_INVALID_NAME	The space is dead.
1360  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1361  *		MACH_RCV_INVALID_NAME
1362  *			The denoted right is not receive or port set.
1363  *		MACH_RCV_IN_SET		Receive right is a member of a set.
1364  */
1365 
1366 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_object_t * objectp)1367 ipc_mqueue_copyin(
1368 	ipc_space_t             space,
1369 	mach_port_name_t        name,
1370 	ipc_object_t            *objectp)
1371 {
1372 	ipc_entry_bits_t bits;
1373 	ipc_object_t object;
1374 	kern_return_t kr;
1375 
1376 	kr = ipc_right_lookup_read(space, name, &bits, &object);
1377 	if (kr != KERN_SUCCESS) {
1378 		return MACH_RCV_INVALID_NAME;
1379 	}
1380 	/* object is locked and active */
1381 
1382 	if (bits & MACH_PORT_TYPE_RECEIVE) {
1383 		__assert_only ipc_port_t port = ip_object_to_port(object);
1384 		assert(ip_get_receiver_name(port) == name);
1385 		assert(ip_in_space(port, space));
1386 	}
1387 	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1388 		io_reference(object);
1389 		io_unlock(object);
1390 	} else {
1391 		io_unlock(object);
1392 		/* guard exception if we never held the receive right in this entry */
1393 		if ((bits & IE_BITS_EX_RECEIVE) == 0) {
1394 			mach_port_guard_exception(name, 0, kGUARD_EXC_RCV_INVALID_NAME);
1395 		}
1396 		return MACH_RCV_INVALID_NAME;
1397 	}
1398 
1399 	*objectp = object;
1400 	return MACH_MSG_SUCCESS;
1401 }
1402