xref: /xnu-8020.121.3/osfmk/ipc/ipc_mqueue.c (revision fdd8201d7b966f0c3ea610489d29bd841d358941)
1 /*
2  * Copyright (c) 2000-2007 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 /*
29  * @OSF_FREE_COPYRIGHT@
30  */
31 /*
32  * Mach Operating System
33  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
34  * All Rights Reserved.
35  *
36  * Permission to use, copy, modify and distribute this software and its
37  * documentation is hereby granted, provided that both the copyright
38  * notice and this permission notice appear in all copies of the
39  * software, derivative works or modified versions, and any portions
40  * thereof, and that both notices appear in supporting documentation.
41  *
42  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
43  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
44  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
45  *
46  * Carnegie Mellon requests users of this software to return to
47  *
48  *  Software Distribution Coordinator  or  [email protected]
49  *  School of Computer Science
50  *  Carnegie Mellon University
51  *  Pittsburgh PA 15213-3890
52  *
53  * any improvements or extensions that they make and grant Carnegie Mellon
54  * the rights to redistribute these changes.
55  */
56 /*
57  */
58 /*
59  *	File:	ipc/ipc_mqueue.c
60  *	Author:	Rich Draves
61  *	Date:	1989
62  *
63  *	Functions to manipulate IPC message queues.
64  */
65 /*
66  * NOTICE: This file was modified by SPARTA, Inc. in 2006 to introduce
67  * support for mandatory and extensible security protections.  This notice
68  * is included in support of clause 2.2 (b) of the Apple Public License,
69  * Version 2.0.
70  */
71 
72 
73 #include <mach/port.h>
74 #include <mach/message.h>
75 #include <mach/sync_policy.h>
76 
77 #include <kern/assert.h>
78 #include <kern/counter.h>
79 #include <kern/sched_prim.h>
80 #include <kern/ipc_kobject.h>
81 #include <kern/ipc_mig.h>       /* XXX - for mach_msg_receive_continue */
82 #include <kern/misc_protos.h>
83 #include <kern/task.h>
84 #include <kern/thread.h>
85 #include <kern/waitq.h>
86 
87 #include <ipc/port.h>
88 #include <ipc/ipc_mqueue.h>
89 #include <ipc/ipc_kmsg.h>
90 #include <ipc/ipc_right.h>
91 #include <ipc/ipc_port.h>
92 #include <ipc/ipc_pset.h>
93 #include <ipc/ipc_space.h>
94 
95 #if MACH_FLIPC
96 #include <ipc/flipc.h>
97 #endif
98 
99 #ifdef __LP64__
100 #include <vm/vm_map.h>
101 #endif
102 
103 #include <sys/event.h>
104 
/* Returns the short process name for a proc_t; used in diagnostics only. */
extern char     *proc_name_address(void *p);

int ipc_mqueue_full;            /* address is event for queue space */
int ipc_mqueue_rcv;             /* address is event for message arrival */

/* forward declarations */

/* Translate a wait_result_t into the current thread's ith_state. */
static void ipc_mqueue_receive_results(wait_result_t result);

/* Mark a MACH_PEEK_IN_PROGRESS waiter as MACH_PEEK_READY; port mqueue locked. */
static void ipc_mqueue_peek_on_thread_locked(
	ipc_mqueue_t        port_mq,
	mach_msg_option_t   option,
	thread_t            thread);

/* Deliver message to message queue or waiting receiver */
static void ipc_mqueue_post(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option);
122 
123 /*
124  *	Routine:	ipc_mqueue_init
125  *	Purpose:
126  *		Initialize a newly-allocated message queue.
127  */
128 void
ipc_mqueue_init(ipc_mqueue_t mqueue)129 ipc_mqueue_init(
130 	ipc_mqueue_t            mqueue)
131 {
132 	ipc_kmsg_queue_init(&mqueue->imq_messages);
133 	mqueue->imq_qlimit = MACH_PORT_QLIMIT_DEFAULT;
134 	klist_init(&mqueue->imq_klist);
135 }
136 
137 /*
138  *	Routine:	ipc_mqueue_add_locked.
139  *	Purpose:
140  *		Associate the portset's mqueue with the port's mqueue.
141  *		This has to be done so that posting the port will wakeup
142  *		a portset waiter.  If there are waiters on the portset
143  *		mqueue and messages on the port mqueue, try to match them
144  *		up now.
145  *	Conditions:
146  *		Port and Pset both locked.
147  */
kern_return_t
ipc_mqueue_add_locked(
	ipc_mqueue_t    port_mqueue,
	ipc_pset_t      pset,
	waitq_link_t   *linkp)
{
	ipc_port_t       port = ip_from_mq(port_mqueue);
	struct waitq_set *wqset = &pset->ips_wqset;
	ipc_kmsg_queue_t kmsgq = &port_mqueue->imq_messages;
	kern_return_t    kr = KERN_SUCCESS;
	ipc_kmsg_t       kmsg;

	/* Link the port's waitq into the set; on failure nothing to undo. */
	kr = waitq_link_locked(&port->ip_waitq, wqset, linkp);
	if (kr != KERN_SUCCESS) {
		return kr;
	}

	/*
	 * Now that the set has been added to the port, there may be
	 * messages queued on the port and threads waiting on the set
	 * waitq.  Lets get them together.
	 *
	 * Only consider this set however, as the other ones have been
	 * posted to already.
	 */
	while ((kmsg = ipc_kmsg_queue_first(kmsgq)) != IKM_NULL) {
		thread_t th;
		mach_msg_size_t msize;
		spl_t th_spl;

		/*
		 * Pull one waiter off this set's waitq.  The returned
		 * thread is locked at splsched; th_spl restores the
		 * previous interrupt level once we drop the thread lock.
		 */
		th = waitq_wakeup64_identify_locked(
			wqset, IPC_MQUEUE_RECEIVE,
			THREAD_AWAKENED, &th_spl,
			WAITQ_ALL_PRIORITIES, WAITQ_KEEP_LOCKED);
		/* port and pset still locked, thread locked */

		if (th == THREAD_NULL) {
			/*
			 * Didn't find a thread to wake up but messages
			 * are enqueued, prepost the set instead,
			 * as calling waitq_wakeup64_identify_locked()
			 * on the set directly will not take care of it.
			 */
			waitq_link_prepost_locked(&port->ip_waitq, wqset);
			break;
		}

		/*
		 * If the receiver waited with a facility not directly
		 * related to Mach messaging, then it isn't prepared to get
		 * handed the message directly.  Just set it running, and
		 * go look for another thread that can.
		 */
		if (th->ith_state != MACH_RCV_IN_PROGRESS) {
			if (th->ith_state == MACH_PEEK_IN_PROGRESS) {
				/*
				 * wakeup the peeking thread, but
				 * continue to loop over the threads
				 * waiting on the port's mqueue to see
				 * if there are any actual receivers
				 */
				ipc_mqueue_peek_on_thread_locked(port_mqueue,
				    th->ith_option,
				    th);
			}
			thread_unlock(th);
			splx(th_spl);
			continue;
		}

		/*
		 * Found a receiver. see if they can handle the message
		 * correctly (the message is not too large for them, or
		 * they didn't care to be informed that the message was
		 * too large).  If they can't handle it, take them off
		 * the list and let them go back and figure it out and
		 * just move onto the next.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, th->map);
		if (th->ith_rsize <
		    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(th), th->ith_option))) {
			/* report the needed size so the receiver can retry */
			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = msize;
			if (th->ith_option & MACH_RCV_LARGE) {
				/*
				 * let him go without message
				 */
				th->ith_receiver_name = port_mqueue->imq_receiver_name;
				th->ith_kmsg = IKM_NULL;
				th->ith_seqno = 0;
				thread_unlock(th);
				splx(th_spl);
				continue; /* find another thread */
			}
		} else {
			th->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * This thread is going to take this message,
		 * so give it to him.
		 */
		ipc_kmsg_rmqueue(kmsgq, kmsg);
#if MACH_FLIPC
		/* capture node before handing the kmsg to the receiver */
		mach_node_t  node = kmsg->ikm_node;
#endif
		/* the dequeued message gives up its reserved queue slot */
		ipc_mqueue_release_msgcount(port_mqueue);

		th->ith_kmsg = kmsg;
		th->ith_seqno = port_mqueue->imq_seqno++;
		thread_unlock(th);
		splx(th_spl);
#if MACH_FLIPC
		if (MACH_NODE_VALID(node) && FPORT_VALID(port_mqueue->imq_fport)) {
			flipc_msg_ack(node, port_mqueue, TRUE);
		}
#endif
	}

	return KERN_SUCCESS;
}
269 
270 
271 /*
272  *	Routine:	ipc_port_has_klist
273  *	Purpose:
274  *		Returns whether the given port imq_klist field can be used as a klist.
275  */
276 bool
ipc_port_has_klist(ipc_port_t port)277 ipc_port_has_klist(ipc_port_t port)
278 {
279 	return !port->ip_specialreply &&
280 	       port->ip_sync_link_state == PORT_SYNC_LINK_ANY;
281 }
282 
283 static inline struct klist *
ipc_object_klist(ipc_object_t object)284 ipc_object_klist(ipc_object_t object)
285 {
286 	if (io_otype(object) == IOT_PORT) {
287 		ipc_port_t port = ip_object_to_port(object);
288 
289 		return ipc_port_has_klist(port) ? &port->ip_klist : NULL;
290 	}
291 	return &ips_object_to_pset(object)->ips_klist;
292 }
293 
294 /*
295  *	Routine:	ipc_mqueue_changed
296  *	Purpose:
297  *		Wake up receivers waiting in a message queue.
298  *	Conditions:
299  *		The object containing the message queue is locked.
300  */
void
ipc_mqueue_changed(
	ipc_space_t         space,
	struct waitq       *waitq)
{
	ipc_object_t object = io_from_waitq(waitq);
	struct klist *klist = ipc_object_klist(object);

