6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
16 #include "rcr_runtime.h"
21 //////////////////////////////////////////////////
23 // for coordination with the garbage collector
25 //////////////////////////////////////////////////
// gclock, gclistlock and gccond are all initialized in workScheduleInit().
// Within the visible part of this file only gclistlock is locked (by
// workerMain(), around its GC-list add and remove steps); gclock and gccond
// are presumably used by the collector itself -- TODO confirm in garbage.h.
27 pthread_mutex_t gclock;
28 pthread_mutex_t gclistlock;
29 pthread_cond_t gccond;
31 // in garbage.h, listitem is a struct with a pointer
32 // to a stack, objects, etc. such that the garbage
33 // collector can find pointers for garbage collection
35 // this is a global list of listitem structs that the
36 // garbage collector uses to know about each thread
37 extern struct listitem* list;
39 // this is the local thread's item on the above list,
40 // it should be added to the global list before a thread
41 // starts doing work, and should be removed only when
42 // the thread is completely finished--in OoOJava/MLP the
43 // only thing hanging from this litem should be a single
44 // task record that the worker thread is executing, if any!
// workerMain() stores the work unit it is about to run in litem.seseCommon
// and unlinks litem through its prev/next members while holding gclistlock.
45 extern __thread struct listitem litem;
46 //////////////////////////////////////////////////
48 // end coordination with the garbage collector
50 //////////////////////////////////////////////////
// Per-worker bookkeeping record. Only the head of the struct is visible in
// this listing: the rest of the file reads an `id` member and refers to the
// type as WorkerData, so the remaining members and the closing typedef name
// appear to have been elided here.
55 typedef struct workerData_t {
56 pthread_t workerThread;
60 // a thread should know its worker id in any
// (the continuation of the comment above is missing from this listing)
// myWorkerID is thread-local; workerMain() sets it from its WorkerData id,
// workScheduleSubmit() uses it to pick the calling thread's deque, and the
// steal loop uses it to skip the thread's own deque.
62 static __thread int myWorkerID;
64 // the original thread starts up the work scheduler
65 // and sleeps while it is running, it has no worker
66 // ID so use this to realize that
// NOTE(review): no use of workerID_NOTAWORKER is visible in this listing.
67 static const int workerID_NOTAWORKER = 0xffffff0;
// Number of deques / WorkerData slots; workScheduleInit() sets it to
// numProcessors + 1.
70 int numWorkSchedWorkers;
// Array of numWorkSchedWorkers records allocated in workScheduleInit().
71 static WorkerData* workerDataArray;
// NOTE(review): workerArray is not referenced anywhere in the visible code;
// thread handles are kept in workerDataArray[i].workerThread instead.
72 static pthread_t* workerArray;
// Callback that workerMain() invokes on every work unit it obtains.
// Presumably assigned from the `func` argument of workScheduleInit() -- the
// assignment itself is not visible here.
74 static void(*workFunc)(void*);
76 // each thread can create objects but should assign
77 // globally-unique object ID's (oid) so have threads
78 // give out this as next id, then increment by number
79 // of threads to ensure disjoint oid sets
// (the declaration of `oid` is not visible in this listing; workerMain()
// seeds it with the worker id + 1)
82 // global array of work-stealing deques, where
83 // each thread uses its ID as the index to its deque
// (the declaration of `deques` is not visible in this listing; it is
// allocated and initialized in workScheduleInit())
// Thread-local task record queue pointer; workerMain() tests it for NULL.
90 __thread struct trQueue * TRqueue=NULL;
95 // this is a read-by-all and write-by-one variable
96 // IT IS UNPROTECTED, BUT SAFE for all threads to
97 // read it (periodically, only when they can find no work)
98 // and only the worker that retires the main thread will
99 // write it to 1, at which time other workers will see
100 // that they should exit gracefully
// NOTE(review): `volatile` only forces the load/store to happen; it gives no
// atomicity or inter-thread ordering guarantee under the C11 memory model.
// The single-writer, polled-flag pattern described above is what this code
// relies on -- consider an atomic type if this is ever revisited.
101 static volatile int mainTaskRetired = FALSE;
// Body of a scheduler worker thread.
//
// arg is cast to WorkerData*; workScheduleInit() passes the address of
// workerDataArray[i] as the thread argument. The visible steps are:
//   1. record the worker id, seed oid, init the stall semaphore and the
//      stall-site record,
//   2. register with the GC list under gclistlock,
//   3. loop: pop from the bottom of this worker's deque, otherwise try to
//      steal from the top of other workers' deques, otherwise run the idle
//      path (GC check, mainTaskRetired check); run workFunc on work found,
//   4. unlink litem from the GC list under gclistlock.
// NOTE(review): many lines of this function (declarations of workUnit, i,
// lastVictim, thread; closing braces; several statements) are missing from
// this listing, so the comments below only describe what is visible.
106 void* workerMain( void* arg ) {
108 WorkerData* myData = (WorkerData*) arg;
109 deque* myDeque = &(deques[myData->id]);
110 int keepRunning = TRUE;
115 myWorkerID = myData->id;
117 // ensure that object ID's start at 1 so that using
118 // oid with value 0 indicates an invalid object
119 oid = myData->id + 1;
121 // each thread has a single semaphore that a running
122 // task should hand off to children threads it is
// (the continuation of the comment above is missing from this listing)
124 psem_init( &runningSESEstallSem );
126 // the worker threads really have no context relevant to the
127 // user program, so build an empty garbage list struct to
128 // pass to the collector if collection occurs
129 struct garbagelist emptygarbagelist = { 0, NULL };
132 //allocate task record queue
// nattr requests a detached thread; it is used by the pthread_create call
// below, whose remaining arguments are not visible in this listing.
134 pthread_attr_t nattr;
135 pthread_attr_init( &nattr );
136 pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
138 //set up the stall site SESErecord
// offsetToParamRecords is computed with the null-pointer member-address
// idiom, i.e. the byte offset of rcrRecords within SESEstall.
139 stallrecord.common.offsetToParamRecords=(INTPTR) &((SESEstall *)0)->rcrRecords;
140 stallrecord.common.classID=-1;
141 stallrecord.common.rcrstatus=0;
143 //initialize rcrRecord
144 stallrecord.rcrRecords[0].next=NULL;
145 stallrecord.rcrRecords[0].index=0;
146 stallrecord.rcrRecords[0].count=0;
// Only start the helper thread when this thread has no task record queue yet.
148 if( TRqueue == NULL ) {
152 int status = pthread_create( &thread,
157 pthread_attr_destroy( &nattr );
// NOTE(review): the failure message carries no detail (which call, which
// status value); the process exits with -1.
159 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
163 // Add this worker to the gc list
// (the list-insertion statements between lock and unlock are missing from
// this listing)
164 pthread_mutex_lock( &gclistlock );
171 pthread_mutex_unlock( &gclistlock );
174 // start timing events in this thread
178 // then continue to process work
179 while( keepRunning ) {
182 #ifdef CP_EVENTID_WORKSCHEDGRAB
183 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
// NOTE(review): 0x5 looks like a placeholder/sentinel value for workUnit;
// its meaning cannot be determined from the visible code.
185 workUnit = (void*) 0x5;
// First choice: take work from the bottom of this worker's own deque.
190 workUnit = dqPopBottom( myDeque );
192 #if defined(DEBUG_DEQUE)&&!defined(SQUEUE)
193 if( workUnit == 0x0 ) {
194 printf( "Got invalid work from the deque bottom.\n" );
198 if( workUnit != DQ_POP_EMPTY ) {
203 // try to steal from another queue, starting
204 // with the last successful victim, don't check
// (the continuation of the comment above is missing from this listing)
// At most numWorkSchedWorkers - 1 steal attempts are made per pass.
206 for( i = 0; i < numWorkSchedWorkers - 1; ++i ) {
207 workUnit = dqPopTop( &(deques[lastVictim]) );
209 #if defined(DEBUG_DEQUE)&&!defined(SQUEUE)
210 if( workUnit == 0x0 ) {
211 printf( "Got invalid work from the deque top.\n" );
216 if( workUnit != DQ_POP_EMPTY ) {
// A steal only counts when the result is neither DQ_POP_ABORT nor
// DQ_POP_EMPTY.
218 if( workUnit != DQ_POP_ABORT &&
219 workUnit != DQ_POP_EMPTY ) {
226 // choose next victim
// Advance lastVictim with wrap-around to 0, and advance once more if it
// landed on this worker's own id.
227 lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
229 if( lastVictim == myWorkerID ) {
230 lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
233 // end steal attempts
236 // if we successfully stole work, break out of the
237 // while-not-have-work loop, otherwise we looked
238 // everywhere, so drop down to "I'm idle" code below
244 // if we drop down this far, we didn't find any work,
245 // so do a garbage collection, yield the processor,
246 // then check if the entire system is out of work
247 if( unlikely( needtocollect ) ) {
248 checkcollect( &emptygarbagelist );
// mainTaskRetired is the shutdown flag polled by idle workers; the body of
// this branch is missing from this listing.
253 if( mainTaskRetired ) {
258 } // end the while-not-have-work loop
260 #ifdef CP_EVENTID_WORKSCHEDGRAB
261 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
264 // when there is no work left we will pop out
265 // here, so only do work if any is left
267 // let GC see current work
268 litem.seseCommon = (void*)workUnit;
270 // unclear how useful this is
271 if( unlikely( needtocollect ) ) {
272 checkcollect( &emptygarbagelist );
// Run the scheduler's work callback on the unit just obtained.
275 workFunc( workUnit );
283 // remove from GC list
// Doubly-linked-list unlink of litem; the branch taken when litem.prev is
// NULL (presumably updating the list head) is missing from this listing.
284 pthread_mutex_lock( &gclistlock );
286 if( litem.prev == NULL ) {
289 litem.prev->next = litem.next;
291 if( litem.next != NULL ) {
292 litem.next->prev = litem.prev;
294 pthread_mutex_unlock( &gclistlock );
// Set up the work scheduler and spawn the worker threads.
//
// numProcessors: the scheduler creates numProcessors + 1 deques and
//                WorkerData slots, and starts threads for slots
//                1 .. numWorkSchedWorkers - 1.
// func:          work callback; presumably stored in workFunc (the
//                assignment is not visible in this listing).
//
// Exits the process with -1 if a pthread_create call reports failure.
// NOTE(review): local declarations (i, status, attr) and the remaining
// pthread_create arguments are missing from this listing; the results of
// RUNMALLOC and the pthread_*_init calls are not checked in the visible code.
301 void workScheduleInit( int numProcessors,
302 void(*func)(void*) ) {
306 // the original thread must call this now to
307 // protect memory allocation events coming
// (the continuation of the comment above and the call it refers to are
// missing from this listing)
310 // the original thread is a worker
314 pthread_mutex_init( &gclock, NULL );
315 pthread_mutex_init( &gclistlock, NULL );
316 pthread_cond_init ( &gccond, NULL );
// One slot more than numProcessors; the thread-creation loop below starts at
// index 1, so slot 0 is not given a spawned thread here.
319 numWorkSchedWorkers = numProcessors + 1;
323 deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers );
324 workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
// Every deque, including slot 0, is initialized before any thread starts.
326 for( i = 0; i < numWorkSchedWorkers; ++i ) {
327 dqInit( &(deques[i]) );
331 //make sure the queue is initialized
// Workers are created joinable so workScheduleBegin() can pthread_join them.
336 pthread_attr_init( &attr );
337 pthread_attr_setdetachstate( &attr,
338 PTHREAD_CREATE_JOINABLE );
340 for( i = 1; i < numWorkSchedWorkers; ++i ) {
342 workerDataArray[i].id = i;
// Each thread receives a pointer to its own WorkerData slot as its argument.
344 status = pthread_create( &(workerDataArray[i].workerThread),
347 (void*) &(workerDataArray[i])
350 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
// Queue a work unit on the calling thread's own deque.
//
// workUnit: opaque pointer pushed onto the bottom of deques[myWorkerID].
//
// NOTE(review): myWorkerID is thread-local, so this assumes the caller is a
// thread whose myWorkerID is a valid deque index -- confirm for callers that
// are not scheduler workers.
355 void workScheduleSubmit( void* workUnit ) {
// Null-pointer diagnostic; whether this check sits under a debug-only
// preprocessor guard cannot be seen in this listing.
358 if( workUnit == 0x0 ) {
359 printf( "Submitting invalid task record as work.\n" );
// The push is bracketed by BEGIN/END profiling events.
363 CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN );
364 dqPushBottom( &(deques[myWorkerID]), workUnit );
365 CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END );
369 // really should be named "wait for work in system to complete"
// Blocks the caller until every spawned worker thread has terminated.
// NOTE(review): the declaration of i and the statements that follow the two
// trailing comments are missing from this listing.
370 void workScheduleBegin() {
373 // wait for all workers to exit gracefully
// Joins slots 1 .. numWorkSchedWorkers - 1, the same range for which
// workScheduleInit() creates threads; the join result is discarded.
374 for( i = 1; i < numWorkSchedWorkers; ++i ) {
375 pthread_join( workerDataArray[i].workerThread, NULL );
378 // for the original, non-worker thread to close up its events
// NOTE(review): the comment above calls the original thread a non-worker,
// while a comment in workScheduleInit() says the original thread is a
// worker -- confirm which description is current.
381 // write all threads' events to disk
386 // only the worker that executes and then retires
387 // the main task should invoke this, which indicates to
388 // all other workers they should exit gracefully
389 void workScheduleExit() {