7 #include "workschedule.h"
8 #include "mlp_runtime.h"
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock. This will not scale, but it will support
14 // development of the system for now
// NOTE(review): this listing has original line numbers baked into the
// text and elides many lines; code is kept verbatim, only comments added.
// Alias for the project's Queue type; "deq" is the historical name from
// the work-stealing implementation (declared in a project header --
// TODO confirm which one).
19 typedef struct Queue deq;
23 // each worker needs the following
// Per-worker record left over from the work-stealing scheme.  The
// remaining fields (e.g. dequeWorkUnits, nextWorkerToLoad, used below)
// and the closing "} workerData;" are elided from this excerpt --
// confirm against the full file.
24 typedef struct workerData_t {
25 pthread_t workerThread;
26 pthread_mutex_t dequeLock;
// Single lock protecting all shared scheduler state (the one queue,
// the start flag) in the current single-queue design.
33 static pthread_mutex_t systemLock;
35 // just one queue for everyone
36 //static pthread_mutex_t dequeLock;
37 static deq* dequeWorkUnits;
41 // implementation internal data
42 static int numWorkers;
43 //static workerData* workerDataArray;
44 static pthread_t* workerArray;
// Nonzero once workScheduleBegin() has released the workers; checked
// under systemLock in workerMain to tolerate spurious wakeups.
46 static int systemStarted = 0;
48 //static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
50 //static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
51 //static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
// User-supplied callback that processes one work unit (installed by
// workScheduleInit -- the assignment itself is elided from this excerpt).
52 static void(*workFunc)(void*);
// Signaled by workScheduleSubmit() whenever the shared queue gains an item.
54 static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
// Garbage-collector synchronization objects; non-static, so presumably
// shared with the GC in another translation unit -- TODO confirm.
// They are initialized in the current workScheduleInit below.
58 pthread_mutex_t gclock;
59 pthread_mutex_t gclistlock;
60 pthread_cond_t gccond;
// Map a pthread id to the index of its worker record; threads that are
// not pool workers fall through and (per the comment below) pick an
// arbitrary worker.  NOTE(review): this excerpt elides the declaration
// of i, the match's return statement, and the function's tail; it also
// uses workerDataArray, which is commented out at file line 43, so this
// helper presumably belongs to the disabled work-stealing variant.
64 int threadID2workerIndex( pthread_t id ) {
66 for( i = 0; i < numWorkers; ++i ) {
// NOTE(review): comparing pthread_t values with == is non-portable;
// POSIX provides pthread_equal() for this.
67 if( workerDataArray[i].workerThread == id ) {
71 // if we didn't find it, we are an outside
72 // thread and should pick arbitrary worker
79 // the worker thread main func, which takes a func
80 // from user for processing any one work unit, then
81 // workers use it to process work units and steal
82 // them from one another
// NOTE(review): obsolete work-stealing variant of workerMain.  It
// references systemBeginLock (commented out at file line 48) and
// workerDataArray (commented out at line 43), so it is presumably
// compiled out (#if 0 or similar) in the full file -- confirm.  Many
// interior lines (declarations of i, j, workUnit; loop/brace closers)
// are elided from this excerpt.
83 void* workerMain( void* arg ) {
85 workerData* myData = (workerData*) arg;
92 // all workers wait until system is ready
// NOTE(review): pthread_cond_wait without a predicate loop -- a
// spurious wakeup or a broadcast before this thread waits would let it
// proceed early; the current variant below fixes this with a
// while( !systemStarted ) loop.
93 pthread_mutex_lock ( &systemBeginLock );
94 pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
95 pthread_mutex_unlock( &systemBeginLock );
100 pthread_mutex_lock( &(myData->dequeLock) );
102 if( isEmpty( myData->dequeWorkUnits ) ) {
104 // my deque is empty, try to steal
105 pthread_mutex_unlock( &(myData->dequeLock) );
108 j = myData->nextWorkerToLoad;
110 // look at everyone's queue at least twice
111 for( i = 0; i < numWorkers; ++i ) {
112 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// advance the victim index with wrap-around
114 ++j; if( j == numWorkers ) { j = 0; }
116 pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
118 if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
119 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
120 // no work here, yield and then keep looking
121 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
125 // found some work in another deque, steal it
// steal from the BACK of the victim's deque (owner takes from the
// front at line 139), the classic work-stealing discipline
126 workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
127 pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
131 // didn't find any work, even in my own deque,
132 // after checking everyone twice? Exit thread
133 if( workUnit == NULL ) {
138 // have work in own deque, take out from front
139 workUnit = getItem( myData->dequeWorkUnits );
140 pthread_mutex_unlock( &(myData->dequeLock) );
143 // wherever the work came from, process it
144 workFunc( workUnit );
146 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// NOTE(review): pthread_t is an opaque type; printing it with %d is
// undefined/non-portable (format specifier mismatch).
149 printf( "Worker %d exiting.\n", myData->workerThread );
// Current worker thread body: block until workScheduleBegin() releases
// the pool, then repeatedly take one unit from the single shared queue
// and run the user-supplied workFunc on it.  NOTE(review): this excerpt
// elides the declaration of workUnit, the closing braces of the wait
// and processing loops, and any thread-exit path -- from what is
// visible, workers simply block on workAvailCond when the queue is
// empty; confirm shutdown behavior against the full file.
157 void* workerMain( void* arg ) {
161 // make sure init mlp once-per-thread stuff
162 //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
164 // all workers wait until system is ready
// predicate loop correctly guards against spurious wakeups and against
// a broadcast that happens before this thread reaches the wait
165 pthread_mutex_lock ( &systemLock );
166 while( !systemStarted ) {
167 pthread_cond_wait( &systemBeginCond, &systemLock );
169 pthread_mutex_unlock( &systemLock );
171 // then continue to process work
174 pthread_mutex_lock( &systemLock );
// sleep until workScheduleSubmit() signals that the queue is non-empty
176 while( isEmpty( dequeWorkUnits ) ) {
177 pthread_cond_wait( &workAvailCond, &systemLock );
// dequeue exactly one unit while still holding the lock
179 workUnit = getItem( dequeWorkUnits );
180 pthread_mutex_unlock( &systemLock );
182 // yield processor before moving on, just to exercise
183 // system's out-of-order correctness
184 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
185 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// process the unit outside the lock, so other workers can dequeue
187 workFunc( workUnit );
// NOTE(review): obsolete work-stealing variant of workScheduleInit.
// It uses workerDataArray (commented out at file line 43), so it is
// presumably compiled out in the full file -- confirm.  Declarations
// of i/status, the mutex-attr argument, loop closers, and the
// function's tail are elided from this excerpt.
195 void workScheduleInit( int numProcessors,
196 void(*func)(void*) ) {
// one worker per processor in this variant (the current variant below
// oversubscribes by 5x instead)
199 numWorkers = numProcessors;
202 // allocate space for worker data
// RUNMALLOC: project allocator from mlp_runtime.h -- TODO confirm
// whether it aborts on failure; the result is not checked here.
203 workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
205 for( i = 0; i < numWorkers; ++i ) {
208 workerDataArray[i].dequeWorkUnits = createQueue();
210 // set the next worker to add work to as itself
211 workerDataArray[i].nextWorkerToLoad = i;
214 status = pthread_mutex_init( &(workerDataArray[i].dequeLock),
217 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
220 // only create the actual pthreads after all workers
221 // have data that is protected with initialized locks
222 for( i = 0; i < numWorkers; ++i ) {
223 status = pthread_create( &(workerDataArray[i].workerThread),
226 (void*) &(workerDataArray[i])
228 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
231 // yield and let all workers get to the begin
232 // condition variable, waiting--we have to hold them
233 // so they don't all see empty work queues right away
234 if( sched_yield() == -1 ) {
235 printf( "Error thread trying to yield.\n" );
// Current initializer: set up GC sync objects, the single shared queue
// and its lock, then spawn the worker pool.  NOTE(review): this excerpt
// elides the declarations of i/status and the expected
// "workFunc = func;" assignment -- confirm the callback really is
// installed in the full file, since workerMain calls workFunc.
242 void workScheduleInit( int numProcessors,
243 void(*func)(void*) ) {
// initialize the globally visible GC synchronization objects
247 pthread_mutex_init(&gclock, NULL);
248 pthread_mutex_init(&gclistlock, NULL);
249 pthread_cond_init(&gccond, NULL);
// oversubscribe: 5 workers per processor, presumably so blocked
// workers don't idle the machine -- TODO confirm rationale
251 numWorkers = numProcessors*5;
254 dequeWorkUnits = createQueue();
256 status = pthread_mutex_init( &systemLock, NULL );
257 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
// workers share no per-thread data now; only the pthread_t handles are
// kept, for joining in workScheduleBegin
259 workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
261 for( i = 0; i < numWorkers; ++i ) {
262 status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
263 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
265 // yield and let all workers get to the begin
266 // condition variable, waiting--we have to hold them
267 // so they don't all see empty work queues right away
268 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// NOTE(review): obsolete work-stealing variant of workScheduleSubmit:
// round-robin a new unit onto some worker's deque.  Uses
// workerDataArray (commented out at file line 43), so presumably
// compiled out -- confirm.  The increment of workerIndex and the
// wrap-to-zero body are elided from this excerpt.
274 void workScheduleSubmit( void* workUnit ) {
276 // query who is submitting and find out who they are scheduled to load
277 int submitterIndex = threadID2workerIndex( pthread_self() );
278 int workerIndex = workerDataArray[submitterIndex].nextWorkerToLoad;
280 // choose a new index and save it
// wrap-around of the round-robin cursor (increment elided above)
282 if( workerIndex == numWorkers ) {
285 workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
287 // load the chosen worker
288 pthread_mutex_lock ( &(workerDataArray[workerIndex].dequeLock) );
289 addNewItemBack ( workerDataArray[workerIndex].dequeWorkUnits, workUnit );
290 pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
// Current submit path: append one unit to the single shared queue and
// wake one waiting worker.  Signaling while holding systemLock pairs
// with the while(isEmpty(...)) wait in workerMain, so no submission can
// be missed.  (Closing brace elided from this excerpt.)
294 void workScheduleSubmit( void* workUnit ) {
295 pthread_mutex_lock ( &systemLock );
296 addNewItemBack ( dequeWorkUnits, workUnit );
297 pthread_cond_signal( &workAvailCond );
298 pthread_mutex_unlock( &systemLock );
302 // really should be named "wait until work is finished"
// Release all workers blocked on systemBeginCond, then block joining
// every worker thread.  NOTE(review): the expected "systemStarted = 1;"
// under the lock is elided from this excerpt (file line ~309); without
// it, workers looping on while(!systemStarted) would never proceed --
// confirm against the full file.  The excerpt also ends mid-function.
303 void workScheduleBegin() {
307 // tell all workers to begin
308 pthread_mutex_lock ( &systemLock );
// broadcast under the lock so no worker can miss the start condition
310 pthread_cond_broadcast( &systemBeginCond );
311 pthread_mutex_unlock ( &systemLock );
// wait here until every worker thread terminates
313 for( i = 0; i < numWorkers; ++i ) {
314 pthread_join( workerArray[i], NULL );