1*e3723e1fSApple OSS Distributions #include <AvailabilityMacros.h>
2*e3723e1fSApple OSS Distributions #include <mach/thread_policy.h>
3*e3723e1fSApple OSS Distributions #include <mach/mach.h>
4*e3723e1fSApple OSS Distributions #include <mach/mach_error.h>
5*e3723e1fSApple OSS Distributions #include <mach/mach_time.h>
6*e3723e1fSApple OSS Distributions #include <pthread.h>
7*e3723e1fSApple OSS Distributions #include <sys/queue.h>
8*e3723e1fSApple OSS Distributions #include <stdio.h>
9*e3723e1fSApple OSS Distributions #include <stdlib.h>
10*e3723e1fSApple OSS Distributions #include <string.h>
11*e3723e1fSApple OSS Distributions #include <unistd.h>
12*e3723e1fSApple OSS Distributions #include <err.h>
13*e3723e1fSApple OSS Distributions
14*e3723e1fSApple OSS Distributions /*
15*e3723e1fSApple OSS Distributions * Pool is another multithreaded test/benchmarking program to evaluate
16*e3723e1fSApple OSS Distributions * affinity set placement in Leopard.
17*e3723e1fSApple OSS Distributions *
18*e3723e1fSApple OSS Distributions * The basic picture is:
19*e3723e1fSApple OSS Distributions *
20*e3723e1fSApple OSS Distributions * -> producer -- -> consumer --
21*e3723e1fSApple OSS Distributions * free / \ work / \
22*e3723e1fSApple OSS Distributions * -> queue -- ... --> queue -- --
23*e3723e1fSApple OSS Distributions * | \ / \ / |
24*e3723e1fSApple OSS Distributions * | -> producer -- -> consumer -- |
25*e3723e1fSApple OSS Distributions * ---------------------------------------------------------------
26*e3723e1fSApple OSS Distributions *
27*e3723e1fSApple OSS Distributions * <---------- "stage" ---------> <---------- "stage" --------->
28*e3723e1fSApple OSS Distributions *
29*e3723e1fSApple OSS Distributions * There are a series of work stages. Each stage has an input and an output
30*e3723e1fSApple OSS Distributions * queue and multiple threads. The first stage is the producer and subsequent
31*e3723e1fSApple OSS Distributions * stages are consumers. By defuaut there are 2 stages. There are N producer
32*e3723e1fSApple OSS Distributions * and M consumer threads. The are B buffers per producer threads circulating
33*e3723e1fSApple OSS Distributions * through the system.
34*e3723e1fSApple OSS Distributions *
35*e3723e1fSApple OSS Distributions * When affinity is enabled, each producer thread is tagged with an affinity tag
36*e3723e1fSApple OSS Distributions * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
37*e3723e1fSApple OSS Distributions * the work queue it is tagged with this affinity. When a consumer dequeues a
38*e3723e1fSApple OSS Distributions * work item, it sets its affinity to this tag. Hence consumer threads migrate
39*e3723e1fSApple OSS Distributions * to the same affinity set where the data was produced.
40*e3723e1fSApple OSS Distributions *
41*e3723e1fSApple OSS Distributions * Buffer management uses pthread mutex/condition variables. A thread blocks
42*e3723e1fSApple OSS Distributions * when no buffer is available on a queue and it is signaled when a buffer
43*e3723e1fSApple OSS Distributions * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
44*e3723e1fSApple OSS Distributions * The queue management is centralized in a single routine: what queues to
45*e3723e1fSApple OSS Distributions * use as input and output and what function to call for processing is
46*e3723e1fSApple OSS Distributions * data-driven.
47*e3723e1fSApple OSS Distributions */
48*e3723e1fSApple OSS Distributions
49*e3723e1fSApple OSS Distributions pthread_mutex_t funnel;
50*e3723e1fSApple OSS Distributions pthread_cond_t barrier;
51*e3723e1fSApple OSS Distributions
52*e3723e1fSApple OSS Distributions uint64_t timer;
53*e3723e1fSApple OSS Distributions int threads;
54*e3723e1fSApple OSS Distributions int threads_ready = 0;
55*e3723e1fSApple OSS Distributions
56*e3723e1fSApple OSS Distributions int iterations = 10000;
57*e3723e1fSApple OSS Distributions boolean_t affinity = FALSE;
58*e3723e1fSApple OSS Distributions boolean_t halting = FALSE;
59*e3723e1fSApple OSS Distributions int verbosity = 1;
60*e3723e1fSApple OSS Distributions
61*e3723e1fSApple OSS Distributions typedef struct work {
62*e3723e1fSApple OSS Distributions TAILQ_ENTRY(work) link;
63*e3723e1fSApple OSS Distributions int *data;
64*e3723e1fSApple OSS Distributions int isize;
65*e3723e1fSApple OSS Distributions int tag;
66*e3723e1fSApple OSS Distributions int number;
67*e3723e1fSApple OSS Distributions } work_t;
68*e3723e1fSApple OSS Distributions
69*e3723e1fSApple OSS Distributions /*
70*e3723e1fSApple OSS Distributions * A work queue, complete with pthread objects for its management
71*e3723e1fSApple OSS Distributions */
72*e3723e1fSApple OSS Distributions typedef struct work_queue {
73*e3723e1fSApple OSS Distributions pthread_mutex_t mtx;
74*e3723e1fSApple OSS Distributions pthread_cond_t cnd;
75*e3723e1fSApple OSS Distributions TAILQ_HEAD(, work) queue;
76*e3723e1fSApple OSS Distributions unsigned int waiters;
77*e3723e1fSApple OSS Distributions } work_queue_t;
78*e3723e1fSApple OSS Distributions
79*e3723e1fSApple OSS Distributions /* Worker functions take a integer array and size */
80*e3723e1fSApple OSS Distributions typedef void (worker_fn_t)(int *, int);
81*e3723e1fSApple OSS Distributions
82*e3723e1fSApple OSS Distributions /* This struct controls the function of a stage */
83*e3723e1fSApple OSS Distributions #define WORKERS_MAX 10
84*e3723e1fSApple OSS Distributions typedef struct {
85*e3723e1fSApple OSS Distributions int stagenum;
86*e3723e1fSApple OSS Distributions char *name;
87*e3723e1fSApple OSS Distributions worker_fn_t *fn;
88*e3723e1fSApple OSS Distributions work_queue_t *input;
89*e3723e1fSApple OSS Distributions work_queue_t *output;
90*e3723e1fSApple OSS Distributions work_queue_t bufq;
91*e3723e1fSApple OSS Distributions int work_todo;
92*e3723e1fSApple OSS Distributions } stage_info_t;
93*e3723e1fSApple OSS Distributions
94*e3723e1fSApple OSS Distributions /* This defines a worker thread */
95*e3723e1fSApple OSS Distributions typedef struct worker_info {
96*e3723e1fSApple OSS Distributions int setnum;
97*e3723e1fSApple OSS Distributions stage_info_t *stage;
98*e3723e1fSApple OSS Distributions pthread_t thread;
99*e3723e1fSApple OSS Distributions } worker_info_t;
100*e3723e1fSApple OSS Distributions
101*e3723e1fSApple OSS Distributions #define DBG(x...) do { \
102*e3723e1fSApple OSS Distributions if (verbosity > 1) { \
103*e3723e1fSApple OSS Distributions pthread_mutex_lock(&funnel); \
104*e3723e1fSApple OSS Distributions printf(x); \
105*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel); \
106*e3723e1fSApple OSS Distributions } \
107*e3723e1fSApple OSS Distributions } while (0)
108*e3723e1fSApple OSS Distributions
109*e3723e1fSApple OSS Distributions #define mutter(x...) do { \
110*e3723e1fSApple OSS Distributions if (verbosity > 0) { \
111*e3723e1fSApple OSS Distributions printf(x); \
112*e3723e1fSApple OSS Distributions } \
113*e3723e1fSApple OSS Distributions } while (0)
114*e3723e1fSApple OSS Distributions
115*e3723e1fSApple OSS Distributions #define s_if_plural(x) (((x) > 1) ? "s" : "")
116*e3723e1fSApple OSS Distributions
117*e3723e1fSApple OSS Distributions static void
usage()118*e3723e1fSApple OSS Distributions usage()
119*e3723e1fSApple OSS Distributions {
120*e3723e1fSApple OSS Distributions fprintf(stderr,
121*e3723e1fSApple OSS Distributions "usage: pool [-a] Turn affinity on (off)\n"
122*e3723e1fSApple OSS Distributions " [-b B] Number of buffers per producer (2)\n"
123*e3723e1fSApple OSS Distributions " [-i I] Number of buffers to produce (10000)\n"
124*e3723e1fSApple OSS Distributions " [-s S] Number of stages (2)\n"
125*e3723e1fSApple OSS Distributions " [-p P] Number of pages per buffer (256=1MB)]\n"
126*e3723e1fSApple OSS Distributions " [-w] Consumer writes data\n"
127*e3723e1fSApple OSS Distributions " [-v V] Verbosity level 0..2 (1)\n"
128*e3723e1fSApple OSS Distributions " [N [M]] Number of producer and consumers (2)\n"
129*e3723e1fSApple OSS Distributions );
130*e3723e1fSApple OSS Distributions exit(1);
131*e3723e1fSApple OSS Distributions }
132*e3723e1fSApple OSS Distributions
133*e3723e1fSApple OSS Distributions /* Trivial producer: write to each byte */
134*e3723e1fSApple OSS Distributions void
writer_fn(int * data,int isize)135*e3723e1fSApple OSS Distributions writer_fn(int *data, int isize)
136*e3723e1fSApple OSS Distributions {
137*e3723e1fSApple OSS Distributions int i;
138*e3723e1fSApple OSS Distributions
139*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
140*e3723e1fSApple OSS Distributions data[i] = i;
141*e3723e1fSApple OSS Distributions }
142*e3723e1fSApple OSS Distributions }
143*e3723e1fSApple OSS Distributions
144*e3723e1fSApple OSS Distributions /* Trivial consumer: read each byte */
145*e3723e1fSApple OSS Distributions void
reader_fn(int * data,int isize)146*e3723e1fSApple OSS Distributions reader_fn(int *data, int isize)
147*e3723e1fSApple OSS Distributions {
148*e3723e1fSApple OSS Distributions int i;
149*e3723e1fSApple OSS Distributions int datum;
150*e3723e1fSApple OSS Distributions
151*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
152*e3723e1fSApple OSS Distributions datum = data[i];
153*e3723e1fSApple OSS Distributions }
154*e3723e1fSApple OSS Distributions }
155*e3723e1fSApple OSS Distributions
156*e3723e1fSApple OSS Distributions /* Consumer reading and writing the buffer */
157*e3723e1fSApple OSS Distributions void
reader_writer_fn(int * data,int isize)158*e3723e1fSApple OSS Distributions reader_writer_fn(int *data, int isize)
159*e3723e1fSApple OSS Distributions {
160*e3723e1fSApple OSS Distributions int i;
161*e3723e1fSApple OSS Distributions
162*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
163*e3723e1fSApple OSS Distributions data[i] += 1;
164*e3723e1fSApple OSS Distributions }
165*e3723e1fSApple OSS Distributions }
166*e3723e1fSApple OSS Distributions
167*e3723e1fSApple OSS Distributions void
affinity_set(int tag)168*e3723e1fSApple OSS Distributions affinity_set(int tag)
169*e3723e1fSApple OSS Distributions {
170*e3723e1fSApple OSS Distributions kern_return_t ret;
171*e3723e1fSApple OSS Distributions thread_affinity_policy_data_t policy;
172*e3723e1fSApple OSS Distributions if (affinity) {
173*e3723e1fSApple OSS Distributions policy.affinity_tag = tag;
174*e3723e1fSApple OSS Distributions ret = thread_policy_set(
175*e3723e1fSApple OSS Distributions mach_thread_self(), THREAD_AFFINITY_POLICY,
176*e3723e1fSApple OSS Distributions (thread_policy_t) &policy,
177*e3723e1fSApple OSS Distributions THREAD_AFFINITY_POLICY_COUNT);
178*e3723e1fSApple OSS Distributions if (ret != KERN_SUCCESS) {
179*e3723e1fSApple OSS Distributions printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
180*e3723e1fSApple OSS Distributions }
181*e3723e1fSApple OSS Distributions }
182*e3723e1fSApple OSS Distributions }
183*e3723e1fSApple OSS Distributions
184*e3723e1fSApple OSS Distributions /*
185*e3723e1fSApple OSS Distributions * This is the central function for every thread.
186*e3723e1fSApple OSS Distributions * For each invocation, its role is ets by (a pointer to) a stage_info_t.
187*e3723e1fSApple OSS Distributions */
188*e3723e1fSApple OSS Distributions void *
manager_fn(void * arg)189*e3723e1fSApple OSS Distributions manager_fn(void *arg)
190*e3723e1fSApple OSS Distributions {
191*e3723e1fSApple OSS Distributions worker_info_t *wp = (worker_info_t *) arg;
192*e3723e1fSApple OSS Distributions stage_info_t *sp = wp->stage;
193*e3723e1fSApple OSS Distributions boolean_t is_producer = (sp->stagenum == 0);
194*e3723e1fSApple OSS Distributions long iteration = 0;
195*e3723e1fSApple OSS Distributions int current_tag = 0;
196*e3723e1fSApple OSS Distributions
197*e3723e1fSApple OSS Distributions kern_return_t ret;
198*e3723e1fSApple OSS Distributions thread_extended_policy_data_t epolicy;
199*e3723e1fSApple OSS Distributions epolicy.timeshare = FALSE;
200*e3723e1fSApple OSS Distributions ret = thread_policy_set(
201*e3723e1fSApple OSS Distributions mach_thread_self(), THREAD_EXTENDED_POLICY,
202*e3723e1fSApple OSS Distributions (thread_policy_t) &epolicy,
203*e3723e1fSApple OSS Distributions THREAD_EXTENDED_POLICY_COUNT);
204*e3723e1fSApple OSS Distributions if (ret != KERN_SUCCESS) {
205*e3723e1fSApple OSS Distributions printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
206*e3723e1fSApple OSS Distributions }
207*e3723e1fSApple OSS Distributions
208*e3723e1fSApple OSS Distributions /*
209*e3723e1fSApple OSS Distributions * If we're using affinity sets and we're a producer
210*e3723e1fSApple OSS Distributions * set our tag to by our thread set number.
211*e3723e1fSApple OSS Distributions */
212*e3723e1fSApple OSS Distributions if (affinity && is_producer) {
213*e3723e1fSApple OSS Distributions affinity_set(wp->setnum);
214*e3723e1fSApple OSS Distributions current_tag = wp->setnum;
215*e3723e1fSApple OSS Distributions }
216*e3723e1fSApple OSS Distributions
217*e3723e1fSApple OSS Distributions DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);
218*e3723e1fSApple OSS Distributions
219*e3723e1fSApple OSS Distributions /*
220*e3723e1fSApple OSS Distributions * Start barrier.
221*e3723e1fSApple OSS Distributions * The tets thread to get here releases everyone and starts the timer.
222*e3723e1fSApple OSS Distributions */
223*e3723e1fSApple OSS Distributions pthread_mutex_lock(&funnel);
224*e3723e1fSApple OSS Distributions threads_ready++;
225*e3723e1fSApple OSS Distributions if (threads_ready == threads) {
226*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel);
227*e3723e1fSApple OSS Distributions if (halting) {
228*e3723e1fSApple OSS Distributions printf(" all threads ready for process %d, "
229*e3723e1fSApple OSS Distributions "hit any key to start", getpid());
230*e3723e1fSApple OSS Distributions fflush(stdout);
231*e3723e1fSApple OSS Distributions (void) getchar();
232*e3723e1fSApple OSS Distributions }
233*e3723e1fSApple OSS Distributions pthread_cond_broadcast(&barrier);
234*e3723e1fSApple OSS Distributions timer = mach_absolute_time();
235*e3723e1fSApple OSS Distributions } else {
236*e3723e1fSApple OSS Distributions pthread_cond_wait(&barrier, &funnel);
237*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel);
238*e3723e1fSApple OSS Distributions }
239*e3723e1fSApple OSS Distributions
240*e3723e1fSApple OSS Distributions do {
241*e3723e1fSApple OSS Distributions work_t *workp;
242*e3723e1fSApple OSS Distributions
243*e3723e1fSApple OSS Distributions /*
244*e3723e1fSApple OSS Distributions * Get a buffer from the input queue.
245*e3723e1fSApple OSS Distributions * Block if none.
246*e3723e1fSApple OSS Distributions * Quit if all work done.
247*e3723e1fSApple OSS Distributions */
248*e3723e1fSApple OSS Distributions pthread_mutex_lock(&sp->input->mtx);
249*e3723e1fSApple OSS Distributions while (1) {
250*e3723e1fSApple OSS Distributions if (sp->work_todo == 0) {
251*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&sp->input->mtx);
252*e3723e1fSApple OSS Distributions goto out;
253*e3723e1fSApple OSS Distributions }
254*e3723e1fSApple OSS Distributions workp = TAILQ_FIRST(&(sp->input->queue));
255*e3723e1fSApple OSS Distributions if (workp != NULL) {
256*e3723e1fSApple OSS Distributions break;
257*e3723e1fSApple OSS Distributions }
258*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] todo %d waiting for buffer\n",
259*e3723e1fSApple OSS Distributions sp->name, wp->setnum, sp->stagenum, sp->work_todo);
260*e3723e1fSApple OSS Distributions sp->input->waiters++;
261*e3723e1fSApple OSS Distributions pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
262*e3723e1fSApple OSS Distributions sp->input->waiters--;
263*e3723e1fSApple OSS Distributions }
264*e3723e1fSApple OSS Distributions TAILQ_REMOVE(&(sp->input->queue), workp, link);
265*e3723e1fSApple OSS Distributions iteration = sp->work_todo--;
266*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&sp->input->mtx);
267*e3723e1fSApple OSS Distributions
268*e3723e1fSApple OSS Distributions if (is_producer) {
269*e3723e1fSApple OSS Distributions workp->number = iteration;
270*e3723e1fSApple OSS Distributions workp->tag = wp->setnum;
271*e3723e1fSApple OSS Distributions } else {
272*e3723e1fSApple OSS Distributions if (affinity && current_tag != workp->tag) {
273*e3723e1fSApple OSS Distributions affinity_set(workp->tag);
274*e3723e1fSApple OSS Distributions current_tag = workp->tag;
275*e3723e1fSApple OSS Distributions }
276*e3723e1fSApple OSS Distributions }
277*e3723e1fSApple OSS Distributions
278*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] todo %d work %p data %p\n",
279*e3723e1fSApple OSS Distributions sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);
280*e3723e1fSApple OSS Distributions
281*e3723e1fSApple OSS Distributions /* Do our stuff with the buffer */
282*e3723e1fSApple OSS Distributions (void) sp->fn(workp->data, workp->isize);
283*e3723e1fSApple OSS Distributions
284*e3723e1fSApple OSS Distributions /*
285*e3723e1fSApple OSS Distributions * Place the buffer on the input queue of the next stage.
286*e3723e1fSApple OSS Distributions * Signal waiters if required.
287*e3723e1fSApple OSS Distributions */
288*e3723e1fSApple OSS Distributions pthread_mutex_lock(&sp->output->mtx);
289*e3723e1fSApple OSS Distributions TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
290*e3723e1fSApple OSS Distributions if (sp->output->waiters) {
291*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] todo %d signaling work\n",
292*e3723e1fSApple OSS Distributions sp->name, wp->setnum, sp->stagenum, iteration);
293*e3723e1fSApple OSS Distributions pthread_cond_signal(&sp->output->cnd);
294*e3723e1fSApple OSS Distributions }
295*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&sp->output->mtx);
296*e3723e1fSApple OSS Distributions } while (1);
297*e3723e1fSApple OSS Distributions
298*e3723e1fSApple OSS Distributions out:
299*e3723e1fSApple OSS Distributions pthread_cond_broadcast(&sp->output->cnd);
300*e3723e1fSApple OSS Distributions
301*e3723e1fSApple OSS Distributions DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);
302*e3723e1fSApple OSS Distributions
303*e3723e1fSApple OSS Distributions return (void *) iteration;
304*e3723e1fSApple OSS Distributions }
305*e3723e1fSApple OSS Distributions
306*e3723e1fSApple OSS Distributions void (*producer_fnp)(int *data, int isize) = &writer_fn;
307*e3723e1fSApple OSS Distributions void (*consumer_fnp)(int *data, int isize) = &reader_fn;
308*e3723e1fSApple OSS Distributions
309*e3723e1fSApple OSS Distributions int
main(int argc,char * argv[])310*e3723e1fSApple OSS Distributions main(int argc, char *argv[])
311*e3723e1fSApple OSS Distributions {
312*e3723e1fSApple OSS Distributions int i;
313*e3723e1fSApple OSS Distributions int j;
314*e3723e1fSApple OSS Distributions int k;
315*e3723e1fSApple OSS Distributions int pages = 256; /* 1MB */
316*e3723e1fSApple OSS Distributions int buffers = 2;
317*e3723e1fSApple OSS Distributions int producers = 2;
318*e3723e1fSApple OSS Distributions int consumers = 2;
319*e3723e1fSApple OSS Distributions int stages = 2;
320*e3723e1fSApple OSS Distributions int *status;
321*e3723e1fSApple OSS Distributions stage_info_t *stage_info;
322*e3723e1fSApple OSS Distributions stage_info_t *sp;
323*e3723e1fSApple OSS Distributions worker_info_t *worker_info;
324*e3723e1fSApple OSS Distributions worker_info_t *wp;
325*e3723e1fSApple OSS Distributions kern_return_t ret;
326*e3723e1fSApple OSS Distributions int c;
327*e3723e1fSApple OSS Distributions
328*e3723e1fSApple OSS Distributions /* Do switch parsing: */
329*e3723e1fSApple OSS Distributions while ((c = getopt(argc, argv, "ab:i:p:s:twv:")) != -1) {
330*e3723e1fSApple OSS Distributions switch (c) {
331*e3723e1fSApple OSS Distributions case 'a':
332*e3723e1fSApple OSS Distributions affinity = !affinity;
333*e3723e1fSApple OSS Distributions break;
334*e3723e1fSApple OSS Distributions case 'b':
335*e3723e1fSApple OSS Distributions buffers = atoi(optarg);
336*e3723e1fSApple OSS Distributions break;
337*e3723e1fSApple OSS Distributions case 'i':
338*e3723e1fSApple OSS Distributions iterations = atoi(optarg);
339*e3723e1fSApple OSS Distributions break;
340*e3723e1fSApple OSS Distributions case 'p':
341*e3723e1fSApple OSS Distributions pages = atoi(optarg);
342*e3723e1fSApple OSS Distributions break;
343*e3723e1fSApple OSS Distributions case 's':
344*e3723e1fSApple OSS Distributions stages = atoi(optarg);
345*e3723e1fSApple OSS Distributions if (stages >= WORKERS_MAX) {
346*e3723e1fSApple OSS Distributions usage();
347*e3723e1fSApple OSS Distributions }
348*e3723e1fSApple OSS Distributions break;
349*e3723e1fSApple OSS Distributions case 't':
350*e3723e1fSApple OSS Distributions halting = TRUE;
351*e3723e1fSApple OSS Distributions break;
352*e3723e1fSApple OSS Distributions case 'w':
353*e3723e1fSApple OSS Distributions consumer_fnp = &reader_writer_fn;
354*e3723e1fSApple OSS Distributions break;
355*e3723e1fSApple OSS Distributions case 'v':
356*e3723e1fSApple OSS Distributions verbosity = atoi(optarg);
357*e3723e1fSApple OSS Distributions break;
358*e3723e1fSApple OSS Distributions case 'h':
359*e3723e1fSApple OSS Distributions case '?':
360*e3723e1fSApple OSS Distributions default:
361*e3723e1fSApple OSS Distributions usage();
362*e3723e1fSApple OSS Distributions }
363*e3723e1fSApple OSS Distributions }
364*e3723e1fSApple OSS Distributions argc -= optind; argv += optind;
365*e3723e1fSApple OSS Distributions if (argc > 0) {
366*e3723e1fSApple OSS Distributions producers = atoi(*argv);
367*e3723e1fSApple OSS Distributions }
368*e3723e1fSApple OSS Distributions argc--; argv++;
369*e3723e1fSApple OSS Distributions if (argc > 0) {
370*e3723e1fSApple OSS Distributions consumers = atoi(*argv);
371*e3723e1fSApple OSS Distributions }
372*e3723e1fSApple OSS Distributions
373*e3723e1fSApple OSS Distributions pthread_mutex_init(&funnel, NULL);
374*e3723e1fSApple OSS Distributions pthread_cond_init(&barrier, NULL);
375*e3723e1fSApple OSS Distributions
376*e3723e1fSApple OSS Distributions /*
377*e3723e1fSApple OSS Distributions * Fire up the worker threads.
378*e3723e1fSApple OSS Distributions */
379*e3723e1fSApple OSS Distributions threads = consumers * (stages - 1) + producers;
380*e3723e1fSApple OSS Distributions mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
381*e3723e1fSApple OSS Distributions " with %saffinity, consumer reads%s data\n",
382*e3723e1fSApple OSS Distributions producers, s_if_plural(producers),
383*e3723e1fSApple OSS Distributions stages - 1, s_if_plural(stages - 1),
384*e3723e1fSApple OSS Distributions consumers, s_if_plural(consumers),
385*e3723e1fSApple OSS Distributions affinity? "": "no ",
386*e3723e1fSApple OSS Distributions (consumer_fnp == &reader_writer_fn)? " and writes" : "");
387*e3723e1fSApple OSS Distributions if (pages < 256) {
388*e3723e1fSApple OSS Distributions mutter(" %dkB bytes per buffer, ", pages * 4);
389*e3723e1fSApple OSS Distributions } else {
390*e3723e1fSApple OSS Distributions mutter(" %dMB bytes per buffer, ", pages / 256);
391*e3723e1fSApple OSS Distributions }
392*e3723e1fSApple OSS Distributions mutter("%d buffer%s per producer ",
393*e3723e1fSApple OSS Distributions buffers, s_if_plural(buffers));
394*e3723e1fSApple OSS Distributions if (buffers * pages < 256) {
395*e3723e1fSApple OSS Distributions mutter("(total %dkB)\n", buffers * pages * 4);
396*e3723e1fSApple OSS Distributions } else {
397*e3723e1fSApple OSS Distributions mutter("(total %dMB)\n", buffers * pages / 256);
398*e3723e1fSApple OSS Distributions }
399*e3723e1fSApple OSS Distributions mutter(" processing %d buffer%s...\n",
400*e3723e1fSApple OSS Distributions iterations, s_if_plural(iterations));
401*e3723e1fSApple OSS Distributions
402*e3723e1fSApple OSS Distributions stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
403*e3723e1fSApple OSS Distributions worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
404*e3723e1fSApple OSS Distributions
405*e3723e1fSApple OSS Distributions /* Set up the queue for the workers of this thread set: */
406*e3723e1fSApple OSS Distributions for (i = 0; i < stages; i++) {
407*e3723e1fSApple OSS Distributions sp = &stage_info[i];
408*e3723e1fSApple OSS Distributions sp->stagenum = i;
409*e3723e1fSApple OSS Distributions pthread_mutex_init(&sp->bufq.mtx, NULL);
410*e3723e1fSApple OSS Distributions pthread_cond_init(&sp->bufq.cnd, NULL);
411*e3723e1fSApple OSS Distributions TAILQ_INIT(&sp->bufq.queue);
412*e3723e1fSApple OSS Distributions sp->bufq.waiters = 0;
413*e3723e1fSApple OSS Distributions if (i == 0) {
414*e3723e1fSApple OSS Distributions sp->fn = producer_fnp;
415*e3723e1fSApple OSS Distributions sp->name = "producer";
416*e3723e1fSApple OSS Distributions } else {
417*e3723e1fSApple OSS Distributions sp->fn = consumer_fnp;
418*e3723e1fSApple OSS Distributions sp->name = "consumer";
419*e3723e1fSApple OSS Distributions }
420*e3723e1fSApple OSS Distributions sp->input = &sp->bufq;
421*e3723e1fSApple OSS Distributions sp->output = &stage_info[(i + 1) % stages].bufq;
422*e3723e1fSApple OSS Distributions stage_info[i].work_todo = iterations;
423*e3723e1fSApple OSS Distributions }
424*e3723e1fSApple OSS Distributions
425*e3723e1fSApple OSS Distributions /* Create the producers */
426*e3723e1fSApple OSS Distributions for (i = 0; i < producers; i++) {
427*e3723e1fSApple OSS Distributions work_t *work_array;
428*e3723e1fSApple OSS Distributions int *data;
429*e3723e1fSApple OSS Distributions int isize;
430*e3723e1fSApple OSS Distributions
431*e3723e1fSApple OSS Distributions isize = pages * 4096 / sizeof(int);
432*e3723e1fSApple OSS Distributions data = (int *) malloc(buffers * pages * 4096);
433*e3723e1fSApple OSS Distributions
434*e3723e1fSApple OSS Distributions /* Set up the empty work buffers */
435*e3723e1fSApple OSS Distributions work_array = (work_t *) malloc(buffers * sizeof(work_t));
436*e3723e1fSApple OSS Distributions for (j = 0; j < buffers; j++) {
437*e3723e1fSApple OSS Distributions work_array[j].data = data + (isize * j);
438*e3723e1fSApple OSS Distributions work_array[j].isize = isize;
439*e3723e1fSApple OSS Distributions work_array[j].tag = 0;
440*e3723e1fSApple OSS Distributions TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
441*e3723e1fSApple OSS Distributions DBG(" empty work item %p for data %p\n",
442*e3723e1fSApple OSS Distributions &work_array[j], work_array[j].data);
443*e3723e1fSApple OSS Distributions }
444*e3723e1fSApple OSS Distributions wp = &worker_info[i];
445*e3723e1fSApple OSS Distributions wp->setnum = i + 1;
446*e3723e1fSApple OSS Distributions wp->stage = &stage_info[0];
447*e3723e1fSApple OSS Distributions if (ret = pthread_create(&wp->thread,
448*e3723e1fSApple OSS Distributions NULL,
449*e3723e1fSApple OSS Distributions &manager_fn,
450*e3723e1fSApple OSS Distributions (void *) wp)) {
451*e3723e1fSApple OSS Distributions err(1, "pthread_create %d,%d", 0, i);
452*e3723e1fSApple OSS Distributions }
453*e3723e1fSApple OSS Distributions }
454*e3723e1fSApple OSS Distributions
455*e3723e1fSApple OSS Distributions /* Create consumers */
456*e3723e1fSApple OSS Distributions for (i = 1; i < stages; i++) {
457*e3723e1fSApple OSS Distributions for (j = 0; j < consumers; j++) {
458*e3723e1fSApple OSS Distributions wp = &worker_info[producers + (consumers * (i - 1)) + j];
459*e3723e1fSApple OSS Distributions wp->setnum = j + 1;
460*e3723e1fSApple OSS Distributions wp->stage = &stage_info[i];
461*e3723e1fSApple OSS Distributions if (ret = pthread_create(&wp->thread,
462*e3723e1fSApple OSS Distributions NULL,
463*e3723e1fSApple OSS Distributions &manager_fn,
464*e3723e1fSApple OSS Distributions (void *) wp)) {
465*e3723e1fSApple OSS Distributions err(1, "pthread_create %d,%d", i, j);
466*e3723e1fSApple OSS Distributions }
467*e3723e1fSApple OSS Distributions }
468*e3723e1fSApple OSS Distributions }
469*e3723e1fSApple OSS Distributions
470*e3723e1fSApple OSS Distributions /*
471*e3723e1fSApple OSS Distributions * We sit back anf wait for the slaves to finish.
472*e3723e1fSApple OSS Distributions */
473*e3723e1fSApple OSS Distributions for (k = 0; k < threads; k++) {
474*e3723e1fSApple OSS Distributions int i;
475*e3723e1fSApple OSS Distributions int j;
476*e3723e1fSApple OSS Distributions
477*e3723e1fSApple OSS Distributions wp = &worker_info[k];
478*e3723e1fSApple OSS Distributions if (k < producers) {
479*e3723e1fSApple OSS Distributions i = 0;
480*e3723e1fSApple OSS Distributions j = k;
481*e3723e1fSApple OSS Distributions } else {
482*e3723e1fSApple OSS Distributions i = (k - producers) / consumers;
483*e3723e1fSApple OSS Distributions j = (k - producers) % consumers;
484*e3723e1fSApple OSS Distributions }
485*e3723e1fSApple OSS Distributions if (ret = pthread_join(wp->thread, (void **)&status)) {
486*e3723e1fSApple OSS Distributions err(1, "pthread_join %d,%d", i, j);
487*e3723e1fSApple OSS Distributions }
488*e3723e1fSApple OSS Distributions DBG("Thread %d,%d status %d\n", i, j, status);
489*e3723e1fSApple OSS Distributions }
490*e3723e1fSApple OSS Distributions
491*e3723e1fSApple OSS Distributions /*
492*e3723e1fSApple OSS Distributions * See how long the work took.
493*e3723e1fSApple OSS Distributions */
494*e3723e1fSApple OSS Distributions timer = mach_absolute_time() - timer;
495*e3723e1fSApple OSS Distributions timer = timer / 1000000ULL;
496*e3723e1fSApple OSS Distributions printf("%d.%03d seconds elapsed.\n",
497*e3723e1fSApple OSS Distributions (int) (timer / 1000ULL), (int) (timer % 1000ULL));
498*e3723e1fSApple OSS Distributions
499*e3723e1fSApple OSS Distributions return 0;
500*e3723e1fSApple OSS Distributions }
501