/* xref: /xnu-12377.41.6/tools/tests/affinity/pool.c (revision bbb1b6f9e71b8cdde6e5cd6f4841f207dee3d828) */
#include <AvailabilityMacros.h>
#include <mach/thread_policy.h>
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <sys/queue.h>
#include <err.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Pool is another multithreaded test/benchmarking program to evaluate
 * affinity set placement in Leopard.
 *
 * The basic picture is:
 *
 *                  -> producer --                 -> consumer --
 *       free     /                \    work     /                \
 *    -> queue --      ...          --> queue --                   --
 *   |            \                /             \                /  |
 *   |              -> producer --                 -> consumer --    |
 *    ---------------------------------------------------------------
 *
 *       <---------- "stage" ---------> <---------- "stage" --------->
 *
 * There are a series of work stages. Each stage has an input and an output
 * queue and multiple threads. The first stage is the producer and subsequent
 * stages are consumers. By default there are 2 stages. There are N producer
 * and M consumer threads. There are B buffers per producer thread circulating
 * through the system.
 *
 * When affinity is enabled, each producer thread is tagged with an affinity tag
 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
 * the work queue it is tagged with this affinity. When a consumer dequeues a
 * work item, it sets its affinity to this tag. Hence consumer threads migrate
 * to the same affinity set where the data was produced.
 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tailqs a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
 */

/* Serializes debug output and protects the start-barrier counter. */
pthread_mutex_t funnel;
/* Start barrier: workers block here until every thread is ready. */
pthread_cond_t  barrier;

/* Start timestamp from mach_absolute_time(); later overwritten with the elapsed time. */
uint64_t        timer;
/* Total number of worker threads (producers plus all consumer stages). */
int             threads;
/* Number of workers that have reached the start barrier. */
int             threads_ready = 0;

/* -i: number of buffers each stage processes before it stops. */
int             iterations = 10000;
/* -a: apply thread affinity tags. */
boolean_t       affinity = FALSE;
/* -t: wait for a keypress once all threads are ready. */
boolean_t       halting = FALSE;
/* -v: 0 silent, 1 progress messages (mutter), 2 debug traces (DBG). */
int             verbosity = 1;

/*
 * One work item: a buffer descriptor that circulates between stage queues.
 */
typedef struct work {
	TAILQ_ENTRY(work)       link;           /* linkage on a work_queue_t */
	int                     *data;          /* start of this item's buffer */
	int                     isize;          /* buffer length, in ints */
	int                     tag;            /* affinity tag of the producer that filled it */
	int                     number;         /* producer's work_todo count when it was filled */
} work_t;

/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t         mtx;            /* protects queue and waiters */
	pthread_cond_t          cnd;            /* signaled when an item is queued */
	TAILQ_HEAD(, work)      queue;          /* FIFO of work items */
	unsigned int            waiters;        /* threads currently blocked on cnd */
} work_queue_t;

/* Worker functions take an integer array and its size (in ints) */
typedef void (worker_fn_t)(int *, int);

82*bbb1b6f9SApple OSS Distributions /* This struct controls the function of a stage */
83*bbb1b6f9SApple OSS Distributions #define WORKERS_MAX 10
84*bbb1b6f9SApple OSS Distributions typedef struct {
85*bbb1b6f9SApple OSS Distributions 	int                     stagenum;
86*bbb1b6f9SApple OSS Distributions 	char                    *name;
87*bbb1b6f9SApple OSS Distributions 	worker_fn_t             *fn;
88*bbb1b6f9SApple OSS Distributions 	work_queue_t            *input;
89*bbb1b6f9SApple OSS Distributions 	work_queue_t            *output;
90*bbb1b6f9SApple OSS Distributions 	work_queue_t            bufq;
91*bbb1b6f9SApple OSS Distributions 	int                     work_todo;
92*bbb1b6f9SApple OSS Distributions } stage_info_t;
93*bbb1b6f9SApple OSS Distributions 
94*bbb1b6f9SApple OSS Distributions /* This defines a worker thread */
95*bbb1b6f9SApple OSS Distributions typedef struct worker_info {
96*bbb1b6f9SApple OSS Distributions 	int                     setnum;
97*bbb1b6f9SApple OSS Distributions 	stage_info_t            *stage;
98*bbb1b6f9SApple OSS Distributions 	pthread_t               thread;
99*bbb1b6f9SApple OSS Distributions } worker_info_t;
100*bbb1b6f9SApple OSS Distributions 
/*
 * Debug trace: printf-style output emitted only when verbosity > 1.
 * The funnel mutex is held around the printf call. Arguments are not
 * evaluated when the trace is disabled.
 */
