e3c9db2ae60e32bcf7ed1fe65a0b03b1c80e6f12
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
10 #ifdef SQUEUE
11 #include "squeue.h"
12 #else
13 #include "deque.h"
14 #endif
15 #ifdef RCR
16 #include "rcr_runtime.h"
17 #include "trqueue.h"
18 #endif
19
20
21 //////////////////////////////////////////////////
22 //
23 //  for coordination with the garbage collector
24 //
25 //////////////////////////////////////////////////
26 int threadcount;
27 pthread_mutex_t gclock;
28 pthread_mutex_t gclistlock;
29 pthread_cond_t gccond;
30
31 // in garbage.h, listitem is a struct with a pointer
32 // to a stack, objects, etc. such that the garbage
33 // collector can find pointers for garbage collection
34
35 // this is a global list of listitem structs that the
36 // garbage collector uses to know about each thread
37 extern struct listitem* list;
38
39 // this is the local thread's item on the above list,
40 // it should be added to the global list before a thread
41 // starts doing work, and should be removed only when
42 // the thread is completely finished--in OoOJava/MLP the
43 // only thing hanging from this litem should be a single
44 // task record that the worker thread is executing, if any!
45 extern __thread struct listitem litem;
46 //////////////////////////////////////////////////
47 //
48 //  end coordination with the garbage collector
49 //
50 //////////////////////////////////////////////////
51
52
53
54
55 typedef struct workerData_t {
56   pthread_t workerThread;
57   int       id;
58 } WorkerData;
59
60 // a thread should know its worker id in any
61 // functions below
62 static __thread int myWorkerID;
63
64 // the original thread starts up the work scheduler
65 // and sleeps while it is running, it has no worker
66 // ID so use this to realize that
67 static const int workerID_NOTAWORKER = 0xffffff0;
68
69
70 int numWorkSchedWorkers;
71 static WorkerData*  workerDataArray;
72 static pthread_t*   workerArray;
73
74 static void(*workFunc)(void*);
75
76 // each thread can create objects but should assign
77 // globally-unique object ID's (oid) so have threads
78 // give out this as next id, then increment by number
79 // of threads to ensure disjoint oid sets
80 __thread int oid;
81
82 // global array of work-stealing deques, where
83 // each thread uses its ID as the index to its deque
84 deque* deques;
85
86
87
88 #ifdef RCR
89 #include "trqueue.h"
90 __thread struct trQueue * TRqueue=NULL;
91 #endif
92
93
94
95 // this is a read-by-all and write-by-one variable
96 // IT IS UNPROTECTED, BUT SAFE for all threads to
97 // read it (periodically, only when they can find no work)
98 // and only the worker that retires the main thread will
99 // write it to 1, at which time other workers will see
100 // that they should exit gracefully
101 static volatile int mainTaskRetired = FALSE;
102
103
104
105
106 void* workerMain( void* arg ) {
107   void*       workUnit;
108   WorkerData* myData  = (WorkerData*) arg;
109   deque*      myDeque = &(deques[myData->id]);
110   int         keepRunning = TRUE;
111   int         haveWork;
112   int         lastVictim = 0;
113   int         i;
114
115   myWorkerID = myData->id;
116
117   // ensure that object ID's start at 1 so that using
118   // oid with value 0 indicates an invalid object
119   oid = myData->id + 1;
120
121   // each thread has a single semaphore that a running
122   // task should hand off to children threads it is
123   // going to stall on
124   psem_init( &runningSESEstallSem );
125
126   // the worker threads really have no context relevant to the
127   // user program, so build an empty garbage list struct to
128   // pass to the collector if collection occurs
129   struct garbagelist emptygarbagelist = { 0, NULL };
130
131 #ifdef RCR
132   //allocate task record queue
133   pthread_t thread;
134   pthread_attr_t nattr;  
135   pthread_attr_init( &nattr );
136   pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
137
138   //set up the stall site SESErecord
139   stallrecord.common.offsetToParamRecords=(INTPTR) &((SESEstall *)0)->rcrRecords;
140   stallrecord.common.classID=-1;
141   stallrecord.common.rcrstatus=0;
142
143   //initialize rcrRecord
144   stallrecord.rcrRecords[0].next=NULL;
145   stallrecord.rcrRecords[0].index=0;
146   stallrecord.rcrRecords[0].count=0;
147
148   if( TRqueue == NULL ) {
149     TRqueue = allocTR();
150   }
151
152   int status = pthread_create( &thread,
153                                NULL,
154                                workerTR,
155                                (void*) TRqueue );
156
157   pthread_attr_destroy( &nattr );
158
159   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
160 #endif
161
162
163   // Add this worker to the gc list
164   pthread_mutex_lock( &gclistlock );
165   threadcount++;
166   litem.prev = NULL;
167   litem.next = list;
168   if( list != NULL ) 
169     list->prev = &litem;
170   list = &litem;
171   pthread_mutex_unlock( &gclistlock );
172
173
174   // start timing events in this thread
175   CP_CREATE();
176
177
178   // then continue to process work
179   while( keepRunning ) {
180
181     // wait for work
182 #ifdef CP_EVENTID_WORKSCHEDGRAB
183     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
184 #endif
185     workUnit = (void*) 0x5;
186     haveWork = FALSE;
187
188     while( !haveWork ) {
189
190       workUnit = dqPopBottom( myDeque );
191
192 #if defined(DEBUG_DEQUE)&&!defined(SQUEUE)
193       if( workUnit == 0x0 ) {
194         printf( "Got invalid work from the deque bottom.\n" );
195       }
196 #endif
197
198       if( workUnit != DQ_POP_EMPTY ) {
199         haveWork = TRUE;
200         break;
201         
202       } else {
203         // try to steal from another queue, starting
204         // with the last successful victim, don't check
205         // your own deque
206         for( i = 0; i < numWorkSchedWorkers - 1; ++i ) {
207           workUnit = dqPopTop( &(deques[lastVictim]) );
208
209 #if defined(DEBUG_DEQUE)&&!defined(SQUEUE)
210           if( workUnit == 0x0 ) {
211             printf( "Got invalid work from the deque top.\n" );
212           }
213 #endif
214           
215 #ifdef SQUEUE
216           if( workUnit != DQ_POP_EMPTY ) {
217 #else
218           if( workUnit != DQ_POP_ABORT &&
219               workUnit != DQ_POP_EMPTY ) {
220 #endif
221             // successful steal!
222             haveWork = TRUE;
223             break;
224           }
225        
226           // choose next victim
227           lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
228           
229           if( lastVictim == myWorkerID ) {
230             lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
231           }
232         }
233         // end steal attempts
234
235
236         // if we successfully stole work, break out of the
237         // while-not-have-work loop, otherwise we looked
238         // everywhere, so drop down to "I'm idle" code below
239         if( haveWork ) {
240           break;
241         }
242       }
243
244       // if we drop down this far, we didn't find any work,
245       // so do a garbage collection, yield the processor,
246       // then check if the entire system is out of work
247       if( unlikely( needtocollect ) ) {
248         checkcollect( &emptygarbagelist );
249       }
250
251       sched_yield();
252
253       if( mainTaskRetired ) {
254         keepRunning = FALSE;
255         break;
256       }
257
258     } // end the while-not-have-work loop
259
260 #ifdef CP_EVENTID_WORKSCHEDGRAB
261     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
262 #endif
263     
264     // when is no work left we will pop out
265     // here, so only do work if any left
266     if( haveWork ) {
267       // let GC see current work
268       litem.seseCommon = (void*)workUnit;
269
270       // unclear how useful this is
271       if( unlikely( needtocollect ) ) {
272         checkcollect( &emptygarbagelist );
273       }
274
275       workFunc( workUnit );
276     }
277   } 
278
279
280   CP_EXIT();
281
282
283   // remove from GC list
284   pthread_mutex_lock( &gclistlock );
285   threadcount--;
286   if( litem.prev == NULL ) {
287     list = litem.next;
288   } else {
289     litem.prev->next = litem.next;
290   }
291   if( litem.next != NULL ) {
292     litem.next->prev = litem.prev;
293   }
294   pthread_mutex_unlock( &gclistlock );
295
296
297   return NULL;
298 }
299
300
301 void workScheduleInit( int numProcessors,
302                        void(*func)(void*) ) {
303   int i, status;
304   pthread_attr_t attr;
305
306   // the original thread must call this now to
307   // protect memory allocation events coming
308   CP_CREATE();
309
310   // the original thread will not become a worker, remember
311   myWorkerID = workerID_NOTAWORKER;
312
313
314   pthread_mutex_init( &gclock,     NULL );
315   pthread_mutex_init( &gclistlock, NULL );
316   pthread_cond_init ( &gccond,     NULL );
317
318
319   numWorkSchedWorkers = numProcessors + 1;
320
321   workFunc = func;
322
323   deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers );
324   workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
325
326   for( i = 0; i < numWorkSchedWorkers; ++i ) {
327     dqInit( &(deques[i]) );
328   }
329   
330 #ifdef RCR
331   //make sure the queue is initialized
332   if (TRqueue==NULL)
333     TRqueue=allocTR();
334 #endif
335
336   pthread_attr_init( &attr );
337   pthread_attr_setdetachstate( &attr, 
338                                PTHREAD_CREATE_JOINABLE );
339
340   for( i = 0; i < numWorkSchedWorkers; ++i ) {
341
342     workerDataArray[i].id = i;
343
344     status = pthread_create( &(workerDataArray[i].workerThread), 
345                              &attr,
346                              workerMain,
347                              (void*) &(workerDataArray[i])
348                              );
349
350     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
351   }
352 }
353
354
355 void workScheduleSubmit( void* workUnit ) {
356
357 #ifdef DEBUG_DEQUE
358   if( workUnit == 0x0 ) {
359     printf( "Submitting invalid task record as work.\n" );
360   }
361 #endif
362
363   if( myWorkerID == workerID_NOTAWORKER ) {
364     CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN );
365     dqPushBottom( &(deques[0]), workUnit );
366     CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END );
367     return;
368   }
369
370     CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN );
371   dqPushBottom( &(deques[myWorkerID]), workUnit );
372     CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END );
373 }
374
375
376 // really should be named "wait for work in system to complete"
377 void workScheduleBegin() {
378   int i;
379
380   // wait for all workers to exit gracefully
381   for( i = 0; i < numWorkSchedWorkers; ++i ) {
382     pthread_join( workerDataArray[i].workerThread, NULL );
383   }
384
385   // for the original, non-worker thread to close up its events
386   CP_EXIT();
387
388   // write all thread's events to disk
389   CP_DUMP();
390 }
391
392
393 // only the worker that executes and then retires
394 // the main task should invoke this, which indicates to
395 // all other workers they should exit gracefully
396 void workScheduleExit() {
397   mainTaskRetired = 1;
398 }