	/* only notify kqueue if the klist is valid and non-empty */
	if (klist && SLIST_FIRST(klist)) {
		/*
		 * Indicate that this message queue is vanishing
		 *
		 * When this is called, the associated receive right may be in flight
		 * between two tasks: the one it used to live in, and the one that armed
		 * a port destroyed notification for it.
		 *
		 * The new process may want to register the port it gets back with an
		 * EVFILT_MACHPORT filter again, and may have pending sync IPC on this
		 * port pending already, in which case we want the imq_klist field to be
		 * reusable for nefarious purposes.
		 *
		 * Fortunately, we really don't need this linkage anymore after this
		 * point as EV_VANISHED / EV_EOF will be the last thing delivered ever.
		 *
		 * Note: we don't have the space lock here, however, this covers the
		 *       case of when a task is terminating the space, triggering
		 *       several knote_vanish() calls.
		 *
		 *       We don't need the lock to observe that the space is inactive as
		 *       we just deactivated it on the same thread.
		 *
		 *       We still need to call knote_vanish() so that the knote is
		 *       marked with EV_VANISHED or EV_EOF so that the detach step
		 *       in filt_machportdetach is skipped correctly.
		 */
		assert(space);
		knote_vanish(klist, is_active(space));
	}

	if (io_otype(object) == IOT_PORT) {
		/* resetting the sync link state also reclaims imq_klist */
		ipc_port_adjust_sync_link_state_locked(ip_object_to_port(object),
		    PORT_SYNC_LINK_ANY, NULL);
	} else {
		klist_init(klist);
	}