#define DBG(...) do {                           \
	if (verbosity > 1) {                    \
		pthread_mutex_lock(&funnel);    \
		printf(__VA_ARGS__);            \
		pthread_mutex_unlock(&funnel);  \
	}                                       \
} while (0)

/*
 * Progress message: printf-style output emitted only when verbosity > 0.
 * Takes no lock. Arguments are not evaluated when output is disabled.
 */
#define mutter(...) do {                        \
	if (verbosity > 0) {                    \
		printf(__VA_ARGS__);            \
	}                                       \
} while (0)

/* Plural suffix for a count: "" for exactly one, "s" otherwise (including zero). */
#define s_if_plural(x)  (((x) != 1) ? "s" : "")

/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * Does not return.
 */
static void
usage(void)
{
	fprintf(stderr,
	    "usage: pool [-a]    Turn affinity on (off)\n"
	    "            [-b B]  Number of buffers per producer (2)\n"
	    "            [-i I]  Number of buffers to produce (10000)\n"
	    "            [-s S]  Number of stages (2)\n"
	    "            [-p P]  Number of pages per buffer (256=1MB)\n"
	    "            [-t]    Wait for a keypress before starting (off)\n"
	    "            [-w]    Consumer writes data\n"
	    "            [-v V]  Verbosity level 0..2 (1)\n"
	    "            [N [M]] Number of producer and consumers (2)\n"
	    );
	exit(1);
}

/*
 * Trivial producer: store each element's index into the buffer.
 *
 * data:  buffer of at least isize ints
 * isize: number of ints to write; nothing is written when isize <= 0
 */
void
writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}

/*
 * Trivial consumer: read every int in the buffer.
 *
 * data:  buffer of at least isize ints; it is not modified
 * isize: number of ints to read; nothing is read when isize <= 0
 *
 * Each value is stored to a volatile sink. Without it the loaded values are
 * never used and an optimizing compiler may remove the whole loop, so the
 * consumer would touch no memory at all.
 */
void
reader_fn(int *data, int isize)
{
	volatile int    datum = 0;
	int             i;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
}

/*
 * Consumer reading and writing the buffer: increment every int in place.
 *
 * data:  buffer of at least isize ints
 * isize: number of ints to update; nothing is touched when isize <= 0
 */
void
reader_writer_fn(int *data, int isize)
{
	int     i;

	for (i = 0; i < isize; i++) {
		data[i] += 1;
	}
}

/*
 * Set the calling thread's affinity tag.
 *
 * tag: value stored in the THREAD_AFFINITY_POLICY affinity_tag field
 *
 * Does nothing unless the global `affinity` flag is set. A failure from
 * thread_policy_set() is reported on stdout and otherwise ignored.
 */
void
affinity_set(int tag)
{
	kern_return_t                   ret;
	thread_affinity_policy_data_t   policy;

	if (!affinity) {
		return;
	}

	policy.affinity_tag = tag;
	/*
	 * pthread_mach_thread_np() is used instead of mach_thread_self():
	 * the latter hands back a new port right on every call, and this
	 * function can run once per dequeued buffer without ever releasing it.
	 */
	ret = thread_policy_set(
		pthread_mach_thread_np(pthread_self()), THREAD_AFFINITY_POLICY,
		(thread_policy_t) &policy,
		THREAD_AFFINITY_POLICY_COUNT);
	if (ret != KERN_SUCCESS) {
		printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
	}
}

