1*e3723e1fSApple OSS Distributions #include <AvailabilityMacros.h>
2*e3723e1fSApple OSS Distributions #include <mach/thread_policy.h>
3*e3723e1fSApple OSS Distributions #include <mach/mach.h>
4*e3723e1fSApple OSS Distributions #include <mach/mach_error.h>
5*e3723e1fSApple OSS Distributions #include <mach/mach_time.h>
6*e3723e1fSApple OSS Distributions #include <pthread.h>
7*e3723e1fSApple OSS Distributions #include <sys/queue.h>
8*e3723e1fSApple OSS Distributions #include <stdio.h>
9*e3723e1fSApple OSS Distributions #include <stdlib.h>
10*e3723e1fSApple OSS Distributions #include <string.h>
11*e3723e1fSApple OSS Distributions #include <unistd.h>
12*e3723e1fSApple OSS Distributions #include <err.h>
13*e3723e1fSApple OSS Distributions #include <errno.h>
14*e3723e1fSApple OSS Distributions #include <sys/sysctl.h>
15*e3723e1fSApple OSS Distributions
16*e3723e1fSApple OSS Distributions /*
17*e3723e1fSApple OSS Distributions * Sets is a multithreaded test/benchmarking program to evaluate
18*e3723e1fSApple OSS Distributions * affinity set placement in Leopard.
19*e3723e1fSApple OSS Distributions *
20*e3723e1fSApple OSS Distributions * The picture here, for each set, is:
21*e3723e1fSApple OSS Distributions *
22*e3723e1fSApple OSS Distributions * free work
23*e3723e1fSApple OSS Distributions * -> queue --> producer --> queue --> consumer --
24*e3723e1fSApple OSS Distributions * | |
25*e3723e1fSApple OSS Distributions * -----------------------------------------------
26*e3723e1fSApple OSS Distributions *
27*e3723e1fSApple OSS Distributions * <------ "stage" -----> <------ "stage" ----->
28*e3723e1fSApple OSS Distributions *
29*e3723e1fSApple OSS Distributions * We spin off sets of production line threads (2 sets by default).
30*e3723e1fSApple OSS Distributions * All threads of each line sets the same affinity tag (unless disabled).
31*e3723e1fSApple OSS Distributions * By default there are 2 stage (worker) threads per production line.
32*e3723e1fSApple OSS Distributions * A worker thread removes a buffer from an input queue, processses it and
33*e3723e1fSApple OSS Distributions * queues it on an output queue. By default the initial stage (producer)
34*e3723e1fSApple OSS Distributions * writes every byte in a buffer and the other (consumer) stages read every
35*e3723e1fSApple OSS Distributions * byte. By default the buffers are 1MB (256 pages) in size but this can be
36*e3723e1fSApple OSS Distributions * overidden. By default there are 2 buffers per set (again overridable).
37*e3723e1fSApple OSS Distributions * Worker threads process (iterate over) 10000 buffers by default.
38*e3723e1fSApple OSS Distributions *
39*e3723e1fSApple OSS Distributions * With affinity enabled, each producer and consumer thread sets its affinity
40*e3723e1fSApple OSS Distributions * to the set number, 1 .. N. So the threads of each set share an L2 cache.
41*e3723e1fSApple OSS Distributions *
42*e3723e1fSApple OSS Distributions * Buffer management uses pthread mutex/condition variables. A thread blocks
43*e3723e1fSApple OSS Distributions * when no buffer is available on a queue and it is signaled when a buffer
44*e3723e1fSApple OSS Distributions * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
45*e3723e1fSApple OSS Distributions * The queue management is centralized in a single routine: what queues to
46*e3723e1fSApple OSS Distributions * use as input and output and what function to call for processing is
47*e3723e1fSApple OSS Distributions * data-driven.
48*e3723e1fSApple OSS Distributions */
49*e3723e1fSApple OSS Distributions
50*e3723e1fSApple OSS Distributions pthread_mutex_t funnel;
51*e3723e1fSApple OSS Distributions pthread_cond_t barrier;
52*e3723e1fSApple OSS Distributions
53*e3723e1fSApple OSS Distributions uint64_t timer;
54*e3723e1fSApple OSS Distributions int threads;
55*e3723e1fSApple OSS Distributions int threads_ready = 0;
56*e3723e1fSApple OSS Distributions
57*e3723e1fSApple OSS Distributions int iterations = 10000;
58*e3723e1fSApple OSS Distributions boolean_t affinity = FALSE;
59*e3723e1fSApple OSS Distributions boolean_t halting = FALSE;
60*e3723e1fSApple OSS Distributions boolean_t cache_config = FALSE;
61*e3723e1fSApple OSS Distributions int verbosity = 1;
62*e3723e1fSApple OSS Distributions
63*e3723e1fSApple OSS Distributions typedef struct work {
64*e3723e1fSApple OSS Distributions TAILQ_ENTRY(work) link;
65*e3723e1fSApple OSS Distributions int *data;
66*e3723e1fSApple OSS Distributions } work_t;
67*e3723e1fSApple OSS Distributions
68*e3723e1fSApple OSS Distributions /*
69*e3723e1fSApple OSS Distributions * A work queue, complete with pthread objects for its management
70*e3723e1fSApple OSS Distributions */
71*e3723e1fSApple OSS Distributions typedef struct work_queue {
72*e3723e1fSApple OSS Distributions pthread_mutex_t mtx;
73*e3723e1fSApple OSS Distributions pthread_cond_t cnd;
74*e3723e1fSApple OSS Distributions TAILQ_HEAD(, work) queue;
75*e3723e1fSApple OSS Distributions boolean_t waiters;
76*e3723e1fSApple OSS Distributions } work_queue_t;
77*e3723e1fSApple OSS Distributions
78*e3723e1fSApple OSS Distributions /* Worker functions take a integer array and size */
79*e3723e1fSApple OSS Distributions typedef void (worker_fn_t)(int *, int);
80*e3723e1fSApple OSS Distributions
81*e3723e1fSApple OSS Distributions /* This struct controls the function of a thread */
82*e3723e1fSApple OSS Distributions typedef struct {
83*e3723e1fSApple OSS Distributions int stagenum;
84*e3723e1fSApple OSS Distributions char *name;
85*e3723e1fSApple OSS Distributions worker_fn_t *fn;
86*e3723e1fSApple OSS Distributions work_queue_t *input;
87*e3723e1fSApple OSS Distributions work_queue_t *output;
88*e3723e1fSApple OSS Distributions struct line_info *set;
89*e3723e1fSApple OSS Distributions pthread_t thread;
90*e3723e1fSApple OSS Distributions work_queue_t bufq;
91*e3723e1fSApple OSS Distributions } stage_info_t;
92*e3723e1fSApple OSS Distributions
93*e3723e1fSApple OSS Distributions /* This defines a thread set */
94*e3723e1fSApple OSS Distributions #define WORKERS_MAX 10
95*e3723e1fSApple OSS Distributions typedef struct line_info {
96*e3723e1fSApple OSS Distributions int setnum;
97*e3723e1fSApple OSS Distributions int *data;
98*e3723e1fSApple OSS Distributions int isize;
99*e3723e1fSApple OSS Distributions stage_info_t *stage[WORKERS_MAX];
100*e3723e1fSApple OSS Distributions } line_info_t;
101*e3723e1fSApple OSS Distributions
102*e3723e1fSApple OSS Distributions #define DBG(x...) do { \
103*e3723e1fSApple OSS Distributions if (verbosity > 1) { \
104*e3723e1fSApple OSS Distributions pthread_mutex_lock(&funnel); \
105*e3723e1fSApple OSS Distributions printf(x); \
106*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel); \
107*e3723e1fSApple OSS Distributions } \
108*e3723e1fSApple OSS Distributions } while (0)
109*e3723e1fSApple OSS Distributions
110*e3723e1fSApple OSS Distributions #define mutter(x...) do { \
111*e3723e1fSApple OSS Distributions if (verbosity > 0) { \
112*e3723e1fSApple OSS Distributions printf(x); \
113*e3723e1fSApple OSS Distributions } \
114*e3723e1fSApple OSS Distributions } while (0)
115*e3723e1fSApple OSS Distributions
116*e3723e1fSApple OSS Distributions #define s_if_plural(x) (((x) > 1) ? "s" : "")
117*e3723e1fSApple OSS Distributions
118*e3723e1fSApple OSS Distributions static void
usage()119*e3723e1fSApple OSS Distributions usage()
120*e3723e1fSApple OSS Distributions {
121*e3723e1fSApple OSS Distributions fprintf(stderr,
122*e3723e1fSApple OSS Distributions "usage: sets [-a] Turn affinity on (off)\n"
123*e3723e1fSApple OSS Distributions " [-b B] Number of buffers per set/line (2)\n"
124*e3723e1fSApple OSS Distributions " [-c] Configure for max cache performance\n"
125*e3723e1fSApple OSS Distributions " [-h] Print this\n"
126*e3723e1fSApple OSS Distributions " [-i I] Number of items/buffers to process (1000)\n"
127*e3723e1fSApple OSS Distributions " [-s S] Number of stages per set/line (2)\n"
128*e3723e1fSApple OSS Distributions " [-t] Halt for keyboard input to start\n"
129*e3723e1fSApple OSS Distributions " [-p P] Number of pages per buffer (256=1MB)]\n"
130*e3723e1fSApple OSS Distributions " [-w] Consumer writes data\n"
131*e3723e1fSApple OSS Distributions " [-v V] Level of verbosity 0..2 (1)\n"
132*e3723e1fSApple OSS Distributions " [N] Number of sets/lines (2)\n"
133*e3723e1fSApple OSS Distributions );
134*e3723e1fSApple OSS Distributions exit(1);
135*e3723e1fSApple OSS Distributions }
136*e3723e1fSApple OSS Distributions
137*e3723e1fSApple OSS Distributions /* Trivial producer: write to each byte */
138*e3723e1fSApple OSS Distributions void
writer_fn(int * data,int isize)139*e3723e1fSApple OSS Distributions writer_fn(int *data, int isize)
140*e3723e1fSApple OSS Distributions {
141*e3723e1fSApple OSS Distributions int i;
142*e3723e1fSApple OSS Distributions
143*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
144*e3723e1fSApple OSS Distributions data[i] = i;
145*e3723e1fSApple OSS Distributions }
146*e3723e1fSApple OSS Distributions }
147*e3723e1fSApple OSS Distributions
148*e3723e1fSApple OSS Distributions /* Trivial consumer: read each byte */
149*e3723e1fSApple OSS Distributions void
reader_fn(int * data,int isize)150*e3723e1fSApple OSS Distributions reader_fn(int *data, int isize)
151*e3723e1fSApple OSS Distributions {
152*e3723e1fSApple OSS Distributions int i;
153*e3723e1fSApple OSS Distributions int datum;
154*e3723e1fSApple OSS Distributions
155*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
156*e3723e1fSApple OSS Distributions datum = data[i];
157*e3723e1fSApple OSS Distributions }
158*e3723e1fSApple OSS Distributions }
159*e3723e1fSApple OSS Distributions
160*e3723e1fSApple OSS Distributions /* Consumer reading and writing the buffer */
161*e3723e1fSApple OSS Distributions void
reader_writer_fn(int * data,int isize)162*e3723e1fSApple OSS Distributions reader_writer_fn(int *data, int isize)
163*e3723e1fSApple OSS Distributions {
164*e3723e1fSApple OSS Distributions int i;
165*e3723e1fSApple OSS Distributions
166*e3723e1fSApple OSS Distributions for (i = 0; i < isize; i++) {
167*e3723e1fSApple OSS Distributions data[i] += 1;
168*e3723e1fSApple OSS Distributions }
169*e3723e1fSApple OSS Distributions }
170*e3723e1fSApple OSS Distributions
171*e3723e1fSApple OSS Distributions /*
172*e3723e1fSApple OSS Distributions * This is the central function for every thread.
173*e3723e1fSApple OSS Distributions * For each invocation, its role is ets by (a pointer to) a stage_info_t.
174*e3723e1fSApple OSS Distributions */
175*e3723e1fSApple OSS Distributions void *
manager_fn(void * arg)176*e3723e1fSApple OSS Distributions manager_fn(void *arg)
177*e3723e1fSApple OSS Distributions {
178*e3723e1fSApple OSS Distributions stage_info_t *sp = (stage_info_t *) arg;
179*e3723e1fSApple OSS Distributions line_info_t *lp = sp->set;
180*e3723e1fSApple OSS Distributions kern_return_t ret;
181*e3723e1fSApple OSS Distributions long iteration = 0;
182*e3723e1fSApple OSS Distributions
183*e3723e1fSApple OSS Distributions /*
184*e3723e1fSApple OSS Distributions * If we're using affinity sets (we are by default)
185*e3723e1fSApple OSS Distributions * set our tag to by our thread set number.
186*e3723e1fSApple OSS Distributions */
187*e3723e1fSApple OSS Distributions thread_extended_policy_data_t epolicy;
188*e3723e1fSApple OSS Distributions thread_affinity_policy_data_t policy;
189*e3723e1fSApple OSS Distributions
190*e3723e1fSApple OSS Distributions epolicy.timeshare = FALSE;
191*e3723e1fSApple OSS Distributions ret = thread_policy_set(
192*e3723e1fSApple OSS Distributions mach_thread_self(), THREAD_EXTENDED_POLICY,
193*e3723e1fSApple OSS Distributions (thread_policy_t) &epolicy,
194*e3723e1fSApple OSS Distributions THREAD_EXTENDED_POLICY_COUNT);
195*e3723e1fSApple OSS Distributions if (ret != KERN_SUCCESS) {
196*e3723e1fSApple OSS Distributions printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
197*e3723e1fSApple OSS Distributions }
198*e3723e1fSApple OSS Distributions
199*e3723e1fSApple OSS Distributions if (affinity) {
200*e3723e1fSApple OSS Distributions policy.affinity_tag = lp->setnum;
201*e3723e1fSApple OSS Distributions ret = thread_policy_set(
202*e3723e1fSApple OSS Distributions mach_thread_self(), THREAD_AFFINITY_POLICY,
203*e3723e1fSApple OSS Distributions (thread_policy_t) &policy,
204*e3723e1fSApple OSS Distributions THREAD_AFFINITY_POLICY_COUNT);
205*e3723e1fSApple OSS Distributions if (ret != KERN_SUCCESS) {
206*e3723e1fSApple OSS Distributions printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
207*e3723e1fSApple OSS Distributions }
208*e3723e1fSApple OSS Distributions }
209*e3723e1fSApple OSS Distributions
210*e3723e1fSApple OSS Distributions DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
211*e3723e1fSApple OSS Distributions
212*e3723e1fSApple OSS Distributions /*
213*e3723e1fSApple OSS Distributions * Start barrier.
214*e3723e1fSApple OSS Distributions * The tets thread to get here releases everyone and starts the timer.
215*e3723e1fSApple OSS Distributions */
216*e3723e1fSApple OSS Distributions pthread_mutex_lock(&funnel);
217*e3723e1fSApple OSS Distributions threads_ready++;
218*e3723e1fSApple OSS Distributions if (threads_ready == threads) {
219*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel);
220*e3723e1fSApple OSS Distributions if (halting) {
221*e3723e1fSApple OSS Distributions printf(" all threads ready for process %d, "
222*e3723e1fSApple OSS Distributions "hit any key to start", getpid());
223*e3723e1fSApple OSS Distributions fflush(stdout);
224*e3723e1fSApple OSS Distributions (void) getchar();
225*e3723e1fSApple OSS Distributions }
226*e3723e1fSApple OSS Distributions pthread_cond_broadcast(&barrier);
227*e3723e1fSApple OSS Distributions timer = mach_absolute_time();
228*e3723e1fSApple OSS Distributions } else {
229*e3723e1fSApple OSS Distributions pthread_cond_wait(&barrier, &funnel);
230*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&funnel);
231*e3723e1fSApple OSS Distributions }
232*e3723e1fSApple OSS Distributions
233*e3723e1fSApple OSS Distributions do {
234*e3723e1fSApple OSS Distributions int i;
235*e3723e1fSApple OSS Distributions work_t *workp;
236*e3723e1fSApple OSS Distributions
237*e3723e1fSApple OSS Distributions /*
238*e3723e1fSApple OSS Distributions * Get a buffer from the input queue.
239*e3723e1fSApple OSS Distributions * Block if none.
240*e3723e1fSApple OSS Distributions */
241*e3723e1fSApple OSS Distributions pthread_mutex_lock(&sp->input->mtx);
242*e3723e1fSApple OSS Distributions while (1) {
243*e3723e1fSApple OSS Distributions workp = TAILQ_FIRST(&(sp->input->queue));
244*e3723e1fSApple OSS Distributions if (workp != NULL) {
245*e3723e1fSApple OSS Distributions break;
246*e3723e1fSApple OSS Distributions }
247*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] iteration %d waiting for buffer\n",
248*e3723e1fSApple OSS Distributions sp->name, lp->setnum, sp->stagenum, iteration);
249*e3723e1fSApple OSS Distributions sp->input->waiters = TRUE;
250*e3723e1fSApple OSS Distributions pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
251*e3723e1fSApple OSS Distributions sp->input->waiters = FALSE;
252*e3723e1fSApple OSS Distributions }
253*e3723e1fSApple OSS Distributions TAILQ_REMOVE(&(sp->input->queue), workp, link);
254*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&sp->input->mtx);
255*e3723e1fSApple OSS Distributions
256*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] iteration %d work %p data %p\n",
257*e3723e1fSApple OSS Distributions sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
258*e3723e1fSApple OSS Distributions
259*e3723e1fSApple OSS Distributions /* Do our stuff with the buffer */
260*e3723e1fSApple OSS Distributions (void) sp->fn(workp->data, lp->isize);
261*e3723e1fSApple OSS Distributions
262*e3723e1fSApple OSS Distributions /*
263*e3723e1fSApple OSS Distributions * Place the buffer on the input queue.
264*e3723e1fSApple OSS Distributions * Signal waiters if required.
265*e3723e1fSApple OSS Distributions */
266*e3723e1fSApple OSS Distributions pthread_mutex_lock(&sp->output->mtx);
267*e3723e1fSApple OSS Distributions TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
268*e3723e1fSApple OSS Distributions if (sp->output->waiters) {
269*e3723e1fSApple OSS Distributions DBG(" %s[%d,%d] iteration %d signaling work\n",
270*e3723e1fSApple OSS Distributions sp->name, lp->setnum, sp->stagenum, iteration);
271*e3723e1fSApple OSS Distributions pthread_cond_signal(&sp->output->cnd);
272*e3723e1fSApple OSS Distributions }
273*e3723e1fSApple OSS Distributions pthread_mutex_unlock(&sp->output->mtx);
274*e3723e1fSApple OSS Distributions } while (++iteration < iterations);
275*e3723e1fSApple OSS Distributions
276*e3723e1fSApple OSS Distributions DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
277*e3723e1fSApple OSS Distributions
278*e3723e1fSApple OSS Distributions return (void *) iteration;
279*e3723e1fSApple OSS Distributions }
280*e3723e1fSApple OSS Distributions
281*e3723e1fSApple OSS Distributions #define MAX_CACHE_DEPTH 10
282*e3723e1fSApple OSS Distributions static void
auto_config(int npages,int * nbufs,int * nsets)283*e3723e1fSApple OSS Distributions auto_config(int npages, int *nbufs, int *nsets)
284*e3723e1fSApple OSS Distributions {
285*e3723e1fSApple OSS Distributions size_t len;
286*e3723e1fSApple OSS Distributions int ncpu;
287*e3723e1fSApple OSS Distributions int llc;
288*e3723e1fSApple OSS Distributions int64_t cacheconfig[MAX_CACHE_DEPTH];
289*e3723e1fSApple OSS Distributions int64_t cachesize[MAX_CACHE_DEPTH];
290*e3723e1fSApple OSS Distributions
291*e3723e1fSApple OSS Distributions mutter("Autoconfiguring...\n");
292*e3723e1fSApple OSS Distributions
293*e3723e1fSApple OSS Distributions len = sizeof(cacheconfig);
294*e3723e1fSApple OSS Distributions if (sysctlbyname("hw.cacheconfig",
295*e3723e1fSApple OSS Distributions &cacheconfig[0], &len, NULL, 0) != 0) {
296*e3723e1fSApple OSS Distributions printf("Unable to get hw.cacheconfig, %d\n", errno);
297*e3723e1fSApple OSS Distributions exit(1);
298*e3723e1fSApple OSS Distributions }
299*e3723e1fSApple OSS Distributions len = sizeof(cachesize);
300*e3723e1fSApple OSS Distributions if (sysctlbyname("hw.cachesize",
301*e3723e1fSApple OSS Distributions &cachesize[0], &len, NULL, 0) != 0) {
302*e3723e1fSApple OSS Distributions printf("Unable to get hw.cachesize, %d\n", errno);
303*e3723e1fSApple OSS Distributions exit(1);
304*e3723e1fSApple OSS Distributions }
305*e3723e1fSApple OSS Distributions
306*e3723e1fSApple OSS Distributions /*
307*e3723e1fSApple OSS Distributions * Find LLC
308*e3723e1fSApple OSS Distributions */
309*e3723e1fSApple OSS Distributions for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--) {
310*e3723e1fSApple OSS Distributions if (cacheconfig[llc] != 0) {
311*e3723e1fSApple OSS Distributions break;
312*e3723e1fSApple OSS Distributions }
313*e3723e1fSApple OSS Distributions }
314*e3723e1fSApple OSS Distributions
315*e3723e1fSApple OSS Distributions /*
316*e3723e1fSApple OSS Distributions * Calculate number of buffers of size pages*4096 bytes
317*e3723e1fSApple OSS Distributions * fit into 90% of an L2 cache.
318*e3723e1fSApple OSS Distributions */
319*e3723e1fSApple OSS Distributions *nbufs = cachesize[llc] * 9 / (npages * 4096 * 10);
320*e3723e1fSApple OSS Distributions mutter(" L%d (LLC) cache %qd bytes: "
321*e3723e1fSApple OSS Distributions "using %d buffers of size %d bytes\n",
322*e3723e1fSApple OSS Distributions llc, cachesize[llc], *nbufs, (npages * 4096));
323*e3723e1fSApple OSS Distributions
324*e3723e1fSApple OSS Distributions /*
325*e3723e1fSApple OSS Distributions * Calcalute how many sets:
326*e3723e1fSApple OSS Distributions */
327*e3723e1fSApple OSS Distributions *nsets = cacheconfig[0] / cacheconfig[llc];
328*e3723e1fSApple OSS Distributions mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n",
329*e3723e1fSApple OSS Distributions cacheconfig[0], cacheconfig[llc], llc, *nsets);
330*e3723e1fSApple OSS Distributions }
331*e3723e1fSApple OSS Distributions
332*e3723e1fSApple OSS Distributions void (*producer_fnp)(int *data, int isize) = &writer_fn;
333*e3723e1fSApple OSS Distributions void (*consumer_fnp)(int *data, int isize) = &reader_fn;
334*e3723e1fSApple OSS Distributions
335*e3723e1fSApple OSS Distributions int
main(int argc,char * argv[])336*e3723e1fSApple OSS Distributions main(int argc, char *argv[])
337*e3723e1fSApple OSS Distributions {
338*e3723e1fSApple OSS Distributions int i;
339*e3723e1fSApple OSS Distributions int j;
340*e3723e1fSApple OSS Distributions int pages = 256; /* 1MB */
341*e3723e1fSApple OSS Distributions int buffers = 2;
342*e3723e1fSApple OSS Distributions int sets = 2;
343*e3723e1fSApple OSS Distributions int stages = 2;
344*e3723e1fSApple OSS Distributions int *status;
345*e3723e1fSApple OSS Distributions line_info_t *line_info;
346*e3723e1fSApple OSS Distributions line_info_t *lp;
347*e3723e1fSApple OSS Distributions stage_info_t *stage_info;
348*e3723e1fSApple OSS Distributions stage_info_t *sp;
349*e3723e1fSApple OSS Distributions kern_return_t ret;
350*e3723e1fSApple OSS Distributions int c;
351*e3723e1fSApple OSS Distributions
352*e3723e1fSApple OSS Distributions /* Do switch parsing: */
353*e3723e1fSApple OSS Distributions while ((c = getopt(argc, argv, "ab:chi:p:s:twv:")) != -1) {
354*e3723e1fSApple OSS Distributions switch (c) {
355*e3723e1fSApple OSS Distributions case 'a':
356*e3723e1fSApple OSS Distributions affinity = !affinity;
357*e3723e1fSApple OSS Distributions break;
358*e3723e1fSApple OSS Distributions case 'b':
359*e3723e1fSApple OSS Distributions buffers = atoi(optarg);
360*e3723e1fSApple OSS Distributions break;
361*e3723e1fSApple OSS Distributions case 'c':
362*e3723e1fSApple OSS Distributions cache_config = TRUE;
363*e3723e1fSApple OSS Distributions break;
364*e3723e1fSApple OSS Distributions case 'i':
365*e3723e1fSApple OSS Distributions iterations = atoi(optarg);
366*e3723e1fSApple OSS Distributions break;
367*e3723e1fSApple OSS Distributions case 'p':
368*e3723e1fSApple OSS Distributions pages = atoi(optarg);
369*e3723e1fSApple OSS Distributions break;
370*e3723e1fSApple OSS Distributions case 's':
371*e3723e1fSApple OSS Distributions stages = atoi(optarg);
372*e3723e1fSApple OSS Distributions if (stages >= WORKERS_MAX) {
373*e3723e1fSApple OSS Distributions usage();
374*e3723e1fSApple OSS Distributions }
375*e3723e1fSApple OSS Distributions break;
376*e3723e1fSApple OSS Distributions case 't':
377*e3723e1fSApple OSS Distributions halting = TRUE;
378*e3723e1fSApple OSS Distributions break;
379*e3723e1fSApple OSS Distributions case 'w':
380*e3723e1fSApple OSS Distributions consumer_fnp = &reader_writer_fn;
381*e3723e1fSApple OSS Distributions break;
382*e3723e1fSApple OSS Distributions case 'v':
383*e3723e1fSApple OSS Distributions verbosity = atoi(optarg);
384*e3723e1fSApple OSS Distributions break;
385*e3723e1fSApple OSS Distributions case '?':
386*e3723e1fSApple OSS Distributions case 'h':
387*e3723e1fSApple OSS Distributions default:
388*e3723e1fSApple OSS Distributions usage();
389*e3723e1fSApple OSS Distributions }
390*e3723e1fSApple OSS Distributions }
391*e3723e1fSApple OSS Distributions argc -= optind; argv += optind;
392*e3723e1fSApple OSS Distributions if (argc > 0) {
393*e3723e1fSApple OSS Distributions sets = atoi(*argv);
394*e3723e1fSApple OSS Distributions }
395*e3723e1fSApple OSS Distributions
396*e3723e1fSApple OSS Distributions if (cache_config) {
397*e3723e1fSApple OSS Distributions auto_config(pages, &buffers, &sets);
398*e3723e1fSApple OSS Distributions }
399*e3723e1fSApple OSS Distributions
400*e3723e1fSApple OSS Distributions pthread_mutex_init(&funnel, NULL);
401*e3723e1fSApple OSS Distributions pthread_cond_init(&barrier, NULL);
402*e3723e1fSApple OSS Distributions
403*e3723e1fSApple OSS Distributions /*
404*e3723e1fSApple OSS Distributions * Fire up the worker threads.
405*e3723e1fSApple OSS Distributions */
406*e3723e1fSApple OSS Distributions threads = sets * stages;
407*e3723e1fSApple OSS Distributions mutter("Launching %d set%s of %d threads with %saffinity, "
408*e3723e1fSApple OSS Distributions "consumer reads%s data\n",
409*e3723e1fSApple OSS Distributions sets, s_if_plural(sets), stages, affinity? "": "no ",
410*e3723e1fSApple OSS Distributions (consumer_fnp == &reader_writer_fn)? " and writes" : "");
411*e3723e1fSApple OSS Distributions if (pages < 256) {
412*e3723e1fSApple OSS Distributions mutter(" %dkB bytes per buffer, ", pages * 4);
413*e3723e1fSApple OSS Distributions } else {
414*e3723e1fSApple OSS Distributions mutter(" %dMB bytes per buffer, ", pages / 256);
415*e3723e1fSApple OSS Distributions }
416*e3723e1fSApple OSS Distributions mutter("%d buffer%s per set ",
417*e3723e1fSApple OSS Distributions buffers, s_if_plural(buffers));
418*e3723e1fSApple OSS Distributions if (buffers * pages < 256) {
419*e3723e1fSApple OSS Distributions mutter("(total %dkB)\n", buffers * pages * 4);
420*e3723e1fSApple OSS Distributions } else {
421*e3723e1fSApple OSS Distributions mutter("(total %dMB)\n", buffers * pages / 256);
422*e3723e1fSApple OSS Distributions }
423*e3723e1fSApple OSS Distributions mutter(" processing %d buffer%s...\n",
424*e3723e1fSApple OSS Distributions iterations, s_if_plural(iterations));
425*e3723e1fSApple OSS Distributions line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
426*e3723e1fSApple OSS Distributions stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
427*e3723e1fSApple OSS Distributions for (i = 0; i < sets; i++) {
428*e3723e1fSApple OSS Distributions work_t *work_array;
429*e3723e1fSApple OSS Distributions
430*e3723e1fSApple OSS Distributions lp = &line_info[i];
431*e3723e1fSApple OSS Distributions
432*e3723e1fSApple OSS Distributions lp->setnum = i + 1;
433*e3723e1fSApple OSS Distributions lp->isize = pages * 4096 / sizeof(int);
434*e3723e1fSApple OSS Distributions lp->data = (int *) malloc(buffers * pages * 4096);
435*e3723e1fSApple OSS Distributions
436*e3723e1fSApple OSS Distributions /* Set up the queue for the workers of this thread set: */
437*e3723e1fSApple OSS Distributions for (j = 0; j < stages; j++) {
438*e3723e1fSApple OSS Distributions sp = &stage_info[(i * stages) + j];
439*e3723e1fSApple OSS Distributions sp->stagenum = j;
440*e3723e1fSApple OSS Distributions sp->set = lp;
441*e3723e1fSApple OSS Distributions lp->stage[j] = sp;
442*e3723e1fSApple OSS Distributions pthread_mutex_init(&sp->bufq.mtx, NULL);
443*e3723e1fSApple OSS Distributions pthread_cond_init(&sp->bufq.cnd, NULL);
444*e3723e1fSApple OSS Distributions TAILQ_INIT(&sp->bufq.queue);
445*e3723e1fSApple OSS Distributions sp->bufq.waiters = FALSE;
446*e3723e1fSApple OSS Distributions }
447*e3723e1fSApple OSS Distributions
448*e3723e1fSApple OSS Distributions /*
449*e3723e1fSApple OSS Distributions * Take a second pass through the stages
450*e3723e1fSApple OSS Distributions * to define what the workers are and to interconnect their input/outputs
451*e3723e1fSApple OSS Distributions */
452*e3723e1fSApple OSS Distributions for (j = 0; j < stages; j++) {
453*e3723e1fSApple OSS Distributions sp = lp->stage[j];
454*e3723e1fSApple OSS Distributions if (j == 0) {
455*e3723e1fSApple OSS Distributions sp->fn = producer_fnp;
456*e3723e1fSApple OSS Distributions sp->name = "producer";
457*e3723e1fSApple OSS Distributions } else {
458*e3723e1fSApple OSS Distributions sp->fn = consumer_fnp;
459*e3723e1fSApple OSS Distributions sp->name = "consumer";
460*e3723e1fSApple OSS Distributions }
461*e3723e1fSApple OSS Distributions sp->input = &lp->stage[j]->bufq;
462*e3723e1fSApple OSS Distributions sp->output = &lp->stage[(j + 1) % stages]->bufq;
463*e3723e1fSApple OSS Distributions }
464*e3723e1fSApple OSS Distributions
465*e3723e1fSApple OSS Distributions /* Set up the buffers on the first worker of the set. */
466*e3723e1fSApple OSS Distributions work_array = (work_t *) malloc(buffers * sizeof(work_t));
467*e3723e1fSApple OSS Distributions for (j = 0; j < buffers; j++) {
468*e3723e1fSApple OSS Distributions work_array[j].data = lp->data + (lp->isize * j);
469*e3723e1fSApple OSS Distributions TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
470*e3723e1fSApple OSS Distributions DBG(" empty work item %p for set %d data %p\n",
471*e3723e1fSApple OSS Distributions &work_array[j], i, work_array[j].data);
472*e3723e1fSApple OSS Distributions }
473*e3723e1fSApple OSS Distributions
474*e3723e1fSApple OSS Distributions /* Create this set of threads */
475*e3723e1fSApple OSS Distributions for (j = 0; j < stages; j++) {
476*e3723e1fSApple OSS Distributions if (ret = pthread_create(&lp->stage[j]->thread, NULL,
477*e3723e1fSApple OSS Distributions &manager_fn,
478*e3723e1fSApple OSS Distributions (void *) lp->stage[j])) {
479*e3723e1fSApple OSS Distributions err(1, "pthread_create %d,%d", i, j);
480*e3723e1fSApple OSS Distributions }
481*e3723e1fSApple OSS Distributions }
482*e3723e1fSApple OSS Distributions }
483*e3723e1fSApple OSS Distributions
484*e3723e1fSApple OSS Distributions /*
485*e3723e1fSApple OSS Distributions * We sit back anf wait for the slave to finish.
486*e3723e1fSApple OSS Distributions */
487*e3723e1fSApple OSS Distributions for (i = 0; i < sets; i++) {
488*e3723e1fSApple OSS Distributions lp = &line_info[i];
489*e3723e1fSApple OSS Distributions for (j = 0; j < stages; j++) {
490*e3723e1fSApple OSS Distributions if (ret = pthread_join(lp->stage[j]->thread, (void **)&status)) {
491*e3723e1fSApple OSS Distributions err(1, "pthread_join %d,%d", i, j);
492*e3723e1fSApple OSS Distributions }
493*e3723e1fSApple OSS Distributions DBG("Thread %d,%d status %d\n", i, j, status);
494*e3723e1fSApple OSS Distributions }
495*e3723e1fSApple OSS Distributions }
496*e3723e1fSApple OSS Distributions
497*e3723e1fSApple OSS Distributions /*
498*e3723e1fSApple OSS Distributions * See how long the work took.
499*e3723e1fSApple OSS Distributions */
500*e3723e1fSApple OSS Distributions timer = mach_absolute_time() - timer;
501*e3723e1fSApple OSS Distributions timer = timer / 1000000ULL;
502*e3723e1fSApple OSS Distributions printf("%d.%03d seconds elapsed.\n",
503*e3723e1fSApple OSS Distributions (int) (timer / 1000ULL), (int) (timer % 1000ULL));
504*e3723e1fSApple OSS Distributions
505*e3723e1fSApple OSS Distributions return 0;
506*e3723e1fSApple OSS Distributions }
507