switch to spaces only..
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #define _GNU_SOURCE
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <pthread.h>
5 #include <sched.h>
6 #include <sys/syscall.h>
7
8 #include "mem.h"
9 #include "workschedule.h"
10 #include "mlp_runtime.h"
11 #include "psemaphore.h"
12 #include "coreprof/coreprof.h"
13 #ifdef SQUEUE
14 #include "squeue.h"
15 #else
16 #include "deque.h"
17 #endif
18 #ifdef RCR
19 #include "rcr_runtime.h"
20 #include "trqueue.h"
21 #endif
22
23
24
25
26 //////////////////////////////////////////////////
27 //
28 //  for coordination with the garbage collector
29 //
30 //////////////////////////////////////////////////
31 int threadcount;
32 pthread_mutex_t gclock;
33 pthread_mutex_t gclistlock;
34 pthread_cond_t gccond;
35 #ifdef RCR
36 extern pthread_mutex_t queuelock;
37 #endif
38 // in garbage.h, listitem is a struct with a pointer
39 // to a stack, objects, etc. such that the garbage
40 // collector can find pointers for garbage collection
41
42 // this is a global list of listitem structs that the
43 // garbage collector uses to know about each thread
44 extern struct listitem* list;
45
46 // this is the local thread's item on the above list,
47 // it should be added to the global list before a thread
48 // starts doing work, and should be removed only when
49 // the thread is completely finished--in OoOJava/MLP the
50 // only thing hanging from this litem should be a single
51 // task record that the worker thread is executing, if any!
52 extern __thread struct listitem litem;
53 //////////////////////////////////////////////////
54 //
55 //  end coordination with the garbage collector
56 //
57 //////////////////////////////////////////////////
58
59
60
61
62 typedef struct workerData_t {
63   pthread_t workerThread;
64   int id;
65 } WorkerData;
66
67 // a thread should know its worker id in any
68 // functions below
69 __thread int myWorkerID;
70
71 // the original thread starts up the work scheduler
72 // and sleeps while it is running, it has no worker
73 // ID so use this to realize that
74 const int workerID_NOTAWORKER = 0xffffff0;
75
76
77 int oidIncrement;
78 volatile int numWorkSchedWorkers;
79 int realnumWorkSchedWorkers;
80 static WorkerData*  workerDataArray;
81 static pthread_t*   workerArray;
82
83 static void (*workFunc)(void*);
84
85 // each thread can create objects but should assign
86 // globally-unique object ID's (oid) so have threads
87 // give out this as next id, then increment by number
88 // of threads to ensure disjoint oid sets
89 __thread int oid;
90
91 // global array of work-stealing deques, where
92 // each thread uses its ID as the index to its deque
93 deque* deques;
94
95
96
97 #ifdef RCR
98 #include "trqueue.h"
99 __thread struct trQueue * TRqueue=NULL;
100 #endif
101
102
103
104 // this is a read-by-all and write-by-one variable
105 // IT IS UNPROTECTED, BUT SAFE for all threads to
106 // read it (periodically, only when they can find no work)
107 // and only the worker that retires the main thread will
108 // write it to 1, at which time other workers will see
109 // that they should exit gracefully
110 static volatile int mainTaskRetired = FALSE;
111
112
113
114
115 void* workerMain(void* arg) {
116   void*       workUnit;
117   WorkerData* myData  = (WorkerData*) arg;
118   deque*      myDeque = &(deques[myData->id]);
119   int keepRunning = TRUE;
120   int haveWork;
121   int lastVictim = 0;
122   int i;
123
124   myWorkerID = myData->id;
125
126   // ensure that object ID's start at 1 so that using
127   // oid with value 0 indicates an invalid object
128   oid = myData->id + 1;
129
130   // each thread has a single semaphore that a running
131   // task should hand off to children threads it is
132   // going to stall on
133   psem_init(&runningSESEstallSem);
134
135   // the worker threads really have no context relevant to the
136   // user program, so build an empty garbage list struct to
137   // pass to the collector if collection occurs
138   struct garbagelist emptygarbagelist = { 0, NULL };
139
140   // Add this worker to the gc list
141   pthread_mutex_lock(&gclistlock);
142   threadcount++;
143   litem.prev = NULL;
144   litem.next = list;
145   if( list != NULL )
146     list->prev = &litem;
147   list = &litem;
148   pthread_mutex_unlock(&gclistlock);
149
150
151   // start timing events in this thread
152   CP_CREATE();
153
154
155   // then continue to process work
156   while( keepRunning ) {
157
158     // wait for work
159 #ifdef CP_EVENTID_WORKSCHEDGRAB
160     CP_LOGEVENT(CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN);
161 #endif
162
163     haveWork = FALSE;
164     while( !haveWork ) {
165
166       workUnit = dqPopBottom(myDeque);
167
168
169       if( workUnit != DQ_POP_EMPTY ) {
170         haveWork = TRUE;
171         goto dowork;
172       } else {
173         // try to steal from another queue, starting
174         // with the last successful victim, don't check
175         // your own deque
176         int mynumWorkSchedWorkers=numWorkSchedWorkers;
177         for( i = 0; i < mynumWorkSchedWorkers - 1; ++i ) {
178
179           workUnit = dqPopTop(&(deques[lastVictim]) );
180
181 #ifdef SQUEUE
182           if( workUnit != DQ_POP_EMPTY ) {
183 #else
184           if( workUnit != DQ_POP_ABORT &&
185               workUnit != DQ_POP_EMPTY ) {
186 #endif
187             // successful steal!
188             haveWork = TRUE;
189             goto dowork;
190           }
191
192           // choose next victim
193           lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) {
194             lastVictim = 0;
195           }
196
197           if( lastVictim == myWorkerID ) {
198             lastVictim++; if( lastVictim == mynumWorkSchedWorkers ) {
199               lastVictim = 0;
200             }
201           }
202         }
203         // end steal attempts
204
205
206         // if we successfully stole work, break out of the
207         // while-not-have-work loop, otherwise we looked
208         // everywhere, so drop down to "I'm idle" code below
209         if( haveWork ) {
210           goto dowork;
211         }
212       }
213
214       // if we drop down this far, we didn't find any work,
215       // so do a garbage collection, yield the processor,
216       // then check if the entire system is out of work
217       if( unlikely(needtocollect) ) {
218         checkcollect(&emptygarbagelist);
219       }
220
221       sched_yield();
222
223       if( mainTaskRetired ) {
224         keepRunning = FALSE;
225         break;
226       }
227
228     } // end the while-not-have-work loop
229
230 dowork:
231
232 #ifdef CP_EVENTID_WORKSCHEDGRAB
233     CP_LOGEVENT(CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END);
234 #endif
235
236     // when is no work left we will pop out
237     // here, so only do work if any left
238     if( haveWork ) {
239       // let GC see current work
240       litem.seseCommon = (void*)workUnit;
241
242 #ifdef DEBUG_DEQUE
243       if( workUnit == NULL ) {
244         printf("About to execute a null work item\n");
245       }
246 #endif
247
248       workFunc(workUnit);
249       litem.seseCommon = NULL;
250     }
251   }
252
253
254   CP_EXIT();
255
256
257   // remove from GC list
258   pthread_mutex_lock(&gclistlock);
259   threadcount--;
260   if( litem.prev == NULL ) {
261     list = litem.next;
262   } else {
263     litem.prev->next = litem.next;
264   }
265   if( litem.next != NULL ) {
266     litem.next->prev = litem.prev;
267   }
268   pthread_mutex_unlock(&gclistlock);
269
270
271   return NULL;
272 }
273
274
275 void workScheduleInit(int numProcessors,
276                       void (*func)(void*) ) {
277   int i, status;
278   pthread_attr_t attr;
279
280   // the original thread must call this now to
281   // protect memory allocation events coming
282   CP_CREATE();
283
284   // the original thread is a worker
285   myWorkerID = 0;
286   oid = 1;
287
288 #ifdef RCR
289   pthread_mutex_init(&queuelock,     NULL);
290 #endif
291   pthread_mutex_init(&gclock,     NULL);
292   pthread_mutex_init(&gclistlock, NULL);
293   pthread_cond_init(&gccond,     NULL);
294
295
296   numWorkSchedWorkers = numProcessors;
297   realnumWorkSchedWorkers=numProcessors;
298   oidIncrement=numProcessors;
299   while(1) {
300     int x=2;
301     //check primality
302     for(; x<oidIncrement; x++) {
303       //not prime
304       if (oidIncrement%x==0) {
305         oidIncrement++;
306         break;
307       }
308     }
309     //have prime
310     if (x==oidIncrement)
311       break;
312   }
313
314   workFunc = func;
315
316 #ifdef RCR
317   deques          = RUNMALLOC(sizeof( deque      )*numWorkSchedWorkers*2);
318 #else
319   deques          = RUNMALLOC(sizeof( deque      )*numWorkSchedWorkers);
320 #endif
321   workerDataArray = RUNMALLOC(sizeof( WorkerData )*numWorkSchedWorkers);
322
323 #ifdef RCR
324   for( i = 0; i < numWorkSchedWorkers*2; ++i ) {
325 #else
326   for( i = 0; i < numWorkSchedWorkers; ++i ) {
327 #endif
328     dqInit(&(deques[i]) );
329   }
330
331 #ifndef COREPIN
332
333   pthread_attr_init(&attr);
334   pthread_attr_setdetachstate(&attr,
335                               PTHREAD_CREATE_JOINABLE);
336
337   workerDataArray[0].id = 0;
338
339   for( i = 1; i < numWorkSchedWorkers; ++i ) {
340
341     workerDataArray[i].id = i;
342
343     status = pthread_create(&(workerDataArray[i].workerThread),
344                             &attr,
345                             workerMain,
346                             (void*) &(workerDataArray[i])
347                             );
348
349     if( status != 0 ) {
350       printf("Error\n"); exit(-1);
351     }
352   }
353 #else
354   int numCore=24;
355   cpu_set_t cpuset;
356   pthread_attr_t thread_attr[numWorkSchedWorkers];
357   int idx;
358
359   workerDataArray[0].id = 0;
360   CPU_ZERO(&cpuset);
361   CPU_SET(0, &cpuset);
362   sched_setaffinity(syscall(SYS_gettid), sizeof(cpuset), &cpuset);
363
364   for(idx=1; idx<numWorkSchedWorkers; idx++) {
365     int coreidx=idx%numCore;
366     pthread_attr_t* attr = &thread_attr[idx];
367     pthread_attr_init(attr);
368     pthread_attr_setdetachstate(attr, PTHREAD_CREATE_JOINABLE);
369     CPU_ZERO(&cpuset);
370     CPU_SET(coreidx, &cpuset);
371     pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
372
373     workerDataArray[idx].id = idx;
374
375     status = pthread_create(&(workerDataArray[idx].workerThread),
376                             attr,
377                             workerMain,
378                             (void*) &(workerDataArray[idx])
379                             );
380
381   }
382 #endif
383 }
384
385
386 void workScheduleSubmit(void* workUnit) {
387 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
388   CP_LOGEVENT(CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_BEGIN);
389 #endif
390   dqPushBottom(&(deques[myWorkerID]), workUnit);
391 #ifdef CP_EVENTID_WORKSCHEDSUBMIT
392   CP_LOGEVENT(CP_EVENTID_WORKSCHEDSUBMIT, CP_EVENTTYPE_END);
393 #endif
394 }
395
396
397 // really should be named "wait for work in system to complete"
398 void workScheduleBegin() {
399   int i;
400
401   // original thread becomes a worker
402   workerMain( (void*) &(workerDataArray[0]) );
403
404   // then wait for all other workers to exit gracefully
405   for( i = 1; i < realnumWorkSchedWorkers; ++i ) {
406     pthread_join(workerDataArray[i].workerThread, NULL);
407   }
408
409   // write all thread's events to disk
410   CP_DUMP();
411 }
412
413
414 // only the worker that executes and then retires
415 // the main task should invoke this, which indicates to
416 // all other workers they should exit gracefully
417 void workScheduleExit() {
418   mainTaskRetired = 1;
419 }