	/* THREAD_RESTART tells receivers the port/set changed under them */
	waitq_wakeup64_all_locked(waitq, IPC_MQUEUE_RECEIVE,
	    THREAD_RESTART, WAITQ_ALL_PRIORITIES, WAITQ_KEEP_LOCKED);
}
350 
351 
352 
353 
354 /*
355  *	Routine:	ipc_mqueue_send
356  *	Purpose:
357  *		Send a message to a message queue.  The message holds a reference
358  *		for the destination port for this message queue in the
359  *		msgh_remote_port field.
360  *
361  *		If unsuccessful, the caller still has possession of
362  *		the message and must do something with it.  If successful,
363  *		the message is queued, given to a receiver, or destroyed.
364  *	Conditions:
365  *		port is locked.
366  *	Returns:
367  *		MACH_MSG_SUCCESS	The message was accepted.
368  *		MACH_SEND_TIMED_OUT	Caller still has message.
369  *		MACH_SEND_INTERRUPTED	Caller still has message.
370  */
mach_msg_return_t
ipc_mqueue_send_locked(
	ipc_mqueue_t            mqueue,
	ipc_kmsg_t              kmsg,
	mach_msg_option_t       option,
	mach_msg_timeout_t  send_timeout)
{
	ipc_port_t port = ip_from_mq(mqueue);
	int wresult;

	/*
	 *  Don't block if:
	 *	1) We're under the queue limit.
	 *	2) Caller used the MACH_SEND_ALWAYS internal option.
	 *	3) Message is sent to a send-once right.
	 */
	if (!imq_full(mqueue) ||
	    (!imq_full_kernel(mqueue) &&
	    ((option & MACH_SEND_ALWAYS) ||
	    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
	    MACH_MSG_TYPE_PORT_SEND_ONCE)))) {
		/* reserve our slot before dropping the port lock */
		mqueue->imq_msgcount++;
		assert(mqueue->imq_msgcount > 0);
		ip_mq_unlock(port);
	} else {
		thread_t cur_thread = current_thread();
		struct turnstile *send_turnstile = TURNSTILE_NULL;
		uint64_t deadline;

		/*
		 * We have to wait for space to be granted to us.
		 */
		if ((option & MACH_SEND_TIMEOUT) && (send_timeout == 0)) {
			/* zero timeout: fail immediately rather than wait */
			ip_mq_unlock(port);
			return MACH_SEND_TIMED_OUT;
		}
		if (imq_full_kernel(mqueue)) {
			/* kernel hard limit reached; never blocks on this */
			ip_mq_unlock(port);
			return MACH_SEND_NO_BUFFER;
		}
		port->ip_fullwaiters = true;

		if (option & MACH_SEND_TIMEOUT) {
			clock_interval_to_deadline(send_timeout, 1000 * NSEC_PER_USEC, &deadline);
		} else {
			deadline = 0;
		}

		/* record why this thread is about to block, for stackshots */
		thread_set_pending_block_hint(cur_thread, kThreadWaitPortSend);

		/*
		 * Senders block on the port's send turnstile so the
		 * receiver can inherit the blocked senders' priority.
		 */
		send_turnstile = turnstile_prepare((uintptr_t)port,
		    port_send_turnstile_address(port),
		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);

		ipc_port_send_update_inheritor(port, send_turnstile,
		    TURNSTILE_DELAYED_UPDATE);

		wresult = waitq_assert_wait64_leeway(
			&send_turnstile->ts_waitq,
			IPC_MQUEUE_FULL,
			THREAD_ABORTSAFE,
			TIMEOUT_URGENCY_USER_NORMAL,
			deadline,
			TIMEOUT_NO_LEEWAY);

		ip_mq_unlock(port);
		turnstile_update_inheritor_complete(send_turnstile,
		    TURNSTILE_INTERLOCK_NOT_HELD);

		if (wresult == THREAD_WAITING) {
			wresult = thread_block(THREAD_CONTINUE_NULL);
		}

		/* Call turnstile complete with interlock held */
		ip_mq_lock(port);
		turnstile_complete((uintptr_t)port, port_send_turnstile_address(port), NULL, TURNSTILE_SYNC_IPC);
		ip_mq_unlock(port);

		/* Call cleanup after dropping the interlock */
		turnstile_cleanup();

		switch (wresult) {
		case THREAD_AWAKENED:
			/*
			 * we can proceed - inherited msgcount from waker
			 * or the message queue has been destroyed and the msgcount
			 * has been reset to zero (will detect in ipc_mqueue_post()).
			 */
			break;

		case THREAD_TIMED_OUT:
			assert(option & MACH_SEND_TIMEOUT);
			return MACH_SEND_TIMED_OUT;

		case THREAD_INTERRUPTED:
			return MACH_SEND_INTERRUPTED;

		case THREAD_RESTART:
			/* mqueue is being destroyed */
			return MACH_SEND_INVALID_DEST;
		default:
			panic("ipc_mqueue_send");
		}
	}

	/* slot reserved (or inherited): deliver with the port unlocked */
	ipc_mqueue_post(mqueue, kmsg, option);
	return MACH_MSG_SUCCESS;
}
479 
480 /*
481  *	Routine:	ipc_mqueue_override_send_locked
482  *	Purpose:
483  *		Set an override qos on the first message in the queue
484  *		(if the queue is full). This is a send-possible override
485  *		that will go away as soon as we drain a message from the
486  *		queue.
487  *
488  *	Conditions:
489  *		The port corresponding to mqueue is locked.
490  *		The caller holds a reference on the message queue.
491  */
void
ipc_mqueue_override_send_locked(
	ipc_mqueue_t        mqueue,
	mach_msg_qos_t      qos_ovr)
{
	ipc_port_t port = ip_from_mq(mqueue);
	boolean_t __unused full_queue_empty = FALSE;

	assert(waitq_is_valid(&port->ip_waitq));

	if (imq_full(mqueue)) {
		ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);

		/* only notify kqueue if the override actually raised the qos */
		if (first && ipc_kmsg_override_qos(&mqueue->imq_messages, first, qos_ovr)) {
			if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				KNOTE(&port->ip_klist, 0);
			}
		}
		/* full queue with no first message: limit is 0 or being torn down */
		if (!first) {
			full_queue_empty = TRUE;
		}
	}

#if DEVELOPMENT || DEBUG
	if (full_queue_empty) {
		/*
		 * NOTE(review): dst_pid is computed but never used here —
		 * presumably a remnant of a removed diagnostic; confirm
		 * against upstream before relying on this block.
		 */
		int dst_pid = 0;
		dst_pid = ipc_port_get_receiver_task(port, NULL);
	}
