Change the local hashtable for recording the pointer mapping info used in the gc...
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8
9
10 // NOTE: Converting this from a work-stealing strategy
11 // to a single-queue thread pool protected by a single
12 // lock.  This will not scale, but it will support
13 // development of the system for now
14
15
16
17 // for convenience
18 typedef struct Queue deq;
19
20
21 /*
22 // each worker needs the following
23 typedef struct workerData_t {
24   pthread_t       workerThread;
25   pthread_mutex_t dequeLock;
26   deq*            dequeWorkUnits;
27   int             nextWorkerToLoad;
28 } workerData;
29 */
30
31 typedef struct workerData_t{
32   pthread_t workerThread;
33   int id;
34 } WorkerData;
35
36
37 static pthread_mutex_t systemLockIn;
38 static pthread_mutex_t systemLockOut;
39
40 // just one queue for everyone
41 //static pthread_mutex_t dequeLock;
42
43
44
45 // implementation internal data
46 static WorkerData*     workerDataArray;
47 static pthread_t*      workerArray;
48
49 static int systemStarted = 0;
50
51 //static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
52 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
53 //static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
54 //static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
55 static void(*workFunc)(void*);
56
57 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
58
59 int             numWorkers;
60
61 int threadcount;
62 pthread_mutex_t gclock;
63 pthread_mutex_t gclistlock;
64 pthread_cond_t gccond;
65
66 extern struct listitem * list;
67 extern __thread struct listitem litem;
68 extern __thread SESEcommon* seseCommon;
69
70 __thread int oid;
71
72 /*
73 struct QI {
74   struct QI * next;
75   void * value;
76 };
77
78 struct QI * headqi;
79 struct QI * tailqi;
80 */
81
82 /*
83 // helper func
84 int threadID2workerIndex( pthread_t id ) {
85   int i;
86   for( i = 0; i < numWorkers; ++i ) {
87     if( workerDataArray[i].workerThread == id ) {
88       return i;
89     }
90   }
91   // if we didn't find it, we are an outside
92   // thread and should pick arbitrary worker
93   return 0;
94 }
95 */
96
97
98 /*
99 // the worker thread main func, which takes a func
100 // from user for processing any one work unit, then
101 // workers use it to process work units and steal
102 // them from one another
103 void* workerMain( void* arg ) {
104
105   workerData* myData = (workerData*) arg;
106   
107   void* workUnit;
108
109   int i;
110   int j;
111
112   // all workers wait until system is ready
113   pthread_mutex_lock  ( &systemBeginLock );
114   pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
115   pthread_mutex_unlock( &systemBeginLock );
116
117   while( 1 ) {
118
119     // lock my deque
120     pthread_mutex_lock( &(myData->dequeLock) );
121
122     if( isEmpty( myData->dequeWorkUnits ) ) {
123
124       // my deque is empty, try to steal
125       pthread_mutex_unlock( &(myData->dequeLock) );
126       
127       workUnit = NULL;
128       j = myData->nextWorkerToLoad;
129
130       // look at everyone's queue at least twice
131       for( i = 0; i < numWorkers; ++i ) {
132         if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
133         
134         ++j; if( j == numWorkers ) { j = 0; }
135
136         pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
137
138         if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
139           pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
140           // no work here, yield and then keep looking
141           if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
142           continue;
143         }
144
145         // found some work in another deque, steal it
146         workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
147         pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
148         break;
149       }
150
151       // didn't find any work, even in my own deque,
152       // after checking everyone twice?  Exit thread
153       if( workUnit == NULL ) {
154         break;
155       }
156
157     } else {
158       // have work in own deque, take out from front
159       workUnit = getItem( myData->dequeWorkUnits );
160       pthread_mutex_unlock( &(myData->dequeLock) );
161     }
162
163     // wherever the work came from, process it
164     workFunc( workUnit );
165
166     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
167   }
168
169   printf( "Worker %d exiting.\n", myData->workerThread );
170   fflush( stdout );
171
172   return NULL;
173 }
174 */
175
176
177 void* workerMain( void* arg ) {
178   
179   void* workUnit;
180   WorkerData* myData = (WorkerData*) arg;
181   oid=myData->id;
182
183   // make sure init mlp once-per-thread stuff
184   //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
185
186   // all workers wait until system is ready
187
188   // then continue to process work
189   while( 1 ) {
190    
191     /*
192     while(1){
193       if(pthread_mutex_trylock(&systemLock)==0){
194         if(isEmpty(dequeWorkUnits)){
195           pthread_mutex_unlock(&systemLock);
196         }else{
197           break;
198         }
199       }
200     }
201     workUnit = getItem( dequeWorkUnits );
202     pthread_mutex_unlock( &systemLock );
203     */
204     
205     pthread_mutex_lock( &systemLockOut );
206     // wait for work
207     if (headqi->next==NULL) {
208       pthread_mutex_unlock( &systemLockOut );
209       sched_yield();
210       continue;
211     }
212     struct QI * tmp=headqi;
213     headqi = headqi->next;
214     workUnit = headqi->value;
215     pthread_mutex_unlock( &systemLockOut );
216     free(tmp);
217     // yield processor before moving on, just to exercise
218     // system's out-of-order correctness
219     //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
220     //if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
221
222     
223     pthread_mutex_lock(&gclistlock);
224     threadcount++;
225     litem.seseCommon=(void*)workUnit;
226     litem.prev=NULL;
227     litem.next=list;
228     if(list!=NULL)
229       list->prev=&litem;
230     list=&litem;
231     seseCommon=(SESEcommon*)workUnit;   
232     pthread_mutex_unlock(&gclistlock);
233
234     workFunc( workUnit );
235     
236     pthread_mutex_lock(&gclistlock);
237     threadcount--;
238     if (litem.prev==NULL) {
239       list=litem.next;
240     } else {
241       litem.prev->next=litem.next;
242     }
243     if (litem.next!=NULL) {
244       litem.next->prev=litem.prev;
245     }
246     pthread_mutex_unlock(&gclistlock);
247     
248   }
249
250   return NULL;
251 }
252
253
254 /*
255 void workScheduleInit( int numProcessors,
256                        void(*func)(void*) ) {
257   int i, status;
258
259   numWorkers = numProcessors;
260   workFunc   = func;
261
262   // allocate space for worker data
263   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
264
265   for( i = 0; i < numWorkers; ++i ) {    
266
267     // the deque
268     workerDataArray[i].dequeWorkUnits = createQueue();
269
270     // set the next worker to add work to as itself
271     workerDataArray[i].nextWorkerToLoad = i;
272
273     // it's lock
274     status = pthread_mutex_init( &(workerDataArray[i].dequeLock), 
275                                  NULL
276                                  );
277     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
278   }
279
280   // only create the actual pthreads after all workers
281   // have data that is protected with initialized locks
282   for( i = 0; i < numWorkers; ++i ) {    
283     status = pthread_create( &(workerDataArray[i].workerThread), 
284                              NULL,
285                              workerMain,
286                              (void*) &(workerDataArray[i])
287                            );
288     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
289   }
290
291   // yield and let all workers get to the begin
292   // condition variable, waiting--we have to hold them
293   // so they don't all see empty work queues right away
294   if( sched_yield() == -1 ) {
295     printf( "Error thread trying to yield.\n" );
296     exit( -1 );
297   }
298 }
299 */
300
301
302 void workScheduleInit( int numProcessors,
303                        void(*func)(void*) ) {
304   int i, status;
305
306   
307   pthread_mutex_init(&gclock, NULL);
308   pthread_mutex_init(&gclistlock, NULL);
309   pthread_cond_init(&gccond, NULL);
310
311   //numWorkers = numProcessors*5;
312   numWorkers = numProcessors + 1;
313
314   workFunc   = func;
315
316   headqi=tailqi=RUNMALLOC(sizeof(struct QI));
317   headqi->next=NULL;
318   
319   status = pthread_mutex_init( &systemLockIn, NULL );
320   status = pthread_mutex_init( &systemLockOut, NULL );
321
322   //workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
323   workerDataArray = RUNMALLOC( sizeof( WorkerData ) * numWorkers );
324
325   for( i = 0; i < numWorkers; ++i ) {   
326     workerDataArray[i].id=i+2;
327     status = pthread_create( &(workerDataArray[i].workerThread), 
328                              NULL,
329                              workerMain,
330                              (void*) &(workerDataArray[i])
331                            );
332     //status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
333     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
334
335     // yield and let all workers get to the beginx3
336     // condition variable, waiting--we have to hold them
337     // so they don't all see empty work queues right away
338     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
339   }
340 }
341
342
343 /*
344 void workScheduleSubmit( void* workUnit ) {
345
346   // query who is submitting and find out who they are scheduled to load
347   int submitterIndex = threadID2workerIndex( pthread_self() );
348   int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;
349   
350   // choose a new index and save it
351   ++workerIndex;
352   if( workerIndex == numWorkers ) {
353     workerIndex = 0;
354   }
355   workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
356
357   // load the chosen worker
358   pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
359   addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
360   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
361 }
362 */
363
364 void workScheduleSubmit( void* workUnit ) {
365   /*
366    while(1){
367       if(pthread_mutex_trylock(&systemLock)==0){
368          addNewItemBack( dequeWorkUnits, workUnit );
369          break;
370       }
371     }
372     pthread_mutex_unlock( &systemLock );
373   */
374   struct QI* item=RUNMALLOC(sizeof(struct QI));
375   item->value=workUnit;
376   item->next=NULL;
377   
378   pthread_mutex_lock  ( &systemLockIn );
379   tailqi->next=item;
380   tailqi=item;
381   pthread_mutex_unlock( &systemLockIn );
382 }
383
384
385 // really should be named "wait until work is finished"
386 void workScheduleBegin() {
387   
388   int i;  
389   WorkerData *workerData = RUNMALLOC( sizeof( WorkerData ) );
390   workerData->id=1;
391   // workerMain(NULL);
392   workerMain(workerData);
393
394   // tell all workers to begin
395   for( i = 0; i < numWorkers; ++i ) {
396     //pthread_join( workerArray[i], NULL );
397     pthread_join( workerDataArray[i].workerThread, NULL );
398   }
399 }