changes. add MLP condition in the place of memory allocation.
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "Queue.h"
7 #include "workschedule.h"
8 #include "mlp_runtime.h"
9
10
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock.  This will not scale, but it will support
14 // development of the system for now
15
16
17
18 // for convenience
19 typedef struct Queue deq;
20
21
22 /*
23 // each worker needs the following
24 typedef struct workerData_t {
25   pthread_t       workerThread;
26   pthread_mutex_t dequeLock;
27   deq*            dequeWorkUnits;
28   int             nextWorkerToLoad;
29 } workerData;
30 */
31
32
33 static pthread_mutex_t systemLock;
34
35 // just one queue for everyone
36 //static pthread_mutex_t dequeLock;
37 static deq*            dequeWorkUnits;
38
39
40
41 // implementation internal data
42 static int             numWorkers;
43 //static workerData*     workerDataArray;
44 static pthread_t*      workerArray;
45
46 static int systemStarted = 0;
47
48 //static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
50 //static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
51 //static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
52 static void(*workFunc)(void*);
53
54 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
55
56
57 int threadcount;
58 pthread_mutex_t gclock;
59 pthread_mutex_t gclistlock;
60 pthread_cond_t gccond;
61
62 /*
63 // helper func
64 int threadID2workerIndex( pthread_t id ) {
65   int i;
66   for( i = 0; i < numWorkers; ++i ) {
67     if( workerDataArray[i].workerThread == id ) {
68       return i;
69     }
70   }
71   // if we didn't find it, we are an outside
72   // thread and should pick arbitrary worker
73   return 0;
74 }
75 */
76
77
78 /*
79 // the worker thread main func, which takes a func
80 // from user for processing any one work unit, then
81 // workers use it to process work units and steal
82 // them from one another
83 void* workerMain( void* arg ) {
84
85   workerData* myData = (workerData*) arg;
86   
87   void* workUnit;
88
89   int i;
90   int j;
91
92   // all workers wait until system is ready
93   pthread_mutex_lock  ( &systemBeginLock );
94   pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
95   pthread_mutex_unlock( &systemBeginLock );
96
97   while( 1 ) {
98
99     // lock my deque
100     pthread_mutex_lock( &(myData->dequeLock) );
101
102     if( isEmpty( myData->dequeWorkUnits ) ) {
103
104       // my deque is empty, try to steal
105       pthread_mutex_unlock( &(myData->dequeLock) );
106       
107       workUnit = NULL;
108       j = myData->nextWorkerToLoad;
109
110       // look at everyone's queue at least twice
111       for( i = 0; i < numWorkers; ++i ) {
112         if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
113         
114         ++j; if( j == numWorkers ) { j = 0; }
115
116         pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
117
118         if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
119           pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
120           // no work here, yield and then keep looking
121           if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
122           continue;
123         }
124
125         // found some work in another deque, steal it
126         workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
127         pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
128         break;
129       }
130
131       // didn't find any work, even in my own deque,
132       // after checking everyone twice?  Exit thread
133       if( workUnit == NULL ) {
134         break;
135       }
136
137     } else {
138       // have work in own deque, take out from front
139       workUnit = getItem( myData->dequeWorkUnits );
140       pthread_mutex_unlock( &(myData->dequeLock) );
141     }
142
143     // wherever the work came from, process it
144     workFunc( workUnit );
145
146     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
147   }
148
149   printf( "Worker %d exiting.\n", myData->workerThread );
150   fflush( stdout );
151
152   return NULL;
153 }
154 */
155
156
157 void* workerMain( void* arg ) {
158   
159   void* workUnit;
160
161   // make sure init mlp once-per-thread stuff
162   //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
163
164   // all workers wait until system is ready
165   pthread_mutex_lock  ( &systemLock );
166   while( !systemStarted ) {
167     pthread_cond_wait( &systemBeginCond, &systemLock );
168   }
169   pthread_mutex_unlock( &systemLock );
170
171   // then continue to process work
172   while( 1 ) {
173
174     pthread_mutex_lock( &systemLock );
175     // wait for work
176     while( isEmpty( dequeWorkUnits ) ) {
177       pthread_cond_wait( &workAvailCond, &systemLock );
178     }     
179     workUnit = getItem( dequeWorkUnits );
180     pthread_mutex_unlock( &systemLock );
181
182     // yield processor before moving on, just to exercise
183     // system's out-of-order correctness
184     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
185     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
186     
187     workFunc( workUnit );
188   }
189
190   return NULL;
191 }
192
193
194 /*
195 void workScheduleInit( int numProcessors,
196                        void(*func)(void*) ) {
197   int i, status;
198
199   numWorkers = numProcessors;
200   workFunc   = func;
201
202   // allocate space for worker data
203   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
204
205   for( i = 0; i < numWorkers; ++i ) {    
206
207     // the deque
208     workerDataArray[i].dequeWorkUnits = createQueue();
209
210     // set the next worker to add work to as itself
211     workerDataArray[i].nextWorkerToLoad = i;
212
213     // it's lock
214     status = pthread_mutex_init( &(workerDataArray[i].dequeLock), 
215                                  NULL
216                                  );
217     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
218   }
219
220   // only create the actual pthreads after all workers
221   // have data that is protected with initialized locks
222   for( i = 0; i < numWorkers; ++i ) {    
223     status = pthread_create( &(workerDataArray[i].workerThread), 
224                              NULL,
225                              workerMain,
226                              (void*) &(workerDataArray[i])
227                            );
228     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
229   }
230
231   // yield and let all workers get to the begin
232   // condition variable, waiting--we have to hold them
233   // so they don't all see empty work queues right away
234   if( sched_yield() == -1 ) {
235     printf( "Error thread trying to yield.\n" );
236     exit( -1 );
237   }
238 }
239 */
240
241
242 void workScheduleInit( int numProcessors,
243                        void(*func)(void*) ) {
244   int i, status;
245
246   
247   pthread_mutex_init(&gclock, NULL);
248   pthread_mutex_init(&gclistlock, NULL);
249   pthread_cond_init(&gccond, NULL);
250
251   numWorkers = numProcessors*5;
252   workFunc   = func;
253
254   dequeWorkUnits = createQueue();
255
256   status = pthread_mutex_init( &systemLock, NULL );
257   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
258
259   workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
260
261   for( i = 0; i < numWorkers; ++i ) {    
262     status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
263     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
264
265     // yield and let all workers get to the beginx3
266     // condition variable, waiting--we have to hold them
267     // so they don't all see empty work queues right away
268     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
269   }
270 }
271
272
273 /*
274 void workScheduleSubmit( void* workUnit ) {
275
276   // query who is submitting and find out who they are scheduled to load
277   int submitterIndex = threadID2workerIndex( pthread_self() );
278   int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;
279   
280   // choose a new index and save it
281   ++workerIndex;
282   if( workerIndex == numWorkers ) {
283     workerIndex = 0;
284   }
285   workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
286
287   // load the chosen worker
288   pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
289   addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
290   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
291 }
292 */
293
294 void workScheduleSubmit( void* workUnit ) {
295   pthread_mutex_lock  ( &systemLock );
296   addNewItemBack      ( dequeWorkUnits, workUnit );
297   pthread_cond_signal( &workAvailCond );
298   pthread_mutex_unlock( &systemLock );
299 }
300
301
302 // really should be named "wait until work is finished"
303 void workScheduleBegin() {
304   
305   int i;
306
307   // tell all workers to begin
308   pthread_mutex_lock    ( &systemLock );
309   systemStarted = 1;
310   pthread_cond_broadcast( &systemBeginCond );
311   pthread_mutex_unlock  ( &systemLock );  
312
313   for( i = 0; i < numWorkers; ++i ) {
314     pthread_join( workerArray[i], NULL );
315   }
316 }