#endif
}
524 
525 /*
526  *	Routine:	ipc_mqueue_release_msgcount
527  *	Purpose:
528  *		Release a message queue reference in the case where we
529  *		found a waiter.
530  *
531  *	Conditions:
532  *		The port corresponding to message queue is locked.
533  *		The message corresponding to this reference is off the queue.
534  *		There is no need to pass reserved preposts because this will
535  *		never prepost to anyone
536  */
void
ipc_mqueue_release_msgcount(ipc_mqueue_t port_mq)
{
	ipc_port_t port = ip_from_mq(port_mq);
	struct turnstile *send_turnstile = port_send_turnstile(port);

	ip_mq_lock_held(port);
	/* either more reservations remain, or the queue itself is empty */
	assert(port_mq->imq_msgcount > 1 || ipc_kmsg_queue_empty(&port_mq->imq_messages));

	port_mq->imq_msgcount--;

	if (!imq_full(port_mq) && port->ip_fullwaiters &&
	    send_turnstile != TURNSTILE_NULL) {
		/*
		 * boost the priority of the awoken thread
		 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
		 * the message queue slot we've just reserved.
		 *
		 * NOTE: this will never prepost
		 *
		 * The wakeup happens on a turnstile waitq
		 * which will wakeup the highest priority waiter.
		 * A potential downside of this would be starving low
		 * priority senders if there is a constant churn of
		 * high priority threads trying to send to this port.
		 */
		if (waitq_wakeup64_one(&send_turnstile->ts_waitq,
		    IPC_MQUEUE_FULL,
		    THREAD_AWAKENED,
		    WAITQ_PROMOTE_PRIORITY) != KERN_SUCCESS) {
			/* no blocked sender found; nobody left to wait */
			port->ip_fullwaiters = false;
		} else {
			/* gave away our slot - add reference back */
			port_mq->imq_msgcount++;
		}
	}

	if (ipc_kmsg_queue_empty(&port_mq->imq_messages)) {
		/* no more msgs: invalidate the port's prepost object */
		waitq_clear_prepost_locked(&port->ip_waitq);
	}
}
578 
579 /*
580  *	Routine:	ipc_mqueue_post
581  *	Purpose:
582  *		Post a message to a waiting receiver or enqueue it.  If a
583  *		receiver is waiting, we can release our reserved space in
584  *		the message queue.
585  *
586  *	Conditions:
587  *		port is unlocked
588  *		If we need to queue, our space in the message queue is reserved.
589  */
static void
ipc_mqueue_post(
	ipc_mqueue_t               mqueue,
	ipc_kmsg_t                 kmsg,
	mach_msg_option_t __unused option)
{
	ipc_port_t port = ip_from_mq(mqueue);
	struct waitq *waitq = &port->ip_waitq;
	boolean_t destroy_msg = FALSE;

	ipc_kmsg_trace_send(kmsg, option);

	/*
	 *	While the msg queue is locked, we have control of the
	 *	kmsg, so the ref in it for the port is still good.
	 *
	 *	Check for a receiver for the message.
	 */
	ip_mq_lock(port);

	/* we may have raced with port destruction! */
	if (!waitq_is_valid(&port->ip_waitq)) {
		destroy_msg = TRUE;
		goto out_unlock;
	}

	for (;;) {
		spl_t th_spl;
		thread_t receiver;
		mach_msg_size_t msize;

		/* dequeue a waiter; returns it thread-locked at splsched */
		receiver = waitq_wakeup64_identify_locked(waitq,
		    IPC_MQUEUE_RECEIVE, THREAD_AWAKENED,
		    &th_spl, WAITQ_ALL_PRIORITIES, WAITQ_KEEP_LOCKED);
		/* waitq still locked, thread locked */

		if (receiver == THREAD_NULL) {
			/*
			 * no receivers; queue kmsg if space still reserved
			 * Reservations are cancelled when the port goes inactive.
			 * note that this will enqueue the message for any
			 * "peeking" receivers.
			 *
			 * Also, post the knote to wake up any threads waiting
			 * on that style of interface if this insertion is of
			 * note (first insertion, or adjusted override qos all
			 * the way to the head of the queue).
			 *
			 * This is just for ports. port-sets knotes are being
			 * posted to by the waitq_wakeup64_identify_locked()
			 * above already.
			 */
			if (mqueue->imq_msgcount == 0) {
				/*
				 * The message queue must belong
				 * to an inactive port, so just destroy
				 * the message and pretend it was posted.
				 */
				destroy_msg = TRUE;
			} else if (!ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg)) {
				/*
				 * queue was not empty and qos
				 * didn't change, nothing to do.
				 */
			} else if (ip_in_a_space(port) &&
			    is_active(ip_get_receiver(port)) &&
			    ipc_port_has_klist(port)) {
				/*
				 * queue was empty or qos changed
				 * we need to tell kqueue, unless
				 * the space is getting torn down
				 */
				KNOTE(&port->ip_klist, 0);
			}
			break;
		}

		/*
		 * If a thread is attempting a "peek" into the message queue
		 * (MACH_PEEK_IN_PROGRESS), then we enqueue the message and set the
		 * thread running.  A successful peek is essentially the same as
		 * message delivery since the peeking thread takes responsibility
		 * for delivering the message and (eventually) removing it from
		 * the mqueue.  Only one thread can successfully use the peek
		 * facility on any given port, so we exit the waitq loop after
		 * encountering such a thread.
		 */
		if (receiver->ith_state == MACH_PEEK_IN_PROGRESS && mqueue->imq_msgcount > 0) {
			ipc_kmsg_enqueue_qos(&mqueue->imq_messages, kmsg);
			ipc_mqueue_peek_on_thread_locked(mqueue, receiver->ith_option, receiver);
			thread_unlock(receiver);
			splx(th_spl);
			break; /* Message was posted, so break out of loop */
		}

		/*
		 * If the receiver waited with a facility not directly related
		 * to Mach messaging, then it isn't prepared to get handed the
		 * message directly. Just set it running, and go look for
		 * another thread that can.
		 */
		if (receiver->ith_state != MACH_RCV_IN_PROGRESS) {
			thread_unlock(receiver);
			splx(th_spl);
			continue;
		}


		/*
		 * We found a waiting thread.
		 * If the message is too large or the scatter list is too small
		 * the thread we wake up will get that as its status.
		 */
		msize = ipc_kmsg_copyout_size(kmsg, receiver->map);
		if (receiver->ith_rsize <
		    (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(receiver), receiver->ith_option))) {
			receiver->ith_msize = msize;
			receiver->ith_state = MACH_RCV_TOO_LARGE;
		} else {
			receiver->ith_state = MACH_MSG_SUCCESS;
		}

		/*
		 * If there is no problem with the upcoming receive, or the
		 * receiver thread didn't specifically ask for special too
		 * large error condition, go ahead and select it anyway.
		 */
		if ((receiver->ith_state == MACH_MSG_SUCCESS) ||
		    !(receiver->ith_option & MACH_RCV_LARGE)) {
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = mqueue->imq_seqno++;
#if MACH_FLIPC
			/* capture node before the receiver owns the kmsg */
			mach_node_t node = kmsg->ikm_node;
#endif
			thread_unlock(receiver);
			splx(th_spl);

			/* we didn't need our reserved spot in the queue */
			ipc_mqueue_release_msgcount(mqueue);

#if MACH_FLIPC
			if (MACH_NODE_VALID(node) && FPORT_VALID(mqueue->imq_fport)) {
				flipc_msg_ack(node, mqueue, TRUE);
			}
#endif
			break;
		}

		/*
		 * Otherwise, this thread needs to be released to run
		 * and handle its error without getting the message.  We
		 * need to go back and pick another one.
		 */
		receiver->ith_receiver_name = mqueue->imq_receiver_name;
		receiver->ith_kmsg = IKM_NULL;
		receiver->ith_seqno = 0;
		thread_unlock(receiver);
		splx(th_spl);
	}

out_unlock:
	/* clear the waitq boost we may have been given */
	waitq_clear_promotion_locked(waitq, current_thread());
	waitq_unlock(waitq);

	if (destroy_msg) {
		ipc_kmsg_destroy(kmsg, IPC_KMSG_DESTROY_ALL);
	}

	/* the message is accounted as sent even if it was destroyed */
	counter_inc(&current_task()->messages_sent);
	return;
}
762 
763 
static void
ipc_mqueue_receive_results(wait_result_t saved_wait_result)
{
	thread_t                self = current_thread();
	mach_msg_option_t       option = self->ith_option;

	/*
	 * why did we wake up?
	 */
	switch (saved_wait_result) {
	case THREAD_TIMED_OUT:
		self->ith_state = MACH_RCV_TIMED_OUT;
		return;

	case THREAD_INTERRUPTED:
		self->ith_state = MACH_RCV_INTERRUPTED;
		return;

	case THREAD_RESTART:
		/* something bad happened to the port/set */
		self->ith_state = MACH_RCV_PORT_CHANGED;
		return;

	case THREAD_AWAKENED:
		/*
		 * We do not need to go select a message, somebody
		 * handed us one (or a too-large indication).
		 */
		switch (self->ith_state) {
		case MACH_RCV_SCATTER_SMALL:
		case MACH_RCV_TOO_LARGE:
			/*
			 * Somebody tried to give us a too large
			 * message. If we indicated that we cared,
			 * then they only gave us the indication,
			 * otherwise they gave us the indication
			 * AND the message anyway.
			 */
			/*
			 * NOTE(review): both branches below return with no
			 * other action — the split is vestigial structure
			 * kept for the comment above; the poster already
			 * decided whether ith_kmsg carries the message.
			 */
			if (option & MACH_RCV_LARGE) {
				return;
			}
			return;
		case MACH_MSG_SUCCESS:
			return;
		case MACH_PEEK_READY:
			return;

		default:
			panic("ipc_mqueue_receive_results: strange ith_state");
		}

	default:
		panic("ipc_mqueue_receive_results: strange wait_result");
	}
}
819 
/*
 * Continuation routine resumed after a receive blocked in
 * ipc_mqueue_receive(): translate the wakeup reason into ith_state,
 * then re-enter the message-receive path (which does not return here).
 */