/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 *
 * arg: the worker_info_t describing this thread (stage and set number).
 *
 * The thread waits at the start barrier, then repeatedly dequeues a buffer
 * from its stage's input queue, runs the stage function on it and enqueues
 * it on the output queue, until the stage's work_todo count reaches zero.
 *
 * Returns the work_todo value seen when this thread dequeued its last
 * buffer (0 if it never dequeued one), cast to void *.
 */
void *
manager_fn(void *arg)
{
	/* Set once the start barrier has been released; protected by funnel. */
	static boolean_t        start_released = FALSE;

	worker_info_t   *wp = (worker_info_t *) arg;
	stage_info_t    *sp = wp->stage;
	boolean_t       is_producer = (sp->stagenum == 0);
	long            iteration = 0;
	int             current_tag = 0;

	kern_return_t                   ret;
	thread_extended_policy_data_t   epolicy;

	/*
	 * Turn timesharing off for this thread. pthread_mach_thread_np() is
	 * used rather than mach_thread_self() so that no extra port right is
	 * taken and then left unreleased.
	 */
	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
		pthread_mach_thread_np(pthread_self()), THREAD_EXTENDED_POLICY,
		(thread_policy_t) &epolicy,
		THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS) {
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
	}

	/*
	 * If we're using affinity sets and we're a producer
	 * set our tag to be our thread set number.
	 */
	if (affinity && is_producer) {
		affinity_set(wp->setnum);
		current_tag = wp->setnum;
	}

	DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here releases everyone and starts the timer.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		/* Drop the lock while (possibly) blocking on the keyboard. */
		pthread_mutex_unlock(&funnel);
		if (halting) {
			printf("  all threads ready for process %d, "
			    "hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		pthread_mutex_lock(&funnel);
		/* Start the clock before anybody is allowed to run. */
		timer = mach_absolute_time();
		start_released = TRUE;
		pthread_cond_broadcast(&barrier);
		pthread_mutex_unlock(&funnel);
	} else {
		/* Loop on the predicate: condition waits may wake spuriously. */
		while (!start_released) {
			pthread_cond_wait(&barrier, &funnel);
		}
		pthread_mutex_unlock(&funnel);
	}

	do {
		work_t          *workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 * Quit if all work done.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			if (sp->work_todo == 0) {
				pthread_mutex_unlock(&sp->input->mtx);
				goto out;
			}
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL) {
				break;
			}
			DBG("    %s[%d,%d] todo %d waiting for buffer\n",
			    sp->name, wp->setnum, sp->stagenum, sp->work_todo);
			sp->input->waiters++;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters--;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		iteration = sp->work_todo--;
		pthread_mutex_unlock(&sp->input->mtx);

		if (is_producer) {
			/* Stamp the buffer with its sequence number and our tag. */
			workp->number = iteration;
			workp->tag = wp->setnum;
		} else {
			/* Follow the buffer: adopt the tag of whoever produced it. */
			if (affinity && current_tag != workp->tag) {
				affinity_set(workp->tag);
				current_tag = workp->tag;
			}
		}

		DBG("  %s[%d,%d] todo %ld work %p data %p\n",
		    sp->name, wp->setnum, sp->stagenum, iteration,
		    (void *) workp, (void *) workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, workp->isize);

		/*
		 * Place the buffer on the input queue of the next stage.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("    %s[%d,%d] todo %ld signaling work\n",
			    sp->name, wp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (1);

out:
	/*
	 * Wake every thread blocked on our output queue so that each one
	 * re-checks its own stage's work_todo and can exit too.
	 */
	pthread_cond_broadcast(&sp->output->cnd);

	DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);

	return (void *) iteration;
}

