xref: /xnu-11417.140.69/tests/skywalk/skt_kqueue.c (revision 43a90889846e00bfb5cf1d255cdc0a701a1e05a4)
1 /*
2  * Copyright (c) 2016-2024 Apple Inc. All rights reserved.
3  *
4  * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
5  *
6  * This file contains Original Code and/or Modifications of Original Code
7  * as defined in and that are subject to the Apple Public Source License
8  * Version 2.0 (the 'License'). You may not use this file except in
9  * compliance with the License. The rights granted to you under the License
10  * may not be used to create, or enable the creation or redistribution of,
11  * unlawful or unlicensed copies of an Apple operating system, or to
12  * circumvent, violate, or enable the circumvention or violation of, any
13  * terms of an Apple operating system software license agreement.
14  *
15  * Please obtain a copy of the License at
16  * http://www.opensource.apple.com/apsl/ and read it before using this file.
17  *
18  * The Original Code and all software distributed under the License are
19  * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
20  * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
21  * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
23  * Please see the License for the specific language governing rights and
24  * limitations under the License.
25  *
26  * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
27  */
28 
29 #include <assert.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <stdio.h>
33 #include <stdbool.h>
34 #include <pthread.h>
35 #include <unistd.h>
36 #include <uuid/uuid.h>
37 #include <sys/select.h>
38 #include <poll.h>
39 #include <time.h>
40 #include <sys/event.h>
41 #include "skywalk_test_driver.h"
42 #include "skywalk_test_common.h"
43 #include "skywalk_test_utils.h"
44 
45 /************************************************************************
46  * Utility code
47  */
48 
49 struct context {
50 	uuid_t nexus_uuid;
51 	int argc;
52 	char **argv;
53 
54 	int test_selector;
55 
56 	struct stage_ctx stage;
57 };
58 
59 static void
skt_kqueue_init(void)60 skt_kqueue_init(void)
61 {
62 	struct sktc_nexus_attr attr = SKTC_NEXUS_ATTR_INIT();
63 
64 	strncpy((char *)attr.name, "skywalk_test_kqueue_upipe",
65 	    sizeof(nexus_name_t) - 1);
66 	attr.type = NEXUS_TYPE_USER_PIPE;
67 	attr.ntxrings = 1;
68 	attr.nrxrings = 1;
69 	attr.ntxslots = 64;
70 	attr.nrxslots = 64;
71 	attr.anonymous = 1;
72 
73 	sktc_setup_nexus(&attr);
74 }
75 
76 static void
skt_kqueue_fini(void)77 skt_kqueue_fini(void)
78 {
79 	sktc_cleanup_nexus();
80 }
81 
82 enum kqueue_common_test_stage {
83 	SKT_KQUEUE_INIT=0,
84 };
85 
86 static int
skt_kqueue_main(int argc,char * argv[],int test_selector,void * (rx_body)(void *),void * (tx_body)(void *))87 skt_kqueue_main(int argc, char *argv[], int test_selector,
88     void *(rx_body)(void *), void *(tx_body)(void *))
89 {
90 	pthread_t rx_thread, tx_thread;
91 	struct context ctx;
92 	int error;
93 
94 	test_stage_init(&ctx.stage, SKT_KQUEUE_INIT);
95 	ctx.argc = argc;
96 	ctx.argv = argv;
97 	ctx.test_selector = test_selector;
98 
99 	error = uuid_parse(argv[3], ctx.nexus_uuid);
100 	SKTC_ASSERT_ERR(!error);
101 
102 	error = pthread_create(&rx_thread, NULL, rx_body, &ctx);
103 	SKTC_ASSERT_ERR(!error);
104 	error = pthread_create(&tx_thread, NULL, tx_body, &ctx);
105 	SKTC_ASSERT_ERR(!error);
106 
107 	pthread_join(rx_thread, NULL);
108 	pthread_join(tx_thread, NULL);
109 
110 	test_stage_destroy(&ctx.stage);
111 
112 	return 0;
113 }
114 
115 /************************************************************************
116  * Basic sweep test
117  */
118 
119 enum kqueue_basic_test_stage {
120 	SKT_KQUEUE_BASIC_RX_SWEEP_1=1,
121 	SKT_KQUEUE_BASIC_RX_SWEEP_2,
122 	SKT_KQUEUE_BASIC_TX_SWEEP_1,
123 	SKT_KQUEUE_BASIC_TX_SWEEP_2,
124 	SKT_KQUEUE_BASIC_TX_SWEEP_3,
125 };
126 
127 static void *
skt_kqueue_basic_rx(void * ctx_)128 skt_kqueue_basic_rx(void *ctx_)
129 {
130 	struct context *ctx = (struct context *)ctx_;
131 	channel_t channel;
132 	channel_ring_t rxring;
133 	channel_attr_t ch_attr;
134 	ring_dir_t ring_dir = CHANNEL_DIR_RX;
135 	ring_id_t ring_id = CHANNEL_RING_ID_ANY;
136 	uint32_t port = 0;
137 	int kq_fd;
138 	int error;
139 	int i;
140 
141 	assert(ctx->stage.test_stage == SKT_KQUEUE_INIT);
142 
143 	/* Initialize kqueue */
144 	kq_fd = kqueue();
145 	assert(kq_fd >= 0);
146 
147 	/* Initialize channel */
148 	ch_attr = os_channel_attr_create();
149 	channel = sktu_channel_create_extended(
150 		ctx->nexus_uuid,
151 		port, ring_dir, ring_id,
152 		ch_attr,
153 		-1, -1, -1, -1, -1, -1, -1, 1, -1, -1);
154 	assert(channel != NULL);
155 
156 	ring_id = os_channel_ring_id(channel, CHANNEL_FIRST_RX_RING);
157 	rxring = os_channel_rx_ring(channel, ring_id);
158 	assert(rxring);
159 
160 	/* Wait before any data and confirm that the ring is reported as empty */
161 	error = wait_on_fd(kq_fd, EVFILT_READ, channel, 0, TIMEOUT_EXPECT);
162 	SKTC_ASSERT_ERR(!error);
163 
164 	/* Test basic RX data reporting, sweep from 0 to slots-1 to make sure
165 	 * the ring pointers wrap at some point */
166 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_1);
167 
168 	int lim = ch_attr->cha_rx_slots - 1;
169 	for (i = 1; i <= lim; i++) {
170 		test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_2);
171 #ifndef NDEBUG
172 		int result =
173 #endif
174 		wait_on_fd(kq_fd, EVFILT_READ, channel, 0, TIMEOUT_FAIL);
175 		assert(result == i);
176 
177 		chew_slots(rxring, 0);
178 
179 		error = os_channel_sync(channel, CHANNEL_SYNC_RX);
180 		SKTC_ASSERT_ERR(!error);
181 
182 		test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_1);
183 	}
184 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_2);
185 
186 	/* Get ready for TX sweep part of the test */
187 
188 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
189 
190 	/* Drain RX ring */
191 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_2);
192 	error = os_channel_sync(channel, CHANNEL_SYNC_RX);
193 	SKTC_ASSERT_ERR(!error);
194 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
195 
196 	/* Allow TX backlog to trickle in */
197 	for (i = 1; i <= lim; i++) {
198 		test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_2);
199 
200 		chew_slots(rxring, 1);
201 		error = os_channel_sync(channel, CHANNEL_SYNC_RX);
202 		SKTC_ASSERT_ERR(!error);
203 
204 		test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
205 	}
206 
207 	os_channel_attr_destroy(ch_attr);
208 	os_channel_destroy(channel);
209 	return 0;
210 }
211 
212 static void *
skt_kqueue_basic_tx(void * ctx_)213 skt_kqueue_basic_tx(void *ctx_)
214 {
215 	struct context *ctx = (struct context *)ctx_;
216 	channel_t channel;
217 	channel_ring_t txring;
218 	channel_attr_t ch_attr;
219 	ring_dir_t ring_dir = CHANNEL_DIR_TX;
220 	ring_id_t ring_id = CHANNEL_RING_ID_ANY;
221 	uint32_t port = 1;
222 	int error;
223 	int kq_fd;
224 	int i, j;
225 
226 	kq_fd = kqueue();
227 	assert(kq_fd >= 0);
228 
229 	ch_attr = os_channel_attr_create();
230 	channel = sktu_channel_create_extended(
231 		ctx->nexus_uuid,
232 		port, ring_dir, ring_id,
233 		ch_attr,
234 		-1, -1, -1, -1, -1, -1, -1, 1, -1, -1);
235 	assert(channel != NULL);
236 
237 	ring_id = os_channel_ring_id(channel, CHANNEL_FIRST_TX_RING);
238 	txring = os_channel_tx_ring(channel, ring_id);
239 	assert(txring);
240 
241 	/* Wait for RX to initialize */
242 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_1);
243 
244 	/* Test basic RX data reporting, sweep from 0 to slots-1 to make sure
245 	 * the ring pointers wrap at some point */
246 	int lim =  ch_attr->cha_tx_slots - 1;
247 	for (i = 1; i <= lim; i++) {
248 		while (os_channel_available_slot_count(txring) < i) {
249 			wait_on_fd(kq_fd, EVFILT_WRITE, channel, 0, TIMEOUT_DONT_CARE);
250 
251 			/* Abort if we were woken up but there are no slots.
252 			 * This can happen if the channel is defuncted such as
253 			 * in the skywalk_shutdown tests and we'll get stuck.
254 			 */
255 			assert(os_channel_available_slot_count(txring));
256 		}
257 
258 		for (j = 0; j < i; j++) {
259 			send_bytes(txring, i);
260 		}
261 
262 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
263 		SKTC_ASSERT_ERR(!error);
264 
265 		test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_2);
266 		test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_1);
267 	}
268 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_2);
269 
270 	/* Test TX data reporting - start by filling the RX ring */
271 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
272 
273 	for (j = 0; j < lim; j++) {
274 		send_bytes(txring, j);
275 	}
276 	error = os_channel_sync(channel, CHANNEL_SYNC_TX);
277 	SKTC_ASSERT_ERR(!error);
278 
279 	/* Send more packets and confirm TX backs up */
280 	for (i = 0; i < lim; i++) {
281 		send_bytes(txring, 8);
282 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
283 		SKTC_ASSERT_ERR(!error);
284 		assert(os_channel_available_slot_count(txring) == lim - i - 1);
285 	}
286 
287 	/* Confirm we time out waiting for more slots */
288 	error = wait_on_fd(kq_fd, EVFILT_WRITE, channel, 0, TIMEOUT_EXPECT);
289 	SKTC_ASSERT_ERR(!error);
290 
291 	/* Start draining the rings */
292 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_2);
293 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
294 	error = os_channel_sync(channel, CHANNEL_SYNC_TX);
295 	SKTC_ASSERT_ERR(!error);
296 
297 	/* Drain the rings and confirm reporting is accurate */
298 	for (i = 1; i <= lim; i++) {
299 		test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_2);
300 		test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_TX_SWEEP_1);
301 		error = wait_on_fd(kq_fd, EVFILT_WRITE, channel, 0, TIMEOUT_FAIL);
302 		SKTC_ASSERT_ERR(error == i);
303 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
304 		SKTC_ASSERT_ERR(!error);
305 	}
306 
307 	os_channel_attr_destroy(ch_attr);
308 	os_channel_destroy(channel);
309 	return 0;
310 }
311 
312 static int
skt_kqueue_basic_main(int argc,char * argv[])313 skt_kqueue_basic_main(int argc, char *argv[])
314 {
315 	return skt_kqueue_main(argc, argv, 0,
316 	           &skt_kqueue_basic_rx,
317 	           &skt_kqueue_basic_tx);
318 }
319 
320 /************************************************************************
321  * Basic lowat test
322  */
323 
324 #define LOWAT_TYPE                      0x00000001
325 #define LOWAT_TYPE_CHAN         0x00000000
326 #define LOWAT_TYPE_NOTE         0x00000001
327 
328 #define LOWAT_UNIT                      0x00000010
329 #define LOWAT_UNIT_SLOTS        0x00000000
330 #define LOWAT_UNIT_BYTES        0x00000010
331 
332 enum kqueue_lowat_basic_test_stage {
333 	SKT_KQUEUE_LOWAT_BASIC_1=1,
334 	SKT_KQUEUE_LOWAT_BASIC_2,
335 	SKT_KQUEUE_LOWAT_BASIC_3
336 };
337 
338 static void *
skt_kqueue_lowat_basic_rx(void * ctx_)339 skt_kqueue_lowat_basic_rx(void *ctx_)
340 {
341 	struct context *ctx = (struct context *)ctx_;
342 	channel_t channel;
343 	channel_ring_t rxring;
344 	channel_attr_t ch_attr;
345 	ring_dir_t ring_dir = CHANNEL_DIR_RX;
346 	ring_id_t ring_id = CHANNEL_RING_ID_ANY;
347 	uint32_t port = 0;
348 	int kq_fd;
349 	channel_threshold_unit_t lowat_unit;
350 	int lowat_val;
351 	int error;
352 	int i;
353 
354 	assert(ctx->stage.test_stage == SKT_KQUEUE_INIT);
355 
356 	/* Initialize kqueue */
357 	kq_fd = kqueue();
358 	assert(kq_fd >= 0);
359 
360 	/* Initialize channel */
361 	ch_attr = os_channel_attr_create();
362 	channel = sktu_channel_create_extended(
363 		ctx->nexus_uuid,
364 		port, ring_dir, ring_id,
365 		ch_attr,
366 		-1, -1, -1, -1, -1, -1, -1, 1, -1, -1);
367 	assert(channel != NULL);
368 
369 	ring_id = os_channel_ring_id(channel, CHANNEL_FIRST_RX_RING);
370 	rxring = os_channel_rx_ring(channel, ring_id);
371 	assert(rxring);
372 
373 	/* Set up watermarks */
374 	if ((ctx->test_selector & LOWAT_UNIT) == LOWAT_UNIT_BYTES) {
375 		lowat_unit = CHANNEL_THRESHOLD_UNIT_BYTES;
376 	} else {
377 		lowat_unit = CHANNEL_THRESHOLD_UNIT_SLOTS;
378 	}
379 	if ((ctx->test_selector & LOWAT_TYPE) == LOWAT_TYPE_CHAN) {
380 		set_watermark(channel, false, lowat_unit, 10);
381 		set_watermark(channel, true, lowat_unit, 15);
382 		lowat_val = 0;
383 	} else {
384 		set_watermark(channel, false, lowat_unit, 1);
385 		set_watermark(channel, true, lowat_unit, 1);
386 		lowat_val = 10;
387 	}
388 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
389 
390 	/* The first N waits should time out (3 waits for slots, 1 wait for
391 	 * bytes) */
392 	int N;
393 	if (lowat_unit == CHANNEL_THRESHOLD_UNIT_SLOTS) {
394 		N = 3;
395 	} else {
396 		N = 1;
397 	}
398 	for (i = 0; i < N; i++) {
399 		error = wait_on_fd(kq_fd, EVFILT_READ, channel, lowat_val,
400 		    TIMEOUT_EXPECT);
401 		SKTC_ASSERT_ERR(error == 0);
402 		error = os_channel_sync(channel, CHANNEL_SYNC_RX);
403 		SKTC_ASSERT_ERR(error == 0);
404 		test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
405 		test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
406 	}
407 
408 	/* The next wait should trigger */
409 	error = wait_on_fd(kq_fd, EVFILT_READ, channel, lowat_val, TIMEOUT_FAIL);
410 	SKTC_ASSERT_ERR(error == 12);
411 	chew_slots(rxring, 0);
412 	error = os_channel_sync(channel, CHANNEL_SYNC_RX);
413 	SKTC_ASSERT_ERR(error == 0);
414 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
415 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
416 
417 	/* Do it all again, but make sure the TX thread transmits while
418 	 * we're waiting this time */
419 	/* (TODO: is there a better way to do this, without the TX thread
420 	 * sleeping an arbitrary amount of time?) */
421 	error = wait_on_fd(kq_fd, EVFILT_READ, channel, lowat_val, TIMEOUT_DISABLE);
422 	SKTC_ASSERT_ERR(error == 12);
423 	chew_slots(rxring, 0);
424 	error = os_channel_sync(channel, CHANNEL_SYNC_RX);
425 	SKTC_ASSERT_ERR(error == 0);
426 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
427 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
428 
429 	/* Let the TX thread fill our RX ring up, to test TX watermarks */
430 	test_stage_wait(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_2);
431 	error = os_channel_sync(channel, CHANNEL_SYNC_RX);
432 	SKTC_ASSERT_ERR(error == 0);
433 
434 	/* Free up some slots for TX, enough to trigger its TX watermark but
435 	 * not its RX watermark */
436 	for (i = 0; i < 5; i++) {
437 		chew_slots(rxring, 1);
438 		error = os_channel_sync(channel, CHANNEL_SYNC_RX);
439 		SKTC_ASSERT_ERR(error == 0);
440 	}
441 
442 	test_stage_change(&ctx->stage, SKT_KQUEUE_BASIC_RX_SWEEP_1);
443 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
444 
445 	/* Chew more slots to wake TX up, but wait a bit first to make sure TX
446 	 * is really sleeping (TODO: better way to do this?) */
447 	usleep(50000);
448 	for (i = 0; i < 5; i++) {
449 		chew_slots(rxring, 1);
450 		error = os_channel_sync(channel, CHANNEL_SYNC_RX);
451 		SKTC_ASSERT_ERR(error == 0);
452 	}
453 
454 	os_channel_attr_destroy(ch_attr);
455 	os_channel_destroy(channel);
456 	return 0;
457 }
458 
459 static void *
skt_kqueue_lowat_basic_tx(void * ctx_)460 skt_kqueue_lowat_basic_tx(void *ctx_)
461 {
462 	struct context *ctx = (struct context *)ctx_;
463 	channel_t channel;
464 	channel_ring_t txring;
465 	channel_attr_t ch_attr;
466 	ring_dir_t ring_dir = CHANNEL_DIR_TX;
467 	ring_id_t ring_id = CHANNEL_RING_ID_ANY;
468 	uint32_t port = 1;
469 	int kq_fd;
470 	channel_threshold_unit_t lowat_unit;
471 	int lowat_val;
472 	int note_lowat_val;
473 	int slot_size;
474 	int error;
475 	int i;
476 
477 	kq_fd = kqueue();
478 	assert(kq_fd >= 0);
479 
480 	ch_attr = os_channel_attr_create();
481 	channel = sktu_channel_create_extended(
482 		ctx->nexus_uuid,
483 		port, ring_dir, ring_id,
484 		ch_attr,
485 		-1, -1, -1, -1, -1, -1, -1, 1, -1, -1);
486 	assert(channel != NULL);
487 
488 	slot_size = channel->chd_info->cinfo_nxprov_params.nxp_buf_size;
489 
490 	ring_id = os_channel_ring_id(channel, CHANNEL_FIRST_TX_RING);
491 	txring = os_channel_tx_ring(channel, ring_id);
492 	assert(txring);
493 
494 	if ((ctx->test_selector & LOWAT_UNIT) == LOWAT_UNIT_BYTES) {
495 		lowat_unit = CHANNEL_THRESHOLD_UNIT_BYTES;
496 	} else {
497 		lowat_unit = CHANNEL_THRESHOLD_UNIT_SLOTS;
498 	}
499 
500 	/* Wait for RX to initialize */
501 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
502 
503 	/* Send packets in bursts of 3 to allow RX to confirm its watermark is
504 	 * being respected */
505 	int N;
506 	if (lowat_unit == CHANNEL_THRESHOLD_UNIT_SLOTS) {
507 		N = 4;
508 	} else {
509 		N = 2;
510 	}
511 	for (i = 0; i < N; i++) {
512 		send_bytes(txring, 2);
513 		send_bytes(txring, 2);
514 		send_bytes(txring, 2);
515 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
516 		SKTC_ASSERT_ERR(error == 0);
517 		test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
518 		test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
519 	}
520 
521 	/* Send packets in bursts of 3 again, but wait a bit to give the RX
522 	 * thread time to wait (TODO: is there a better way to do this, without
523 	 * sleeping an arbitrary amount of time?) */
524 	usleep(50000);
525 	for (i = 0; i < N; i++) {
526 		send_bytes(txring, 2);
527 		send_bytes(txring, 2);
528 		send_bytes(txring, 2);
529 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
530 		SKTC_ASSERT_ERR(error == 0);
531 	}
532 
533 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
534 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
535 	error = os_channel_sync(channel, CHANNEL_SYNC_TX);
536 	SKTC_ASSERT_ERR(error == 0);
537 
538 	/* Test TX watermarks - start by filling the RX ring */
539 	int lim =  ch_attr->cha_tx_slots - 1;
540 	for (i = 0; i < lim; i++) {
541 		send_bytes(txring, 5);
542 	}
543 	error = os_channel_sync(channel, CHANNEL_SYNC_TX);
544 	SKTC_ASSERT_ERR(error == 0);
545 
546 	/* Send more packets and confirm TX backs up _almost_ all the way */
547 	for (i = 0; i < lim - 5; i++) {
548 		send_bytes(txring, 8);
549 		error = os_channel_sync(channel, CHANNEL_SYNC_TX);
550 		SKTC_ASSERT_ERR(error == 0);
551 		assert(os_channel_available_slot_count(txring) == lim - i - 1);
552 	}
553 
554 	/* Set up watermarks */
555 	lowat_val = 10;
556 	if (lowat_unit == CHANNEL_THRESHOLD_UNIT_BYTES) {
557 		lowat_val *= slot_size;
558 	}
559 	if ((ctx->test_selector & LOWAT_TYPE) == LOWAT_TYPE_CHAN) {
560 		set_watermark(channel, false, lowat_unit, lowat_val * 2);
561 		set_watermark(channel, true, lowat_unit, lowat_val);
562 		note_lowat_val = 0;
563 	} else {
564 		set_watermark(channel, false, lowat_unit, 1);
565 		set_watermark(channel, true, lowat_unit, 1);
566 		note_lowat_val = lowat_val;
567 	}
568 
569 	/* Wait for TX slots, confirm that even though some are available
570 	 * there aren't enough to trigger the watermark */
571 	assert(os_channel_available_slot_count(txring) > 0);
572 	error = wait_on_fd(kq_fd, EVFILT_WRITE, channel, note_lowat_val,
573 	    TIMEOUT_EXPECT);
574 	SKTC_ASSERT_ERR(error == 0);
575 
576 	/* Let the RX thread drain some slots and try again */
577 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
578 	test_stage_wait(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_1);
579 
580 	error = wait_on_fd(kq_fd, EVFILT_WRITE, channel, note_lowat_val,
581 	    TIMEOUT_FAIL);
582 	assert(error == lowat_val);
583 
584 	/* Do it all again, but make sure RX triggers the watermark _while_
585 	 * we're sleeping this time */
586 	/* TODO: is there a better way to do this than having the RX
587 	 * thread sleep? */
588 	lowat_val = 15;
589 	if (lowat_unit == CHANNEL_THRESHOLD_UNIT_BYTES) {
590 		lowat_val *= slot_size;
591 	}
592 	if ((ctx->test_selector & LOWAT_TYPE) == LOWAT_TYPE_CHAN) {
593 		set_watermark(channel, false, lowat_unit, lowat_val * 2);
594 		set_watermark(channel, true, lowat_unit, lowat_val);
595 		note_lowat_val = 0;
596 	} else {
597 		set_watermark(channel, false, lowat_unit, 1);
598 		set_watermark(channel, true, lowat_unit, 1);
599 		note_lowat_val = lowat_val;
600 	}
601 	test_stage_change(&ctx->stage, SKT_KQUEUE_LOWAT_BASIC_2);
602 	error = wait_on_fd(kq_fd, EVFILT_WRITE, channel, note_lowat_val,
603 	    TIMEOUT_DISABLE);
604 	SKTC_ASSERT_ERR(error == lowat_val);
605 
606 	os_channel_attr_destroy(ch_attr);
607 	os_channel_destroy(channel);
608 	return 0;
609 }
610 
611 static int
skt_kqueue_lowat_chan_bytes_main(int argc,char * argv[])612 skt_kqueue_lowat_chan_bytes_main(int argc, char *argv[])
613 {
614 	return skt_kqueue_main(argc, argv, LOWAT_TYPE_CHAN | LOWAT_UNIT_BYTES,
615 	           &skt_kqueue_lowat_basic_rx,
616 	           &skt_kqueue_lowat_basic_tx);
617 }
618 
619 static int
skt_kqueue_lowat_chan_slots_main(int argc,char * argv[])620 skt_kqueue_lowat_chan_slots_main(int argc, char *argv[])
621 {
622 	return skt_kqueue_main(argc, argv, LOWAT_TYPE_CHAN | LOWAT_UNIT_SLOTS,
623 	           &skt_kqueue_lowat_basic_rx,
624 	           &skt_kqueue_lowat_basic_tx);
625 }
626 
627 static int
skt_kqueue_lowat_note_bytes_main(int argc,char * argv[])628 skt_kqueue_lowat_note_bytes_main(int argc, char *argv[])
629 {
630 	return skt_kqueue_main(argc, argv, LOWAT_TYPE_NOTE | LOWAT_UNIT_BYTES,
631 	           &skt_kqueue_lowat_basic_rx,
632 	           &skt_kqueue_lowat_basic_tx);
633 }
634 
635 static int
skt_kqueue_lowat_note_slots_main(int argc,char * argv[])636 skt_kqueue_lowat_note_slots_main(int argc, char *argv[])
637 {
638 	return skt_kqueue_main(argc, argv, LOWAT_TYPE_NOTE | LOWAT_UNIT_SLOTS,
639 	           &skt_kqueue_lowat_basic_rx,
640 	           &skt_kqueue_lowat_basic_tx);
641 }
642 
643 /****************************************************************
644  * TODO: Tests to write:
645  *	- kqueue_selwakeup - Test that wakeups associated with one channel
646  *		descriptor don't spuriously wake up other threads waiting on other
647  *		channels, or on channel descriptors that don't include the rings
648  *		relevant to the waking event
649  *	- lowat_edge - Test edge cases related to low watermarks: changing the
650  *		unit while running, wakeups as a result of that unit change, issuing
651  *              the watermark in the knote vs the channel, etc
652  */
653 
654 struct skywalk_test skt_kqueue_basic = {
655 	"kqueue_basic", "tests kqueue return values",
656 	SK_FEATURE_SKYWALK | SK_FEATURE_NEXUS_USER_PIPE,
657 	skt_kqueue_basic_main, SKTC_GENERIC_UPIPE_ARGV,
658 	skt_kqueue_init, skt_kqueue_fini,
659 };
660 
661 struct skywalk_test skt_kqueue_lowat_chan_bytes = {
662 	"kqueue_lowat_chan_bytes", "tests kqueue low watermark (byte watermark on channel)",
663 	SK_FEATURE_SKYWALK | SK_FEATURE_NEXUS_USER_PIPE,
664 	skt_kqueue_lowat_chan_bytes_main, SKTC_GENERIC_UPIPE_ARGV,
665 	skt_kqueue_init, skt_kqueue_fini,
666 };
667 
668 struct skywalk_test skt_kqueue_lowat_chan_slots = {
669 	"kqueue_lowat_chan_slots", "tests kqueue low watermark (slot watermark on channel)",
670 	SK_FEATURE_SKYWALK | SK_FEATURE_NEXUS_USER_PIPE,
671 	skt_kqueue_lowat_chan_slots_main, SKTC_GENERIC_UPIPE_ARGV,
672 	skt_kqueue_init, skt_kqueue_fini,
673 };
674 
675 struct skywalk_test skt_kqueue_lowat_note_bytes = {
676 	"kqueue_lowat_note_bytes", "tests kqueue low watermark (byte watermark on knote)",
677 	SK_FEATURE_SKYWALK | SK_FEATURE_NEXUS_USER_PIPE,
678 	skt_kqueue_lowat_note_bytes_main, SKTC_GENERIC_UPIPE_ARGV,
679 	skt_kqueue_init, skt_kqueue_fini,
680 };
681 
682 struct skywalk_test skt_kqueue_lowat_note_slots = {
683 	"kqueue_lowat_note_slots", "tests kqueue low watermark (slot watermark on knote)",
684 	SK_FEATURE_SKYWALK | SK_FEATURE_NEXUS_USER_PIPE,
685 	skt_kqueue_lowat_note_slots_main, SKTC_GENERIC_UPIPE_ARGV,
686 	skt_kqueue_init, skt_kqueue_fini,
687 };
688 
689 /****************************************************************/
690