void
ipc_mqueue_receive_continue(
	__unused void *param,
	wait_result_t wresult)
{
	ipc_mqueue_receive_results(wresult);
	mach_msg_receive_continue();  /* hard-coded for now */
}
828 
829 /*
830  *	Routine:	ipc_mqueue_receive
831  *	Purpose:
832  *		Receive a message from a message queue.
833  *
834  *	Conditions:
835  *		Our caller must hold a reference for the port or port set
836  *		to which this queue belongs, to keep the queue
837  *		from being deallocated.
838  *
839  *		The kmsg is returned with clean header fields
840  *		and with the circular bit turned off through the ith_kmsg
841  *		field of the thread's receive continuation state.
842  *	Returns:
843  *		MACH_MSG_SUCCESS	Message returned in ith_kmsg.
844  *		MACH_RCV_TOO_LARGE	Message size returned in ith_msize.
845  *		MACH_RCV_TIMED_OUT	No message obtained.
846  *		MACH_RCV_INTERRUPTED	No message obtained.
847  *		MACH_RCV_PORT_DIED	Port/set died; no message.
848  *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
849  *
850  */
851 
void
ipc_mqueue_receive(
	struct waitq           *waitq,
	mach_msg_option_t       option,
	mach_msg_size_t         max_size,
	mach_msg_timeout_t      rcv_timeout,
	int                     interruptible)
{
	wait_result_t           wresult;
	thread_t                self = current_thread();

	waitq_lock(waitq);

	wresult = ipc_mqueue_receive_on_thread_and_unlock(waitq, option, max_size,
	    rcv_timeout, interruptible, self);
	/* object unlocked */
	if (wresult == THREAD_NOT_WAITING) {
		/* a message (or immediate error) was delivered in ith_* */
		return;
	}

	if (wresult == THREAD_WAITING) {
		if (self->ith_continuation) {
			/*
			 * Block with a continuation: the thread's stack is
			 * discarded and it resumes in
			 * ipc_mqueue_receive_continue().
			 */
			thread_block(ipc_mqueue_receive_continue);
		}
		/* NOTREACHED */

		/* no continuation set: block in place and come back here */
		wresult = thread_block(THREAD_CONTINUE_NULL);
	}
	ipc_mqueue_receive_results(wresult);
}
882 
883 /*
884  *	Routine:	ipc_mqueue_receive_on_thread_and_unlock
885  *	Purpose:
886  *		Receive a message from a message queue using a specified thread.
887  *		If no message available, assert_wait on the appropriate waitq.
888  *
889  *	Conditions:
890  *		Assumes thread is self.
891  *		The port/port-set waitq is locked on entry, unlocked on return.
892  *		May have assert-waited. Caller must block in those cases.
893  */
894 wait_result_t
ipc_mqueue_receive_on_thread_and_unlock(struct waitq * waitq,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t rcv_timeout,int interruptible,thread_t thread)895 ipc_mqueue_receive_on_thread_and_unlock(
896 	struct waitq           *waitq,
897 	mach_msg_option_t       option,
898 	mach_msg_size_t         max_size,
899 	mach_msg_timeout_t      rcv_timeout,
900 	int                     interruptible,
901 	thread_t                thread)
902 {
903 	ipc_object_t            object = io_from_waitq(waitq);
904 	ipc_port_t              port = IP_NULL;
905 	wait_result_t           wresult;
906 	uint64_t                deadline;
907 	struct turnstile        *rcv_turnstile = TURNSTILE_NULL;
908 
909 	if (waitq_type(waitq) == WQT_PORT_SET) {
910 		ipc_pset_t pset = ips_object_to_pset(object);
911 		struct waitq *port_wq;
912 
913 		/*
914 		 * Put the message at the back of the prepost list
915 		 * if it's not a PEEK.
916 		 *
917 		 * Might drop the pset lock temporarily.
918 		 */
919 		port_wq = waitq_set_first_prepost(&pset->ips_wqset, WQS_PREPOST_LOCK |
920 		    ((option & MACH_PEEK_MSG) ? WQS_PREPOST_PEEK: 0));
921 
922 		/* Returns with port locked */
923 
924 		if (port_wq != NULL) {
925 			/*
926 			 * We get here if there is at least one message
927 			 * waiting on port_wq. We have instructed the prepost
928 			 * iteration logic to leave both the port_wq and the
929 			 * set waitq locked.
930 			 *
931 			 * Continue on to handling the message with just
932 			 * the port waitq locked.
933 			 */
934 			io_unlock(object);
935 			port = ip_from_waitq(port_wq);
936 		}
937 	} else if (waitq_type(waitq) == WQT_PORT) {
938 		port = ip_from_waitq(waitq);
939 		if (ipc_kmsg_queue_empty(&port->ip_messages.imq_messages)) {
940 			port = IP_NULL;
941 		}
942 	} else {
943 		panic("Unknown waitq type (%p/0x%x)", waitq, waitq_type(waitq));
944 	}
945 
946 	if (port) {
947 		if (option & MACH_PEEK_MSG) {
948 			ipc_mqueue_peek_on_thread_locked(&port->ip_messages,
949 			    option, thread);
950 		} else {
951 			ipc_mqueue_select_on_thread_locked(&port->ip_messages,
952 			    option, max_size, thread);
953 		}
954 		ip_mq_unlock(port);
955 		return THREAD_NOT_WAITING;
956 	}
957 
958 	if (!waitq_is_valid(waitq)) {
959 		/* someone raced us to destroy this mqueue/port! */
960 		io_unlock(object);
961 		/*
962 		 * ipc_mqueue_receive_results updates the thread's ith_state
963 		 * TODO: differentiate between rights being moved and
964 		 * rights/ports being destroyed (21885327)
965 		 */
966 		return THREAD_RESTART;
967 	}
968 
969 	/*
970 	 * Looks like we'll have to block.  The waitq we will
971 	 * block on (whether the set's or the local port's) is
972 	 * still locked.
973 	 */
974 	if ((option & MACH_RCV_TIMEOUT) && rcv_timeout == 0) {
975 		io_unlock(object);
976 		thread->ith_state = MACH_RCV_TIMED_OUT;
977 		return THREAD_NOT_WAITING;
978 	}
979 
980 	thread->ith_option = option;
981 	thread->ith_rsize = max_size;
982 	thread->ith_msize = 0;
983 
984 	if (option & MACH_PEEK_MSG) {
985 		thread->ith_state = MACH_PEEK_IN_PROGRESS;
986 	} else {
987 		thread->ith_state = MACH_RCV_IN_PROGRESS;
988 	}
989 
990 	if (option & MACH_RCV_TIMEOUT) {
991 		clock_interval_to_deadline(rcv_timeout, 1000 * NSEC_PER_USEC, &deadline);
992 	} else {
993 		deadline = 0;
994 	}
995 
996 	/*
997 	 * Threads waiting on a reply port (not portset)
998 	 * will wait on its receive turnstile.
999 	 *
1000 	 * Donate waiting thread's turnstile and
1001 	 * setup inheritor for special reply port.
1002 	 * Based on the state of the special reply
1003 	 * port, the inheritor would be the send
1004 	 * turnstile of the connection port on which
1005 	 * the send of sync ipc would happen or
1006 	 * workloop's turnstile who would reply to
1007 	 * the sync ipc message.
1008 	 *
1009 	 * Pass in mqueue wait in waitq_assert_wait to
1010 	 * support port set wakeup. The mqueue waitq of port
1011 	 * will be converted to to turnstile waitq
1012 	 * in waitq_assert_wait instead of global waitqs.
1013 	 */
1014 	if (waitq_type(waitq) == WQT_PORT) {
1015 		port = ip_from_waitq(waitq);
1016 		rcv_turnstile = turnstile_prepare((uintptr_t)port,
1017 		    port_rcv_turnstile_address(port),
1018 		    TURNSTILE_NULL, TURNSTILE_SYNC_IPC);
1019 
1020 		ipc_port_recv_update_inheritor(port, rcv_turnstile,
1021 		    TURNSTILE_DELAYED_UPDATE);
1022 	}
1023 
1024 	thread_set_pending_block_hint(thread, kThreadWaitPortReceive);
1025 	wresult = waitq_assert_wait64_locked(waitq,
1026 	    IPC_MQUEUE_RECEIVE,
1027 	    interruptible,
1028 	    TIMEOUT_URGENCY_USER_NORMAL,
1029 	    deadline,
1030 	    TIMEOUT_NO_LEEWAY,
1031 	    thread);
1032 	if (wresult == THREAD_AWAKENED) {
1033 		/*
1034 		 * The first thing we did was to look for preposts
1035 		 * (using waitq_set_first_prepost() for sets, or looking
1036 		 * at the port's queue for ports).
1037 		 *
1038 		 * Since we found none, we kept the waitq locked.
1039 		 *
1040 		 * It ensures that waitq_assert_wait64_locked() can't
1041 		 * find pre-posts either, won't drop the waitq lock
1042 		 * either (even for a set), and can't return THREAD_AWAKENED.
1043 		 */
1044 		panic("ipc_mqueue_receive_on_thread: sleep walking");
1045 	}
1046 
1047 	io_unlock(object);
1048 
1049 	/* Check if its a port mqueue and if it needs to call turnstile_update_inheritor_complete */
1050 	if (rcv_turnstile != TURNSTILE_NULL) {
1051 		turnstile_update_inheritor_complete(rcv_turnstile, TURNSTILE_INTERLOCK_NOT_HELD);
1052 	}
1053 	/* Its callers responsibility to call turnstile_complete to get the turnstile back */
1054 
1055 	return wresult;
1056 }
1057 
1058 
1059 /*
1060  *	Routine:	ipc_mqueue_peek_on_thread_locked
1061  *	Purpose:
1062  *		A receiver discovered that there was a message on the queue
1063  *		before he had to block. Tell a thread about the message queue,
1064  *		but don't pick off any messages.
1065  *	Conditions:
1066  *		port_mq locked
1067  *		at least one message on port_mq's message queue
1068  *
1069  *	Returns: (on thread->ith_state)
1070  *		MACH_PEEK_READY		ith_peekq contains a message queue
1071  */
1072 void
ipc_mqueue_peek_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option_t option,thread_t thread)1073 ipc_mqueue_peek_on_thread_locked(
1074 	ipc_mqueue_t        port_mq,
1075 	mach_msg_option_t   option,
1076 	thread_t            thread)
1077 {
1078 	(void)option;
1079 	assert(option & MACH_PEEK_MSG);
1080 	assert(ipc_kmsg_queue_first(&port_mq->imq_messages) != IKM_NULL);
1081 
1082 	/*
1083 	 * Take a reference on the mqueue's associated port:
1084 	 * the peeking thread will be responsible to release this reference
1085 	 */
1086 	ip_reference(ip_from_mq(port_mq));
1087 	thread->ith_peekq = port_mq;
1088 	thread->ith_state = MACH_PEEK_READY;
1089 }
1090 
1091 /*
1092  *	Routine:	ipc_mqueue_select_on_thread_locked
1093  *	Purpose:
1094  *		A receiver discovered that there was a message on the queue
1095  *		before he had to block.  Pick the message off the queue and
1096  *		"post" it to thread.
1097  *	Conditions:
1098  *		port locked.
1099  *              thread not locked.
1100  *		There is a message.
1101  *		No need to reserve prepost objects - it will never prepost
1102  *
1103  *	Returns:
1104  *		MACH_MSG_SUCCESS	Actually selected a message for ourselves.
1105  *		MACH_RCV_TOO_LARGE  May or may not have pull it, but it is large
1106  */
1107 void
ipc_mqueue_select_on_thread_locked(ipc_mqueue_t port_mq,mach_msg_option_t option,mach_msg_size_t max_size,thread_t thread)1108 ipc_mqueue_select_on_thread_locked(
1109 	ipc_mqueue_t            port_mq,
1110 	mach_msg_option_t       option,
1111 	mach_msg_size_t         max_size,
1112 	thread_t                thread)
1113 {
1114 	ipc_kmsg_t kmsg;
1115 	mach_msg_return_t mr = MACH_MSG_SUCCESS;
1116 	mach_msg_size_t msize;
1117 
1118 	/*
1119 	 * Do some sanity checking of our ability to receive
1120 	 * before pulling the message off the queue.
1121 	 */
1122 	kmsg = ipc_kmsg_queue_first(&port_mq->imq_messages);
1123 	assert(kmsg != IKM_NULL);
1124 
1125 	/*
1126 	 * If we really can't receive it, but we had the
1127 	 * MACH_RCV_LARGE option set, then don't take it off
1128 	 * the queue, instead return the appropriate error
1129 	 * (and size needed).
1130 	 */
1131 	msize = ipc_kmsg_copyout_size(kmsg, thread->map);
1132 	if (msize + REQUESTED_TRAILER_SIZE(thread_is_64bit_addr(thread), option) > max_size) {
1133 		mr = MACH_RCV_TOO_LARGE;
1134 		if (option & MACH_RCV_LARGE) {
1135 			thread->ith_receiver_name = port_mq->imq_receiver_name;
1136 			thread->ith_kmsg = IKM_NULL;
1137 			thread->ith_msize = msize;
1138 			thread->ith_seqno = 0;
1139 			thread->ith_state = mr;
1140 			return;
1141 		}
1142 	}
1143 
1144 	ipc_kmsg_rmqueue(&port_mq->imq_messages, kmsg);
1145 #if MACH_FLIPC
1146 	if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(port_mq->imq_fport)) {
1147 		flipc_msg_ack(kmsg->ikm_node, port_mq, TRUE);
1148 	}
1149 #endif
1150 	ipc_mqueue_release_msgcount(port_mq);
1151 	thread->ith_seqno = port_mq->imq_seqno++;
1152 	thread->ith_kmsg = kmsg;
1153 	thread->ith_state = mr;
1154 
1155 	counter_inc(&current_task()->messages_received);
1156 	return;
1157 }
1158 
1159 /*
1160  *	Routine:	ipc_mqueue_peek_locked
1161  *	Purpose:
1162  *		Peek at a (non-set) message queue to see if it has a message
1163  *		matching the sequence number provided (if zero, then the
1164  *		first message in the queue) and return vital info about the
1165  *		message.
1166  *
1167  *	Conditions:
1168  *		The io object corresponding to mq is locked by callers.
1169  *		Other locks may be held by callers, so this routine cannot block.
1170  *		Caller holds reference on the message queue.
1171  */
1172 unsigned
ipc_mqueue_peek_locked(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1173 ipc_mqueue_peek_locked(ipc_mqueue_t mq,
1174     mach_port_seqno_t * seqnop,
1175     mach_msg_size_t * msg_sizep,
1176     mach_msg_id_t * msg_idp,
1177     mach_msg_max_trailer_t * msg_trailerp,
1178     ipc_kmsg_t *kmsgp)
1179 {
1180 	ipc_kmsg_queue_t kmsgq;
1181 	ipc_kmsg_t kmsg;
1182 	mach_port_seqno_t seqno, msgoff;
1183 	unsigned res = 0;
1184 
1185 	seqno = 0;
1186 	if (seqnop != NULL) {
1187 		seqno = *seqnop;
1188 	}
1189 
1190 	if (seqno == 0) {
1191 		seqno = mq->imq_seqno;
1192 		msgoff = 0;
1193 	} else if (seqno >= mq->imq_seqno &&
1194 	    seqno < mq->imq_seqno + mq->imq_msgcount) {
1195 		msgoff = seqno - mq->imq_seqno;
1196 	} else {
1197 		goto out;
1198 	}
1199 
1200 	/* look for the message that would match that seqno */
1201 	kmsgq = &mq->imq_messages;
1202 	kmsg = ipc_kmsg_queue_first(kmsgq);
1203 	while (msgoff-- && kmsg != IKM_NULL) {
1204 		kmsg = ipc_kmsg_queue_next(kmsgq, kmsg);
1205 	}
1206 	if (kmsg == IKM_NULL) {
1207 		goto out;
1208 	}
1209 
1210 	/* found one - return the requested info */
1211 	if (seqnop != NULL) {
1212 		*seqnop = seqno;
1213 	}
1214 	if (msg_sizep != NULL) {
1215 		*msg_sizep = kmsg->ikm_header->msgh_size;
1216 	}
1217 	if (msg_idp != NULL) {
1218 		*msg_idp = kmsg->ikm_header->msgh_id;
1219 	}
1220 	if (msg_trailerp != NULL) {
1221 		memcpy(msg_trailerp,
1222 		    (mach_msg_max_trailer_t *)((vm_offset_t)kmsg->ikm_header +
1223 		    mach_round_msg(kmsg->ikm_header->msgh_size)),
1224 		    sizeof(mach_msg_max_trailer_t));
1225 	}
1226 	if (kmsgp != NULL) {
1227 		*kmsgp = kmsg;
1228 	}
1229 
1230 	res = 1;
1231 
1232 out:
1233 	return res;
1234 }
1235 
1236 
1237 /*
1238  *	Routine:	ipc_mqueue_peek
1239  *	Purpose:
1240  *		Peek at a (non-set) message queue to see if it has a message
1241  *		matching the sequence number provided (if zero, then the
1242  *		first message in the queue) and return vital info about the
1243  *		message.
1244  *
1245  *	Conditions:
1246  *		The ipc_mqueue_t is unlocked.
1247  *		Locks may be held by callers, so this routine cannot block.
1248  *		Caller holds reference on the message queue.
1249  */
1250 unsigned
ipc_mqueue_peek(ipc_mqueue_t mq,mach_port_seqno_t * seqnop,mach_msg_size_t * msg_sizep,mach_msg_id_t * msg_idp,mach_msg_max_trailer_t * msg_trailerp,ipc_kmsg_t * kmsgp)1251 ipc_mqueue_peek(ipc_mqueue_t mq,
1252     mach_port_seqno_t * seqnop,
1253     mach_msg_size_t * msg_sizep,
1254     mach_msg_id_t * msg_idp,
1255     mach_msg_max_trailer_t * msg_trailerp,
1256     ipc_kmsg_t *kmsgp)
1257 {
1258 	ipc_port_t port = ip_from_mq(mq);
1259 	unsigned res;
1260 
1261 	ip_mq_lock(port);
1262 
1263 	res = ipc_mqueue_peek_locked(mq, seqnop, msg_sizep, msg_idp,
1264 	    msg_trailerp, kmsgp);
1265 
1266 	ip_mq_unlock(port);
1267 	return res;
1268 }
1269 
#if MACH_FLIPC
/*
 *	Routine:	ipc_mqueue_release_peek_ref
 *	Purpose:
 *		Release the reference on an mqueue's associated port which was
 *		granted to a thread in ipc_mqueue_peek_on_thread (on the
 *		MACH_PEEK_MSG thread wakeup path).
 *
 *	Conditions:
 *		The ipc_mqueue_t should be locked on entry.
 *		The ipc_mqueue_t will be _unlocked_ on return
 *			(and potentially invalid!)
 *
 */
void
ipc_mqueue_release_peek_ref(ipc_mqueue_t mqueue)
{
	ipc_port_t port = ip_from_mq(mqueue);

	ip_mq_lock_held(port);

	/*
	 * Clear any preposts this mqueue may have generated, so that
	 * waiters don't see spurious immediate wakeups afterwards.
	 */
	waitq_clear_prepost_locked(&port->ip_waitq);

	ip_mq_unlock(port);

	/*
	 * Drop the port reference only after unlocking: this may be
	 * the last reference, which would free the port (and its lock).
	 */
	ip_release(port);
}
#endif /* MACH_FLIPC */
1306 
1307 /*
1308  *	Routine:	ipc_mqueue_destroy_locked
1309  *	Purpose:
1310  *		Destroy a message queue.
1311  *		Set any blocked senders running.
1312  *		Destroy the kmsgs in the queue.
1313  *	Conditions:
1314  *		port locked
1315  *		Receivers were removed when the receive right was "changed"
1316  */
1317 boolean_t
ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue,waitq_link_list_t * free_l)1318 ipc_mqueue_destroy_locked(ipc_mqueue_t mqueue, waitq_link_list_t *free_l)
1319 {
1320 	ipc_port_t port = ip_from_mq(mqueue);
1321 	boolean_t reap = FALSE;
1322 	struct turnstile *send_turnstile = port_send_turnstile(port);
1323 
1324 	/*
1325 	 *	rouse all blocked senders
1326 	 *	(don't boost anyone - we're tearing this queue down)
1327 	 *	(never preposts)
1328 	 */
1329 	port->ip_fullwaiters = false;
1330 
1331 	if (send_turnstile != TURNSTILE_NULL) {
1332 		waitq_wakeup64_all(&send_turnstile->ts_waitq,
1333 		    IPC_MQUEUE_FULL,
1334 		    THREAD_RESTART,
1335 		    WAITQ_ALL_PRIORITIES);
1336 	}
1337 
1338 #if MACH_FLIPC
1339 	ipc_kmsg_t first = ipc_kmsg_queue_first(&mqueue->imq_messages);
1340 	if (first) {
1341 		ipc_kmsg_t kmsg = first;
1342 		do {
1343 			if (MACH_NODE_VALID(kmsg->ikm_node) && FPORT_VALID(mqueue->imq_fport)) {
1344 				flipc_msg_ack(kmsg->ikm_node, mqueue, TRUE);
1345 			}
1346 			kmsg = kmsg->ikm_next;
1347 		} while (kmsg != first);
1348 	}
1349 #endif
1350 
1351 	/*
1352 	 * Move messages from the specified queue to the per-thread
1353 	 * clean/drain queue while we have the mqueue lock.
1354 	 */
1355 	reap = ipc_kmsg_delayed_destroy_queue(&mqueue->imq_messages);
1356 
1357 	/*
1358 	 * Wipe out message count, both for messages about to be
1359 	 * reaped and for reserved space for (previously) woken senders.
1360 	 * This is the indication to them that their reserved space is gone
1361 	 * (the mqueue was destroyed).
1362 	 */
1363 	mqueue->imq_msgcount = 0;
1364 
1365 	/*
1366 	 * invalidate the waitq for subsequent mqueue operations,
1367 	 * the port lock could be dropped after invalidating the mqueue.
1368 	 */
1369 
1370 	waitq_invalidate(&port->ip_waitq);
1371 
1372 	waitq_unlink_all_locked(&port->ip_waitq, NULL, free_l);
1373 
1374 	return reap;
1375 }
1376 
1377 /*
1378  *	Routine:	ipc_mqueue_set_qlimit_locked
1379  *	Purpose:
1380  *		Changes a message queue limit; the maximum number
1381  *		of messages which may be queued.
1382  *	Conditions:
1383  *		Port locked.
1384  */
1385 
1386 void
ipc_mqueue_set_qlimit_locked(ipc_mqueue_t mqueue,mach_port_msgcount_t qlimit)1387 ipc_mqueue_set_qlimit_locked(
1388 	ipc_mqueue_t           mqueue,
1389 	mach_port_msgcount_t   qlimit)
1390 {
1391 	ipc_port_t port = ip_from_mq(mqueue);
1392 
1393 	assert(qlimit <= MACH_PORT_QLIMIT_MAX);
1394 
1395 	/* wake up senders allowed by the new qlimit */
1396 	if (qlimit > mqueue->imq_qlimit) {
1397 		mach_port_msgcount_t i, wakeup;
1398 		struct turnstile *send_turnstile = port_send_turnstile(port);
1399 
1400 		/* caution: wakeup, qlimit are unsigned */
1401 		wakeup = qlimit - mqueue->imq_qlimit;
1402 
1403 		for (i = 0; i < wakeup; i++) {
1404 			/*
1405 			 * boost the priority of the awoken thread
1406 			 * (WAITQ_PROMOTE_PRIORITY) to ensure it uses
1407 			 * the message queue slot we've just reserved.
1408 			 *
1409 			 * NOTE: this will never prepost
1410 			 */
1411 			if (send_turnstile == TURNSTILE_NULL ||
1412 			    waitq_wakeup64_one(&send_turnstile->ts_waitq,
1413 			    IPC_MQUEUE_FULL,
1414 			    THREAD_AWAKENED,
1415 			    WAITQ_PROMOTE_PRIORITY) == KERN_NOT_WAITING) {
1416 				port->ip_fullwaiters = false;
1417 				break;
1418 			}
1419 			mqueue->imq_msgcount++;  /* give it to the awakened thread */
1420 		}
1421 	}
1422 	mqueue->imq_qlimit = (uint16_t)qlimit;
1423 }
1424 
1425 /*
1426  *	Routine:	ipc_mqueue_set_seqno_locked
1427  *	Purpose:
1428  *		Changes an mqueue's sequence number.
1429  *	Conditions:
1430  *		Caller holds a reference to the queue's containing object.
1431  */
1432 void
ipc_mqueue_set_seqno_locked(ipc_mqueue_t mqueue,mach_port_seqno_t seqno)1433 ipc_mqueue_set_seqno_locked(
1434 	ipc_mqueue_t            mqueue,
1435 	mach_port_seqno_t       seqno)
1436 {
1437 	mqueue->imq_seqno = seqno;
1438 }
1439 
1440 
1441 /*
1442  *	Routine:	ipc_mqueue_copyin
1443  *	Purpose:
1444  *		Convert a name in a space to a message queue.
1445  *	Conditions:
1446  *		Nothing locked.  If successful, the caller gets a ref for
1447  *		for the object.	This ref ensures the continued existence of
1448  *		the queue.
1449  *	Returns:
1450  *		MACH_MSG_SUCCESS	Found a message queue.
1451  *		MACH_RCV_INVALID_NAME	The space is dead.
1452  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
1453  *		MACH_RCV_INVALID_NAME
1454  *			The denoted right is not receive or port set.
1455  *		MACH_RCV_IN_SET		Receive right is a member of a set.
1456  */
1457 
1458 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_object_t * objectp)1459 ipc_mqueue_copyin(
1460 	ipc_space_t             space,
1461 	mach_port_name_t        name,
1462 	ipc_object_t            *objectp)
1463 {
1464 	ipc_entry_bits_t bits;
1465 	ipc_object_t object;
1466 	kern_return_t kr;
1467 
1468 	kr = ipc_right_lookup_read(space, name, &bits, &object);
1469 	if (kr != KERN_SUCCESS) {
1470 		return MACH_RCV_INVALID_NAME;
1471 	}
1472 	/* object is locked and active */
1473 
1474 	if (bits & MACH_PORT_TYPE_RECEIVE) {
1475 		__assert_only ipc_port_t port = ip_object_to_port(object);
1476 		assert(ip_get_receiver_name(port) == name);
1477 		assert(ip_in_space(port, space));
1478 	}
1479 	if (bits & (MACH_PORT_TYPE_RECEIVE | MACH_PORT_TYPE_PORT_SET)) {
1480 		io_reference(object);
1481 		io_unlock(object);
1482 	} else {
1483 		io_unlock(object);
1484 		/* guard exception if we never held the receive right in this entry */
1485 		if ((bits & MACH_PORT_TYPE_EX_RECEIVE) == 0) {
1486 			mach_port_guard_exception(name, 0, 0, kGUARD_EXC_RCV_INVALID_NAME);
1487 		}
1488 		return MACH_RCV_INVALID_NAME;
1489 	}
1490 
1491 	*objectp = object;
1492 	return MACH_MSG_SUCCESS;
1493 }
1494