assigns workerID to the workerTR
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
10 #ifdef SQUEUE
11 #include "squeue.h"
12 #else
13 #include "deque.h"
14 #endif
15 #ifdef RCR
16 #include "rcr_runtime.h"
17 #include "trqueue.h"
18 #endif
19
20
21 //////////////////////////////////////////////////
22 //
23 //  for coordination with the garbage collector
24 //
25 //////////////////////////////////////////////////
26 int threadcount;
27 pthread_mutex_t gclock;
28 pthread_mutex_t gclistlock;
29 pthread_cond_t gccond;
30 #ifdef RCR
31 extern pthread_mutex_t queuelock;
32 #endif
33 // in garbage.h, listitem is a struct with a pointer
34 // to a stack, objects, etc. such that the garbage
35 // collector can find pointers for garbage collection
36
37 // this is a global list of listitem structs that the
38 // garbage collector uses to know about each thread
39 extern struct listitem* list;
40
41 // this is the local thread's item on the above list,
42 // it should be added to the global list before a thread
43 // starts doing work, and should be removed only when
44 // the thread is completely finished--in OoOJava/MLP the
45 // only thing hanging from this litem should be a single
46 // task record that the worker thread is executing, if any!
47 extern __thread struct listitem litem;
48 //////////////////////////////////////////////////
49 //
50 //  end coordination with the garbage collector
51 //
52 //////////////////////////////////////////////////
53
54
55
56
57 typedef struct workerData_t {
58   pthread_t workerThread;
59   int       id;
60 } WorkerData;
61
62 // a thread should know its worker id in any
63 // functions below
64 __thread int myWorkerID;
65
66 // the original thread starts up the work scheduler
67 // and sleeps while it is running, it has no worker
68 // ID so use this to realize that
69 const int workerID_NOTAWORKER = 0xffffff0;
70
71
72 volatile int numWorkSchedWorkers;
73 int realnumWorkSchedWorkers;
74 static WorkerData*  workerDataArray;
75 static pthread_t*   workerArray;
76
77 static void(*workFunc)(void*);
78
79 // each thread can create objects but should assign
80 // globally-unique object ID's (oid) so have threads
81 // give out this as next id, then increment by number
82 // of threads to ensure disjoint oid sets
83 __thread int oid;
84
85 // global array of work-stealing deques, where
86 // each thread uses its ID as the index to its deque
87 deque* deques;
88
89
90
91 #ifdef RCR
92 #include "trqueue.h"
93 __thread struct trQueue * TRqueue=NULL;
94 #endif
95
96
97
98 // this is a read-by-all and write-by-one variable
99 // IT IS UNPROTECTED, BUT SAFE for all threads to
100 // read it (periodically, only when they can find no work)
101 // and only the worker that retires the main thread will
102 // write it to 1, at which time other workers will see
103 // that they should exit gracefully
104 static volatile int mainTaskRetired = FALSE;
105
106
107
108
109 void* workerMain( void* arg ) {
110   void*       workUnit;
111   WorkerData* myData  = (WorkerData*) arg;
112   deque*      myDeque = &(deques[myData->id]);
113   int         keepRunning = TRUE;
114   int         haveWork;
115   int         lastVictim = 0;
116   int         i;
117
118   myWorkerID = myData->id;
119
120   // ensure that object ID's start at 1 so that using
121   // oid with value 0 indicates an invalid object
122   oid = myData->id + 1;
123
124   // each thread has a single semaphore that a running
125   // task should hand off to children threads it is
126   // going to stall on
127   psem_init( &runningSESEstallSem );
128
129   // the worker threads really have no context relevant to the
130   // user program, so build an empty garbage list struct to
131   // pass to the collector if collection occurs
132   struct garbagelist emptygarbagelist = { 0, NULL };
133
134   // Add this worker to the gc list
135   pthread_mutex_lock( &gclistlock );
136   threadcount++;
137   litem.prev = NULL;
138   litem.next = list;
139   if( list != NULL ) 
140     list->prev = &litem;
141   list = &litem;
142   pthread_mutex_unlock( &gclistlock );
143
144
145   // start timing events in this thread
146   CP_CREATE();
147
148
149   // then continue to process work
150   while( keepRunning ) {
151
152     // wait for work
153 #ifdef CP_EVENTID_WORKSCHEDGRAB
154     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
155 #endif
156
157     haveWork = FALSE;
158     while( !haveWork ) {
159
160       workUnit = dqPopBottom( myDeque );
161
162
163       if( workUnit != DQ_POP_EMPTY ) {
164         haveWork = TRUE;
165         goto dowork;
166       } else {
167         // try to steal from another queue, starting
168         // with the last successful victim, don't check
169         // your own deque
170         int mynumWorkSchedWorkers=numWorkSchedWorkers;
171         for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
172
173           workUnit = dqPopTop( &(deques[lastVictim]) );
174           
175 #ifdef SQUEUE
176           if( workUnit != DQ_POP_EMPTY ) {
177 #else
178           if( workUnit != DQ_POP_ABORT &&
179               workUnit != DQ_POP_EMPTY ) {
180 #endif
181             // successful steal!
182             haveWork = TRUE;
183             goto dowork;
184           }
185        
186           // choose next victim
187           lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
188           
189           if( lastVictim == myWorkerID ) {
190             lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) { lastVictim = 0; }
191           }
192         }
193         // end steal attempts
194
195
196         // if we successfully stole work, break out of the
197         // while-not-have-work loop, otherwise we looked
198         // everywhere, so drop down to "I'm idle" code below
199         if( haveWork ) {
200           goto dowork;
201         }
202       }
203
204       // if we drop down this far, we didn't find any work,
205       // so do a garbage collection, yield the processor,
206       // then check if the entire system is out of work
207       if( unlikely( needtocollect ) ) {
208         checkcollect( &emptygarbagelist );
209       }
210
211       sched_yield();
212
213       if( mainTaskRetired ) {
214         keepRunning = FALSE;
215         break;
216       }
217
218     } // end the while-not-have-work loop
219
220     dowork:
221
222 #ifdef CP_EVENTID_WORKSCHEDGRAB
223     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
224 #endif
225
226     // when is no work left we will pop out
227     // here, so only do work if any left
228     if( haveWork ) {
229       // let GC see current work
230       litem.seseCommon = (void*)workUnit;
231
232 #ifdef DEBUG_DEQUE
233       if( workUnit == NULL ) {
234         printf( "About to execute a null work item\n" );
235       }
236 #endif
237
238       workFunc( workUnit );
239     }
240   } 
241
242
243   CP_EXIT();
244
245
246   // remove from GC list
247   pthread_mutex_lock( &gclistlock );
248   threadcount--;
249   if( litem.prev == NULL ) {
250     list = litem.next;
251   } else {
252     litem.prev->next = litem.next;
253   }
254   if( litem.next != NULL ) {
255     litem.next->prev = litem.prev;
256   }
257   pthread_mutex_unlock( &gclistlock );
258
259
260   return NULL;
261 }
262
263
264 void workScheduleInit( int numProcessors,
265                        void(*func)(void*) ) {
266   int i, status;
267   pthread_attr_t attr;
268
269   // the original thread must call this now to
270   // protect memory allocation events coming
271   CP_CREATE();
272
273   // the original thread is a worker
274   myWorkerID = 0;
275   oid = 1;
276
277 #ifdef RCR
278   pthread_mutex_init( &queuelock,     NULL );
279 #endif
280   pthread_mutex_init( &gclock,     NULL );
281   pthread_mutex_init( &gclistlock, NULL );
282   pthread_cond_init ( &gccond,     NULL );
283
284
285   numWorkSchedWorkers = numProcessors;
286   realnumWorkSchedWorkers=numProcessors;
287
288   workFunc = func;
289
290 #ifdef RCR
291   deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers*2);
292 #else
293   deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers );
294 #endif
295   workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
296
297 #ifdef RCR
298   for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
299 #else
300   for( i = 0; i < numWorkSchedWorkers; ++i ) {
301 #endif
302     dqInit( &(deques[i]) );
303   }
304   
305   pthread_attr_init( &attr );
306   pthread_attr_setdetachstate( &attr, 
307                                PTHREAD_CREATE_JOINABLE );
308
309   workerDataArray[0].id = 0;
310
311   for( i = 1; i < numWorkSchedWorkers; ++i ) {
312
313     workerDataArray[i].id = i;
314
315     status = pthread_create( &(workerDataArray[i].workerThread), 
316                              &attr,
317                              workerMain,
318                              (void*) &(workerDataArray[i])
319                              );
320
321     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
322   }
323 }
324
325
326 void workScheduleSubmit( void* workUnit ) {
327 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
328   CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN );
329 #endif
330   dqPushBottom( &(deques[myWorkerID]), workUnit );
331 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
332   CP_LOGEVENT( CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END );
333 #endif
334 }
335
336
337 // really should be named "wait for work in system to complete"
338 void workScheduleBegin() {
339   int i;
340
341   // original thread becomes a worker
342   workerMain( (void*) &(workerDataArray[0]) );
343
344   // then wait for all other workers to exit gracefully
345   for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
346     pthread_join( workerDataArray[i].workerThread, NULL );
347   }
348
349   // write all thread's events to disk
350   CP_DUMP();
351 }
352
353
354 // only the worker that executes and then retires
355 // the main task should invoke this, which indicates to
356 // all other workers they should exit gracefully
357 void workScheduleExit() {
358   mainTaskRetired = 1;
359 }