more core...fix traversers...
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "psemaphore.h"
9 #include "coreprof/coreprof.h"
10 #ifdef RCR
11 #include "rcr_runtime.h"
12 #include "trqueue.h"
13 #endif
14
15 // NOTE: Converting this from a work-stealing strategy
16 // to a single-queue thread pool protected by a single
17 // lock.  This will not scale, but it will support
18 // development of the system for now
19
20
21
22
23
24
25 // for convenience
26 typedef struct Queue deq;
27
28 typedef struct workerData_t{
29   pthread_t workerThread;
30   int id;
31 } WorkerData;
32
33
34 static pthread_mutex_t systemLockIn;
35 static pthread_mutex_t systemLockOut;
36
37 // implementation internal data
38 static WorkerData*     workerDataArray;
39 static pthread_t*      workerArray;
40
41 static int systemStarted = 0;
42
43 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
44 static void(*workFunc)(void*);
45
46 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
47
48 int             numWorkers;
49
50 int threadcount;
51 pthread_mutex_t gclock;
52 pthread_mutex_t gclistlock;
53 pthread_cond_t gccond;
54
55 extern struct listitem * list;
56 extern __thread struct listitem litem;
57 extern __thread SESEcommon* seseCommon;
58
59 __thread int oid;
60
61 #ifdef RCR
62 #include "trqueue.h"
63 __thread struct trQueue * TRqueue=NULL;
64 #endif
65
66
67 void workerExit( void* arg ) {
68   //printf( "Thread %d canceled.\n", pthread_self() );
69   CP_EXIT();
70 }
71
72 void* workerMain( void* arg ) {
73   void*       workUnit;
74   WorkerData* myData = (WorkerData*) arg;
75   int         oldState;
76   int         haveWork;
77
78   // the worker threads really have no context relevant to the
79   // user program, so build an empty garbage list struct to
80   // pass to the collector if collection occurs
81   struct garbagelist emptygarbagelist = { 0, NULL };
82
83   // once-per-thread stuff
84   CP_CREATE();
85
86   //pthread_cleanup_push( workerExit, NULL );  
87   
88   // ensure that object ID's start at 1 so that using
89   // oid with value 0 indicates an invalid object
90   oid = myData->id + 1;
91
92   // each thread has a single semaphore that a running
93   // task should hand off to children threads it is
94   // going to stall on
95   psem_init( &runningSESEstallSem );
96   
97
98 #ifdef RCR
99   //allocate task record queue
100   pthread_t thread;
101   pthread_attr_t nattr;  
102   pthread_attr_init( &nattr );
103   pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
104
105   //set up the stall site SESErecord
106   stallrecord.common.offsetToParamRecords=(INTPTR) &((SESEstall *)0)->rcrRecords;
107   stallrecord.common.classID=-1;
108
109   if( TRqueue == NULL ) {
110     TRqueue = allocTR();
111   }
112
113   int status = pthread_create( &thread,
114                                NULL,
115                                workerTR,
116                                (void*) TRqueue );
117
118   pthread_attr_destroy( &nattr );
119
120   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
121 #endif
122
123
124   //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
125   //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
126
127
128   // Add this worker to the gc list
129   pthread_mutex_lock( &gclistlock );
130   threadcount++;
131   litem.prev = NULL;
132   litem.next = list;
133   if( list != NULL ) 
134     list->prev = &litem;
135   list = &litem;
136   pthread_mutex_unlock( &gclistlock );
137
138
139   // then continue to process work
140   while( 1 ) {
141
142     // wait for work
143 #ifdef CP_EVENTID_WORKSCHEDGRAB
144     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
145 #endif
146
147     haveWork = FALSE;
148     while( !haveWork ) {
149
150       //NOTE...Fix these things...
151       pthread_mutex_lock( &systemLockOut );
152       if( headqi->next == NULL ) {
153         pthread_mutex_unlock( &systemLockOut );
154
155         //NOTE: Do a check to see if we need to collect..
156         if( unlikely( needtocollect ) ) {
157           checkcollect( &emptygarbagelist );
158         }
159
160         sched_yield();
161         continue;
162       } else {
163         haveWork = TRUE;
164       }
165     }
166
167     struct QI* tmp = headqi;
168     headqi         = headqi->next;
169     workUnit       = headqi->value;
170     pthread_mutex_unlock( &systemLockOut );
171     free( tmp );
172
173 #ifdef CP_EVENTID_WORKSCHEDGRAB
174     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
175 #endif
176     
177     // let GC see current work
178     litem.seseCommon = (void*)workUnit;
179
180     // unclear how useful this is
181     if( unlikely( needtocollect ) ) {
182       checkcollect( &emptygarbagelist );
183     }
184
185     workFunc( workUnit );
186   } 
187
188
189   // remove from GC list
190   pthread_mutex_lock( &gclistlock );
191   threadcount--;
192   if( litem.prev == NULL ) {
193     list = litem.next;
194   } else {
195     litem.prev->next = litem.next;
196   }
197   if( litem.next != NULL ) {
198     litem.next->prev = litem.prev;
199   }
200   pthread_mutex_unlock( &gclistlock );
201
202
203   //pthread_cleanup_pop( 0 );
204
205   return NULL;
206 }
207
208 void workScheduleInit( int numProcessors,
209                        void(*func)(void*) ) {
210   int i, status;
211
212   // the original thread must call this now to
213   // protect memory allocation events coming, but it
214   // will also add itself to the worker pool and therefore
215   // try to call it again, CP_CREATE should just ignore
216   // duplicate calls
217   CP_CREATE();
218
219   pthread_mutex_init(&gclock, NULL);
220   pthread_mutex_init(&gclistlock, NULL);
221   pthread_cond_init(&gccond, NULL);
222
223   numWorkers = numProcessors + 1;
224
225   workFunc   = func;
226
227   headqi=tailqi=RUNMALLOC(sizeof(struct QI));
228   headqi->next=NULL;
229   
230   status = pthread_mutex_init( &systemLockIn, NULL );
231   status = pthread_mutex_init( &systemLockOut, NULL );
232
233   // allocate space for one more--the original thread (running
234   // this code) will become a worker thread after setup
235   workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
236
237 #ifdef RCR
238   //make sure the queue is initialized
239   if (TRqueue==NULL)
240     TRqueue=allocTR();
241 #endif
242
243   for( i = 0; i < numWorkers; ++i ) {
244
245     // the original thread is ID 1, start counting from there
246     workerDataArray[i].id = 2 + i;
247
248     status = pthread_create( &(workerDataArray[i].workerThread), 
249                              NULL,
250                              workerMain,
251                              (void*) &(workerDataArray[i])
252                            );
253
254     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
255
256     // yield and let all workers get to the begin
257     // condition variable, waiting--we have to hold them
258     // so they don't all see empty work queues right away
259     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
260   }
261 }
262
263
264 void workScheduleSubmit( void* workUnit ) {
265   struct QI* item=RUNMALLOC(sizeof(struct QI));
266   item->value=workUnit;
267   item->next=NULL;
268   
269   pthread_mutex_lock  ( &systemLockIn );
270   tailqi->next=item;
271   tailqi=item;
272   pthread_mutex_unlock( &systemLockIn );
273 }
274
275
276 // really should be named "add original thread as a worker"
277 void workScheduleBegin() {
278   int i;
279
280   // space was saved for the original thread to become a
281   // worker after setup is complete
282   workerDataArray[numWorkers].id           = 1;
283   workerDataArray[numWorkers].workerThread = pthread_self();
284   ++numWorkers;
285
286   workerMain( &(workerDataArray[numWorkers-1]) );
287 }
288
289
290 // the above function does NOT naturally join all the worker
291 // threads at exit, once the main SESE/Rblock/Task completes
292 // we know all worker threads are finished executing other
293 // tasks so we can explicitly kill the workers, and therefore
294 // trigger any worker-specific cleanup (like coreprof!)
295 void workScheduleExit() {
296   int i;
297
298   // This is not working well--canceled threads don't run their
299   // thread-level exit routines?  Anyway, its not critical for
300   // coreprof but if we ever need a per-worker exit routine to
301   // run we'll have to look back into this.
302
303   //printf( "Thread %d performing schedule exit.\n", pthread_self() );
304   //
305   //for( i = 0; i < numWorkers; ++i ) {   
306   //  if( pthread_self() != workerDataArray[i].workerThread ) {
307   //    pthread_cancel( workerDataArray[i].workerThread );      
308   //  }
309   //}
310   //
311   //// how to let all the threads actually get canceled?  
312   //sleep( 2 );
313 }