6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
16 #include "rcr_runtime.h"
21 //////////////////////////////////////////////////
23 // for coordination with the garbage collector
25 //////////////////////////////////////////////////
// --- GC coordination primitives (defined here, shared runtime-wide) ---
// NOTE(review): gclistlock is the lock actually taken in this file (it
// guards the global per-thread listitem list below); gclock/gccond are
// defined here but used by the collector -- confirm roles in garbage.h.
27 pthread_mutex_t gclock;
28 pthread_mutex_t gclistlock;
29 pthread_cond_t gccond;
// defined elsewhere in the runtime; initialized by workScheduleInit()
31 extern pthread_mutex_t queuelock;
33 // in garbage.h, listitem is a struct with a pointer
34 // to a stack, objects, etc. such that the garbage
35 // collector can find pointers for garbage collection
37 // this is a global list of listitem structs that the
38 // garbage collector uses to know about each thread
39 extern struct listitem* list;
41 // this is the local thread's item on the above list,
42 // it should be added to the global list before a thread
43 // starts doing work, and should be removed only when
44 // the thread is completely finished--in OoOJava/MLP the
45 // only thing hanging from this litem should be a single
46 // task record that the worker thread is executing, if any!
47 extern __thread struct listitem litem;
48 //////////////////////////////////////////////////
50 // end coordination with the garbage collector
52 //////////////////////////////////////////////////
// Per-worker bookkeeping record passed to each pthread.
// NOTE(review): this typedef is truncated in this view -- the closing
// "} WorkerData;" and an "int id" member (read as myData->id below)
// are elided; confirm against the full source.
57 typedef struct workerData_t {
58 pthread_t workerThread;
// each thread caches its own worker ID in thread-local storage;
// it doubles as the index into the deques[] array
62 // a thread should know its worker id in any
64 __thread int myWorkerID;
66 // the original thread starts up the work scheduler
67 // and sleeps while it is running, it has no worker
68 // ID so use this to realize that
69 const int workerID_NOTAWORKER = 0xffffff0;
// numWorkSchedWorkers may be read while workers run (volatile);
// realnumWorkSchedWorkers keeps the original count for joining
72 volatile int numWorkSchedWorkers;
73 int realnumWorkSchedWorkers;
74 static WorkerData* workerDataArray;
75 static pthread_t* workerArray;
// the task-execution callback supplied to workScheduleInit()
77 static void(*workFunc)(void*);
79 // each thread can create objects but should assign
80 // globally-unique object ID's (oid) so have threads
81 // give out this as next id, then increment by number
82 // of threads to ensure disjoint oid sets
85 // global array of work-stealing deques, where
86 // each thread uses its ID as the index to its deque
// per-thread RCR task-record queue -- presumably only used when the
// RCR configuration is compiled in; see rcr_runtime.h
93 __thread struct trQueue * TRqueue=NULL;
98 // this is a read-by-all and write-by-one variable
99 // IT IS UNPROTECTED, BUT SAFE for all threads to
100 // read it (periodically, only when they can find no work)
101 // and only the worker that retires the main thread will
102 // write it to 1, at which time other workers will see
103 // that they should exit gracefully
104 static volatile int mainTaskRetired = FALSE;
// Worker thread entry point.  Each worker: registers itself with the GC,
// then loops -- pop work from its own deque, otherwise try to steal from
// other workers' deques, otherwise go idle (collect/yield) until either
// work appears or mainTaskRetired signals a graceful shutdown.  On exit
// it unlinks its litem from the global GC list.
// NOTE(review): several lines (closing braces, the idle/yield path, the
// grab of workUnit before dqPopBottom) are elided from this listing.
109 void* workerMain( void* arg ) {
111 WorkerData* myData = (WorkerData*) arg;
112 deque* myDeque = &(deques[myData->id]);
113 int keepRunning = TRUE;
118 myWorkerID = myData->id;
120 // ensure that object ID's start at 1 so that using
121 // oid with value 0 indicates an invalid object
122 oid = myData->id + 1;
124 // each thread has a single semaphore that a running
125 // task should hand off to children threads it is
127 psem_init( &runningSESEstallSem );
129 // the worker threads really have no context relevant to the
130 // user program, so build an empty garbage list struct to
131 // pass to the collector if collection occurs
132 struct garbagelist emptygarbagelist = { 0, NULL };
134 // Add this worker to the gc list
// (the lines that link litem into the global list are elided here)
135 pthread_mutex_lock( &gclistlock );
142 pthread_mutex_unlock( &gclistlock );
145 // start timing events in this thread
149 // then continue to process work
150 while( keepRunning ) {
153 #ifdef CP_EVENTID_WORKSCHEDGRAB
154 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
// first try the cheap case: pop from the bottom of our own deque
160 workUnit = dqPopBottom( myDeque );
163 if( workUnit != DQ_POP_EMPTY ) {
167 // try to steal from another queue, starting
168 // with the last successful victim, don't check
// snapshot the (volatile) worker count once so the loop bound and
// the modulo wrap below agree even if the count changes mid-loop
170 int mynumWorkSchedWorkers=numWorkSchedWorkers;
171 for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
// steal from the TOP of the victim's deque (owner pops bottom)
173 workUnit = dqPopTop( &(deques[lastVictim]) );
176 if( workUnit != DQ_POP_EMPTY ) {
178 if( workUnit != DQ_POP_ABORT &&
179 workUnit != DQ_POP_EMPTY ) {
186 // choose next victim
// advance round-robin, wrapping, and skip our own deque
187 lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
189 if( lastVictim == myWorkerID ) {
190 lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
193 // end steal attempts
196 // if we successfully stole work, break out of the
197 // while-not-have-work loop, otherwise we looked
198 // everywhere, so drop down to "I'm idle" code below
204 // if we drop down this far, we didn't find any work,
205 // so do a garbage collection, yield the processor,
206 // then check if the entire system is out of work
207 if( unlikely( needtocollect ) ) {
208 checkcollect( &emptygarbagelist );
// shutdown flag written by the worker that retired the main task;
// unsynchronized read is intentional (see comment at definition)
213 if( mainTaskRetired ) {
218 } // end the while-not-have-work loop
222 #ifdef CP_EVENTID_WORKSCHEDGRAB
223 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
226 // when is no work left we will pop out
227 // here, so only do work if any left
229 // let GC see current work
// publish the in-flight task record so a concurrent collection
// can scan its pointers
230 litem.seseCommon = (void*)workUnit;
233 if( workUnit == NULL ) {
234 printf( "About to execute a null work item\n" );
// hand the task record to the user-program callback
238 workFunc( workUnit );
246 // remove from GC list
// standard doubly-linked-list unlink of this worker's litem,
// under gclistlock so the collector never sees a half-unlinked node
247 pthread_mutex_lock( &gclistlock );
249 if( litem.prev == NULL ) {
252 litem.prev->next = litem.next;
254 if( litem.next != NULL ) {
255 litem.next->prev = litem.prev;
257 pthread_mutex_unlock( &gclistlock );
// Set up the work scheduler: initialize the shared locks, allocate one
// work-stealing deque and one WorkerData per worker, and spawn workers
// 1..N-1 as joinable pthreads.  Worker 0 is the calling (original)
// thread, which later enters workerMain itself via workScheduleBegin.
// func is the task-execution callback stored in workFunc.
// NOTE(review): the paired RUNMALLOC calls (lines 291/293) and the
// paired loop headers (298/300) look like #ifdef/#else alternatives
// whose preprocessor lines are elided -- only one of each pair is
// compiled; confirm against the full source.
264 void workScheduleInit( int numProcessors,
265 void(*func)(void*) ) {
269 // the original thread must call this now to
270 // protect memory allocation events coming
273 // the original thread is a worker
278 pthread_mutex_init( &queuelock, NULL );
280 pthread_mutex_init( &gclock, NULL );
281 pthread_mutex_init( &gclistlock, NULL );
282 pthread_cond_init ( &gccond, NULL );
285 numWorkSchedWorkers = numProcessors;
286 realnumWorkSchedWorkers=numProcessors;
291 deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers*2);
293 deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers );
295 workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
298 for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
300 for( i = 0; i < numWorkSchedWorkers; ++i ) {
302 dqInit( &(deques[i]) );
// workers must be joinable: workScheduleBegin joins them at shutdown
305 pthread_attr_init( &attr );
306 pthread_attr_setdetachstate( &attr,
307 PTHREAD_CREATE_JOINABLE );
// slot 0 is reserved for the original thread -- no pthread_create
309 workerDataArray[0].id = 0;
311 for( i = 1; i < numWorkSchedWorkers; ++i ) {
313 workerDataArray[i].id = i;
315 status = pthread_create( &(workerDataArray[i].workerThread),
318 (void*) &(workerDataArray[i])
321 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
// Submit a task record for execution: push it onto the BOTTOM of the
// calling worker's own deque (indexed by thread-local myWorkerID), so
// the submitter pops it LIFO while other workers may steal it from the
// top.  Optionally wrapped in coreprof submit events.
326 void workScheduleSubmit( void* workUnit ) {
327 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
328 CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN );
330 dqPushBottom( &(deques[myWorkerID]), workUnit );
331 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
332 CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END );
337 // really should be named "wait for work in system to complete"
// The original thread becomes worker 0 and runs workerMain to
// completion (i.e. until mainTaskRetired causes workers to exit),
// then joins every spawned worker thread before returning.
338 void workScheduleBegin() {
341 // original thread becomes a worker
342 workerMain( (void*) &(workerDataArray[0]) );
344 // then wait for all other workers to exit gracefully
// join uses realnumWorkSchedWorkers (the original count) rather than
// the volatile numWorkSchedWorkers, so every created thread is joined
345 for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
346 pthread_join( workerDataArray[i].workerThread, NULL );
349 // write all thread's events to disk
349 // write all thread's events to disk
354 // only the worker that executes and then retires
355 // the main task should invoke this, which indicates to
356 // all other workers they should exit gracefully
357 void workScheduleExit() {