7 #include "workschedule.h"
8 #include "mlp_runtime.h"
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock. This will not scale, but it will support
14 // development of the system for now
// Shorthand: the scheduler uses the project's Queue type as its work deque.
19 typedef struct Queue deq;

23 // each worker needs the following
// Per-worker record used only by the legacy work-stealing implementation
// below; the struct body is truncated in this extract (closing brace and
// remaining members, e.g. nextWorkerToLoad/dequeWorkUnits, are not visible).
24 typedef struct workerData_t {
25 pthread_t workerThread;
26 pthread_mutex_t dequeLock;

// Single lock protecting the shared queue, systemStarted, and both
// condition variables in the current single-queue implementation.
33 static pthread_mutex_t systemLock;

35 // just one queue for everyone
36 //static pthread_mutex_t dequeLock;
// The one shared work queue; every access below is made under systemLock.
37 static deq* dequeWorkUnits;

41 // implementation internal data
42 static int numWorkers;
43 //static workerData* workerDataArray;
// Thread handles for the worker pool, allocated in workScheduleInit.
44 static pthread_t* workerArray;

// Start gate, guarded by systemLock: workers block on systemBeginCond
// until this flips to non-zero (see workerMain / workScheduleBegin).
46 static int systemStarted = 0;

48 //static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
50 //static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
51 //static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
// User-supplied callback that processes exactly one work unit;
// presumably set from the func argument of workScheduleInit -- the
// assignment is on an elided line, TODO confirm.
52 static void(*workFunc)(void*);

// Signaled (under systemLock) by workScheduleSubmit when the shared
// queue becomes non-empty; workers wait on it in workerMain.
54 static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
// LEGACY (work-stealing path): map a pthread id to its slot in
// workerDataArray. Dead code in the current build -- workerDataArray is
// commented out above. Body is truncated in this extract: the return
// statements and the fallback index choice are on elided lines.
60 int threadID2workerIndex( pthread_t id ) {
// NOTE(review): comparing pthread_t values with == is non-portable;
// POSIX requires pthread_equal() for this.
62 for( i = 0; i < numWorkers; ++i ) {
63 if( workerDataArray[i].workerThread == id ) {
67 // if we didn't find it, we are an outside
68 // thread and should pick arbitrary worker
75 // the worker thread main func, which takes a func
76 // from user for processing any one work unit, then
77 // workers use it to process work units and steal
78 // them from one another
// LEGACY (work-stealing path): superseded by the single-queue workerMain
// below. Body is heavily truncated in this extract (loop headers, the
// steal-success break, and the thread-exit path are on elided lines).
79 void* workerMain( void* arg ) {
81 workerData* myData = (workerData*) arg;
88 // all workers wait until system is ready
// BUG(review): pthread_cond_wait with no predicate loop -- a spurious
// wakeup releases the worker early, and if the broadcast fires before
// this thread reaches the wait, the signal is lost and the worker blocks
// forever. The replacement below fixes this with the systemStarted flag.
89 pthread_mutex_lock ( &systemBeginLock );
90 pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
91 pthread_mutex_unlock( &systemBeginLock );
// Fast path: check our own deque first, under our own lock.
96 pthread_mutex_lock( &(myData->dequeLock) );
98 if( isEmpty( myData->dequeWorkUnits ) ) {
100 // my deque is empty, try to steal
101 pthread_mutex_unlock( &(myData->dequeLock) );
// Start the steal scan at the worker we last loaded.
104 j = myData->nextWorkerToLoad;
106 // look at everyone's queue at least twice
107 for( i = 0; i < numWorkers; ++i ) {
108 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Advance round-robin over all workers, wrapping at numWorkers.
110 ++j; if( j == numWorkers ) { j = 0; }
112 pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
114 if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
115 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
116 // no work here, yield and then keep looking
117 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
121 // found some work in another deque, steal it
// Steal from the BACK of the victim's deque (owner takes from the front
// via getItem below), the classic work-stealing discipline.
122 workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
123 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
127 // didn't find any work, even in my own deque,
128 // after checking everyone twice? Exit thread
129 if( workUnit == NULL ) {
134 // have work in own deque, take out from front
135 workUnit = getItem( myData->dequeWorkUnits );
136 pthread_mutex_unlock( &(myData->dequeLock) );
139 // wherever the work came from, process it
140 workFunc( workUnit );
142 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// BUG(review): %d with a pthread_t argument -- pthread_t is an opaque
// type (often a pointer or unsigned long), so this format specifier
// mismatch is undefined behavior per the printf contract.
145 printf( "Worker %d exiting.\n", myData->workerThread );
// CURRENT worker loop for the single-queue pool: block until the system
// is started, then repeatedly pull one unit from the shared queue (under
// systemLock) and run it through workFunc. Loop header/brace lines are
// elided in this extract; no exit condition is visible here -- TODO
// confirm how workers terminate, since workScheduleBegin joins them.
153 void* workerMain( void* arg ) {
157 // make sure init mlp once-per-thread stuff
158 //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
160 // all workers wait until system is ready
// Correct condition-variable idiom: re-test the predicate in a while
// loop, so spurious wakeups and a broadcast-before-wait are both handled
// (unlike the legacy version above).
161 pthread_mutex_lock ( &systemLock );
162 while( !systemStarted ) {
163 pthread_cond_wait( &systemBeginCond, &systemLock );
165 pthread_mutex_unlock( &systemLock );
167 // then continue to process work
170 pthread_mutex_lock( &systemLock );
// Sleep until workScheduleSubmit signals that the queue is non-empty.
172 while( isEmpty( dequeWorkUnits ) ) {
173 pthread_cond_wait( &workAvailCond, &systemLock );
// Queue guaranteed non-empty here and we still hold systemLock.
175 workUnit = getItem( dequeWorkUnits );
176 pthread_mutex_unlock( &systemLock );
178 // yield processor before moving on, just to exercise
179 // system's out-of-order correctness
180 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
181 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Process the unit OUTSIDE the lock so workers run user code in parallel.
183 workFunc( workUnit );
// LEGACY (work-stealing path) initializer: one deque + lock per worker,
// then one pthread per worker. Superseded by the single-queue
// workScheduleInit below. Several lines (declarations, loop closes,
// pthread_create/mutex_init trailing arguments) are elided in this extract.
191 void workScheduleInit( int numProcessors,
192 void(*func)(void*) ) {
195 numWorkers = numProcessors;
198 // allocate space for worker data
// NOTE(review): RUNMALLOC is a project allocator (mlp_runtime.h,
// presumably); its failure behavior is not visible here -- confirm it
// aborts on OOM, since the result is used unchecked.
199 workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
201 for( i = 0; i < numWorkers; ++i ) {
204 workerDataArray[i].dequeWorkUnits = createQueue();
206 // set the next worker to add work to as itself
207 workerDataArray[i].nextWorkerToLoad = i;
210 status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
213 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
216 // only create the actual pthreads after all workers
217 // have data that is protected with initialized locks
218 for( i = 0; i < numWorkers; ++i ) {
219 status = pthread_create( &(workerDataArray[i].workerThread),
222 (void*) &(workerDataArray[i])
224 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
227 // yield and let all workers get to the begin
228 // condition variable, waiting--we have to hold them
229 // so they don't all see empty work queues right away
// BUG(review): sched_yield gives no guarantee the workers have reached
// pthread_cond_wait; combined with the wait-without-predicate in the
// legacy workerMain this races with the begin broadcast. The replacement
// init/worker pair fixes this with the systemStarted flag.
230 if( sched_yield() == -1 ) {
231 printf( "Error thread trying to yield.\n" );
// CURRENT initializer: create the one shared queue and its lock, then
// spawn the worker pool. func is the per-work-unit callback; presumably
// stored into workFunc on an elided line -- TODO confirm. Declarations
// of i/status and the loop close are also elided in this extract.
238 void workScheduleInit( int numProcessors,
239 void(*func)(void*) ) {
// Deliberate oversubscription: 5 threads per processor. Since workers
// block on the single lock/condvar rather than spin, the extra threads
// presumably exercise scheduling interleavings -- confirm intent.
242 numWorkers = numProcessors*5;
245 dequeWorkUnits = createQueue();
247 status = pthread_mutex_init( &systemLock, NULL );
248 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
250 workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
252 for( i = 0; i < numWorkers; ++i ) {
// Workers take no per-thread argument; all state is the shared statics.
253 status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
254 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
256 // yield and let all workers get to the begin
257 // condition variable, waiting--we have to hold them
258 // so they don't all see empty work queues right away
// Unlike the legacy version, correctness does not depend on this yield:
// the systemStarted predicate makes the start handshake race-free.
259 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// LEGACY (work-stealing path) submit: round-robin the work unit onto the
// deque of the next worker this submitter is scheduled to load. The
// index increment and the wrap-around body are on elided lines.
265 void workScheduleSubmit( void* workUnit ) {
267 // query who is submitting and find out who they are scheduled to load
268 int submitterIndex = threadID2workerIndex( pthread_self() );
// BUG(review): nextWorkerToLoad is read here and written below with no
// lock held; two threads mapped to the same submitter slot race on it.
269 int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
271 // choose a new index and save it
273 if( workerIndex == numWorkers ) {
276 workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
278 // load the chosen worker
// Submit to the BACK of the target deque; the owner consumes from the
// front (getItem) and thieves also steal from the back (getItemBack).
279 pthread_mutex_lock ( &(workerDataArray[workerIndex].dequeLock) );
280 addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
281 pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
// CURRENT submit: append one work unit to the shared queue and wake one
// sleeping worker. Signaling while still holding systemLock pairs
// correctly with the while(isEmpty(...)) predicate wait in workerMain.
// (Closing brace is on an elided line of this extract.)
285 void workScheduleSubmit( void* workUnit ) {
286 pthread_mutex_lock ( &systemLock );
287 addNewItemBack ( dequeWorkUnits, workUnit );
// One item added, so one pthread_cond_signal suffices; waiters that
// lose the race re-check isEmpty and sleep again.
288 pthread_cond_signal( &workAvailCond );
289 pthread_mutex_unlock( &systemLock );
293 // really should be named "wait until work is finished"
// Release the start gate for all workers, then block until every worker
// thread exits. The systemStarted = 1 assignment is presumably on the
// elided line between the lock and the broadcast -- TODO confirm, since
// the while(!systemStarted) wait in workerMain depends on it. How the
// workers come to exit (so these joins return) is not visible in this
// extract either.
294 void workScheduleBegin() {
298 // tell all workers to begin
299 pthread_mutex_lock ( &systemLock );
// Broadcast (not signal): every worker waits on systemBeginCond.
301 pthread_cond_broadcast( &systemBeginCond );
302 pthread_mutex_unlock ( &systemLock );
304 for( i = 0; i < numWorkers; ++i ) {
305 pthread_join( workerArray[i], NULL );