change
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "Queue.h"
7 #include "workschedule.h"
8 #include "mlp_runtime.h"
9
10
11 // NOTE: Converting this from a work-stealing strategy
12 // to a single-queue thread pool protected by a single
13 // lock.  This will not scale, but it will support
14 // development of the system for now
15
16
17
18 // for convenience
19 typedef struct Queue deq;
20
21
22 /*
23 // each worker needs the following
24 typedef struct workerData_t {
25   pthread_t       workerThread;
26   pthread_mutex_t dequeLock;
27   deq*            dequeWorkUnits;
28   int             nextWorkerToLoad;
29 } workerData;
30 */
31
32
33 static pthread_mutex_t systemLock;
34
35 // just one queue for everyone
36 //static pthread_mutex_t dequeLock;
37 static deq*            dequeWorkUnits;
38
39
40
41 // implementation internal data
42 static int             numWorkers;
43 //static workerData*     workerDataArray;
44 static pthread_t*      workerArray;
45
46 static int systemStarted = 0;
47
48 //static pthread_mutex_t systemBeginLock  = PTHREAD_MUTEX_INITIALIZER;
49 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
50 //static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
51 //static pthread_cond_t  systemReturnCond = PTHREAD_COND_INITIALIZER;
52 static void(*workFunc)(void*);
53
54 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
55
56
57
58 /*
59 // helper func
60 int threadID2workerIndex( pthread_t id ) {
61   int i;
62   for( i = 0; i < numWorkers; ++i ) {
63     if( workerDataArray[i].workerThread == id ) {
64       return i;
65     }
66   }
67   // if we didn't find it, we are an outside
68   // thread and should pick arbitrary worker
69   return 0;
70 }
71 */
72
73
74 /*
75 // the worker thread main func, which takes a func
76 // from user for processing any one work unit, then
77 // workers use it to process work units and steal
78 // them from one another
79 void* workerMain( void* arg ) {
80
81   workerData* myData = (workerData*) arg;
82   
83   void* workUnit;
84
85   int i;
86   int j;
87
88   // all workers wait until system is ready
89   pthread_mutex_lock  ( &systemBeginLock );
90   pthread_cond_wait   ( &systemBeginCond, &systemBeginLock );
91   pthread_mutex_unlock( &systemBeginLock );
92
93   while( 1 ) {
94
95     // lock my deque
96     pthread_mutex_lock( &(myData->dequeLock) );
97
98     if( isEmpty( myData->dequeWorkUnits ) ) {
99
100       // my deque is empty, try to steal
101       pthread_mutex_unlock( &(myData->dequeLock) );
102       
103       workUnit = NULL;
104       j = myData->nextWorkerToLoad;
105
106       // look at everyone's queue at least twice
107       for( i = 0; i < numWorkers; ++i ) {
108         if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
109         
110         ++j; if( j == numWorkers ) { j = 0; }
111
112         pthread_mutex_lock( &(workerDataArray[j].dequeLock) );
113
114         if( isEmpty( workerDataArray[j].dequeWorkUnits ) ) {
115           pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
116           // no work here, yield and then keep looking
117           if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
118           continue;
119         }
120
121         // found some work in another deque, steal it
122         workUnit = getItemBack( workerDataArray[j].dequeWorkUnits );
123         pthread_mutex_unlock( &(workerDataArray[j].dequeLock) );
124         break;
125       }
126
127       // didn't find any work, even in my own deque,
128       // after checking everyone twice?  Exit thread
129       if( workUnit == NULL ) {
130         break;
131       }
132
133     } else {
134       // have work in own deque, take out from front
135       workUnit = getItem( myData->dequeWorkUnits );
136       pthread_mutex_unlock( &(myData->dequeLock) );
137     }
138
139     // wherever the work came from, process it
140     workFunc( workUnit );
141
142     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
143   }
144
145   printf( "Worker %d exiting.\n", myData->workerThread );
146   fflush( stdout );
147
148   return NULL;
149 }
150 */
151
152
153 void* workerMain( void* arg ) {
154   
155   void* workUnit;
156
157   // make sure init mlp once-per-thread stuff
158   //pthread_once( &mlpOnceObj, mlpInitOncePerThread );
159
160   // all workers wait until system is ready
161   pthread_mutex_lock  ( &systemLock );
162   while( !systemStarted ) {
163     pthread_cond_wait( &systemBeginCond, &systemLock );
164   }
165   pthread_mutex_unlock( &systemLock );
166
167   // then continue to process work
168   while( 1 ) {
169
170     pthread_mutex_lock( &systemLock );
171     // wait for work
172     while( isEmpty( dequeWorkUnits ) ) {
173       pthread_cond_wait( &workAvailCond, &systemLock );
174     }     
175     workUnit = getItem( dequeWorkUnits );
176     pthread_mutex_unlock( &systemLock );
177
178     // yield processor before moving on, just to exercise
179     // system's out-of-order correctness
180     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
181     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
182     
183     workFunc( workUnit );
184   }
185
186   return NULL;
187 }
188
189
190 /*
191 void workScheduleInit( int numProcessors,
192                        void(*func)(void*) ) {
193   int i, status;
194
195   numWorkers = numProcessors;
196   workFunc   = func;
197
198   // allocate space for worker data
199   workerDataArray = RUNMALLOC( sizeof( workerData ) * numWorkers );
200
201   for( i = 0; i < numWorkers; ++i ) {    
202
203     // the deque
204     workerDataArray[i].dequeWorkUnits = createQueue();
205
206     // set the next worker to add work to as itself
207     workerDataArray[i].nextWorkerToLoad = i;
208
209     // it's lock
210     status = pthread_mutex_init( &(workerDataArray[i].dequeLock), 
211                                  NULL
212                                  );
213     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
214   }
215
216   // only create the actual pthreads after all workers
217   // have data that is protected with initialized locks
218   for( i = 0; i < numWorkers; ++i ) {    
219     status = pthread_create( &(workerDataArray[i].workerThread), 
220                              NULL,
221                              workerMain,
222                              (void*) &(workerDataArray[i])
223                            );
224     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
225   }
226
227   // yield and let all workers get to the begin
228   // condition variable, waiting--we have to hold them
229   // so they don't all see empty work queues right away
230   if( sched_yield() == -1 ) {
231     printf( "Error thread trying to yield.\n" );
232     exit( -1 );
233   }
234 }
235 */
236
237
238 void workScheduleInit( int numProcessors,
239                        void(*func)(void*) ) {
240   int i, status;
241
242   numWorkers = numProcessors*5;
243   workFunc   = func;
244
245   dequeWorkUnits = createQueue();
246
247   status = pthread_mutex_init( &systemLock, NULL );
248   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
249
250   workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
251
252   for( i = 0; i < numWorkers; ++i ) {    
253     status = pthread_create( &(workerArray[i]), NULL, workerMain, NULL );
254     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
255
256     // yield and let all workers get to the beginx3
257     // condition variable, waiting--we have to hold them
258     // so they don't all see empty work queues right away
259     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
260   }
261 }
262
263
264 /*
265 void workScheduleSubmit( void* workUnit ) {
266
267   // query who is submitting and find out who they are scheduled to load
268   int submitterIndex = threadID2workerIndex( pthread_self() );
269   int workerIndex    = workerDataArray[submitterIndex].nextWorkerToLoad;
270   
271   // choose a new index and save it
272   ++workerIndex;
273   if( workerIndex == numWorkers ) {
274     workerIndex = 0;
275   }
276   workerDataArray[submitterIndex].nextWorkerToLoad = workerIndex;
277
278   // load the chosen worker
279   pthread_mutex_lock  ( &(workerDataArray[workerIndex].dequeLock) );
280   addNewItemBack      (   workerDataArray[workerIndex].dequeWorkUnits, workUnit );
281   pthread_mutex_unlock( &(workerDataArray[workerIndex].dequeLock) );
282 }
283 */
284
285 void workScheduleSubmit( void* workUnit ) {
286   pthread_mutex_lock  ( &systemLock );
287   addNewItemBack      ( dequeWorkUnits, workUnit );
288   pthread_cond_signal( &workAvailCond );
289   pthread_mutex_unlock( &systemLock );
290 }
291
292
293 // really should be named "wait until work is finished"
294 void workScheduleBegin() {
295   
296   int i;
297
298   // tell all workers to begin
299   pthread_mutex_lock    ( &systemLock );
300   systemStarted = 1;
301   pthread_cond_broadcast( &systemBeginCond );
302   pthread_mutex_unlock  ( &systemLock );  
303
304   for( i = 0; i < numWorkers; ++i ) {
305     pthread_join( workerArray[i], NULL );
306   }
307 }