306*bbb1b6f9SApple OSS Distributions void (*producer_fnp)(int *data, int isize) = &writer_fn;
307*bbb1b6f9SApple OSS Distributions void (*consumer_fnp)(int *data, int isize) = &reader_fn;
308*bbb1b6f9SApple OSS Distributions 
/*
 * Parse a decimal integer command-line argument.
 *
 * text:      the argument string
 * min_value: smallest acceptable value
 *
 * Returns the parsed value. Calls usage(), which exits, when the text is
 * not a complete decimal number or the value lies outside
 * [min_value, INT_MAX].
 */
static int
parse_int_arg(const char *text, int min_value)
{
	char    *end = NULL;
	long    value;

	errno = 0;
	value = strtol(text, &end, 10);
	if (end == text || *end != '\0' || errno == ERANGE ||
	    value < (long) min_value || value > (long) INT_MAX) {
		usage();
	}
	return (int) value;
}

/*
 * Program entry point: parse options, build the ring of stage queues,
 * start the producer and consumer threads, wait for them all to finish
 * and report the elapsed time.
 *
 * Returns 0 on success. Exits with status 1 on a usage error, an
 * allocation failure or a thread creation/join failure.
 */
int
main(int argc, char *argv[])
{
	int                             i;
	int                             j;
	int                             k;
	int                             pages = 256; /* 1MB */
	int                             buffers = 2;
	int                             producers = 2;
	int                             consumers = 2;
	int                             stages = 2;
	long long                       thread_total;
	size_t                          total_pages;
	stage_info_t                    *stage_info;
	stage_info_t                    *sp;
	worker_info_t                   *worker_info;
	worker_info_t                   *wp;
	mach_timebase_info_data_t       timebase;
	int                             rc;
	int                             c;

	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:i:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			/* With no buffers the producers would block forever. */
			buffers = parse_int_arg(optarg, 1);
			break;
		case 'i':
			/* A negative count would never reach the work_todo == 0 exit test. */
			iterations = parse_int_arg(optarg, 0);
			break;
		case 'p':
			pages = parse_int_arg(optarg, 1);
			/* The per-buffer element count must fit in work_t.isize (an int). */
			if ((size_t) pages > (size_t) INT_MAX / (4096 / sizeof(int))) {
				usage();
			}
			break;
		case 's':
			stages = parse_int_arg(optarg, 1);
			if (stages >= WORKERS_MAX) {
				usage();
			}
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = parse_int_arg(optarg, 0);
			break;
		case 'h':
		case '?':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0) {
		producers = parse_int_arg(*argv, 1);
	}
	argc--; argv++;
	if (argc > 0) {
		consumers = parse_int_arg(*argv, 1);
	}

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
	thread_total = (long long) consumers * (stages - 1) + producers;
	if (thread_total > INT_MAX) {
		usage();
	}
	threads = (int) thread_total;
	total_pages = (size_t) buffers * (size_t) pages;
	if (total_pages > SIZE_MAX / 4096) {
		errx(1, "buffer size overflow");
	}
	mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
	    "  with %saffinity, consumer reads%s data\n",
	    producers, s_if_plural(producers),
	    stages - 1, s_if_plural(stages - 1),
	    consumers, s_if_plural(consumers),
	    affinity? "": "no ",
	    (consumer_fnp == &reader_writer_fn)? " and writes" : "");
	if (pages < 256) {
		mutter("  %dkB bytes per buffer, ", pages * 4);
	} else {
		mutter("  %dMB bytes per buffer, ", pages / 256);
	}
	mutter("%d buffer%s per producer ",
	    buffers, s_if_plural(buffers));
	if (total_pages < 256) {
		mutter("(total %zukB)\n", total_pages * 4);
	} else {
		mutter("(total %zuMB)\n", total_pages / 256);
	}
	mutter("  processing %d buffer%s...\n",
	    iterations, s_if_plural(iterations));

	stage_info = calloc((size_t) stages, sizeof(*stage_info));
	worker_info = calloc((size_t) threads, sizeof(*worker_info));
	if (stage_info == NULL || worker_info == NULL) {
		err(1, "calloc");
	}

	/* Set up the queue for the workers of this thread set: */
	for (i = 0; i < stages; i++) {
		sp = &stage_info[i];
		sp->stagenum = i;
		pthread_mutex_init(&sp->bufq.mtx, NULL);
		pthread_cond_init(&sp->bufq.cnd, NULL);
		TAILQ_INIT(&sp->bufq.queue);
		sp->bufq.waiters = 0;
		if (i == 0) {
			sp->fn = producer_fnp;
			sp->name = "producer";
		} else {
			sp->fn = consumer_fnp;
			sp->name = "consumer";
		}
		/* Each stage reads its own queue and feeds the next one, in a ring. */
		sp->input = &sp->bufq;
		sp->output = &stage_info[(i + 1) % stages].bufq;
		sp->work_todo = iterations;
	}

	/* Create the producers */
	for (i = 0; i < producers; i++) {
		work_t  *work_array;
		int     *data;
		int     isize;

		isize = (int) ((size_t) pages * 4096 / sizeof(int));
		data = malloc(total_pages * 4096);
		work_array = calloc((size_t) buffers, sizeof(*work_array));
		if (data == NULL || work_array == NULL) {
			err(1, "buffer allocation for producer %d", i);
		}

		/*
		 * Set up the empty work buffers. No lock is taken: every
		 * worker is still held at the start barrier until the last
		 * thread has been created.
		 */
		for (j = 0; j < buffers; j++) {
			work_array[j].data = data + ((size_t) isize * (size_t) j);
			work_array[j].isize = isize;
			work_array[j].tag = 0;
			TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for data %p\n",
			    (void *) &work_array[j], (void *) work_array[j].data);
		}
		wp = &worker_info[i];
		wp->setnum = i + 1;
		wp->stage = &stage_info[0];
		/* pthread functions return the error number; errno is not set. */
		rc = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp);
		if (rc != 0) {
			errx(1, "pthread_create %d,%d: %s", 0, i, strerror(rc));
		}
	}

	/* Create consumers */
	for (i = 1; i < stages; i++) {
		for (j = 0; j < consumers; j++) {
			wp = &worker_info[producers + (consumers * (i - 1)) + j];
			wp->setnum = j + 1;
			wp->stage = &stage_info[i];
			rc = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp);
			if (rc != 0) {
				errx(1, "pthread_create %d,%d: %s", i, j, strerror(rc));
			}
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (k = 0; k < threads; k++) {
		int     stage_idx;
		int     worker_idx;
		void    *status = NULL;

		wp = &worker_info[k];
		if (k < producers) {
			stage_idx = 0;
			worker_idx = k;
		} else {
			/* Consumer stages are numbered from 1. */
			stage_idx = (k - producers) / consumers + 1;
			worker_idx = (k - producers) % consumers;
		}
		rc = pthread_join(wp->thread, &status);
		if (rc != 0) {
			errx(1, "pthread_join %d,%d: %s",
			    stage_idx, worker_idx, strerror(rc));
		}
		DBG("Thread %d,%d status %ld\n",
		    stage_idx, worker_idx, (long) (intptr_t) status);
	}

	/*
	 * See how long the work took.
	 * mach_absolute_time() counts in timebase ticks, so scale to
	 * nanoseconds first. If the timebase cannot be read, the ticks are
	 * taken to be nanoseconds.
	 */
	timer = mach_absolute_time() - timer;
	if (mach_timebase_info(&timebase) == KERN_SUCCESS && timebase.denom != 0) {
		timer = timer * timebase.numer / timebase.denom;
	}
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
	    (int) (timer / 1000ULL), (int) (timer % 1000ULL));

	return 0;
}