buildscript options to enable exaclty which coreprof events are needed, keeps instrum...
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "coreprof/coreprof.h"
9
10 // NOTE: Converting this from a work-stealing strategy
11 // to a single-queue thread pool protected by a single
12 // lock.  This will not scale, but it will support
13 // development of the system for now
14
15
16
17 // for convenience
18 typedef struct Queue deq;
19
20 typedef struct workerData_t{
21   pthread_t workerThread;
22   int id;
23 } WorkerData;
24
25
26 static pthread_mutex_t systemLockIn;
27 static pthread_mutex_t systemLockOut;
28
29 // implementation internal data
30 static WorkerData*     workerDataArray;
31 static pthread_t*      workerArray;
32
33 static int systemStarted = 0;
34
35 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
36 static void(*workFunc)(void*);
37
38 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
39
40 int             numWorkers;
41
42 int threadcount;
43 pthread_mutex_t gclock;
44 pthread_mutex_t gclistlock;
45 pthread_cond_t gccond;
46
47 extern struct listitem * list;
48 extern __thread struct listitem litem;
49 extern __thread SESEcommon* seseCommon;
50
51 __thread int oid;
52
53
54
55 void workerExit( void* arg ) {
56   //printf( "Thread %d canceled.\n", pthread_self() );
57   CP_EXIT();
58 }
59
60
61
62 void* workerMain( void* arg ) {
63   void*       workUnit;
64   WorkerData* myData = (WorkerData*) arg;
65   int         oldState;
66   int         haveWork;
67
68   // once-per-thread stuff
69   CP_CREATE();
70
71   //pthread_cleanup_push( workerExit, NULL );  
72   
73   // ensure that object ID's start at 1 so that using
74   // oid with value 0 indicates an invalid object
75   oid = myData->id + 1;
76
77   //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
78   //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
79
80   // then continue to process work
81   while( 1 ) {
82
83     // wait for work
84 #ifdef CP_EVENTID_WORKSCHEDGRAB
85     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
86 #endif
87     haveWork = FALSE;
88     while( !haveWork ) {
89       pthread_mutex_lock( &systemLockOut );
90       if( headqi->next == NULL ) {
91         pthread_mutex_unlock( &systemLockOut );
92         sched_yield();
93         continue;
94       } else {
95         haveWork = TRUE;
96       }
97     }
98     struct QI * tmp=headqi;
99     headqi = headqi->next;
100     workUnit = headqi->value;
101     pthread_mutex_unlock( &systemLockOut );
102     free( tmp );
103 #ifdef CP_EVENTID_WORKSCHEDGRAB
104     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
105 #endif
106     
107     pthread_mutex_lock(&gclistlock);
108     threadcount++;
109     litem.seseCommon=(void*)workUnit;
110     litem.prev=NULL;
111     litem.next=list;
112     if(list!=NULL)
113       list->prev=&litem;
114     list=&litem;
115     seseCommon=(SESEcommon*)workUnit;   
116     pthread_mutex_unlock(&gclistlock);
117
118     workFunc( workUnit );
119     
120     pthread_mutex_lock(&gclistlock);
121     threadcount--;
122     if (litem.prev==NULL) {
123       list=litem.next;
124     } else {
125       litem.prev->next=litem.next;
126     }
127     if (litem.next!=NULL) {
128       litem.next->prev=litem.prev;
129     }
130     pthread_mutex_unlock(&gclistlock);
131   }
132
133   //pthread_cleanup_pop( 0 );
134
135   return NULL;
136 }
137
138 void workScheduleInit( int numProcessors,
139                        void(*func)(void*) ) {
140   int i, status;
141
142   // the original thread must call this now to
143   // protect memory allocation events coming, but it
144   // will also add itself to the worker pool and therefore
145   // try to call it again, CP_CREATE should just ignore
146   // duplicate calls
147   CP_CREATE();
148
149   pthread_mutex_init(&gclock, NULL);
150   pthread_mutex_init(&gclistlock, NULL);
151   pthread_cond_init(&gccond, NULL);
152
153   numWorkers = numProcessors + 1;
154
155   workFunc   = func;
156
157   headqi=tailqi=RUNMALLOC(sizeof(struct QI));
158   headqi->next=NULL;
159   
160   status = pthread_mutex_init( &systemLockIn, NULL );
161   status = pthread_mutex_init( &systemLockOut, NULL );
162
163   // allocate space for one more--the original thread (running
164   // this code) will become a worker thread after setup
165   workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
166
167   for( i = 0; i < numWorkers; ++i ) {
168
169     // the original thread is ID 1, start counting from there
170     workerDataArray[i].id = 2 + i;
171
172     status = pthread_create( &(workerDataArray[i].workerThread), 
173                              NULL,
174                              workerMain,
175                              (void*) &(workerDataArray[i])
176                            );
177
178     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
179
180     // yield and let all workers get to the begin
181     // condition variable, waiting--we have to hold them
182     // so they don't all see empty work queues right away
183     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
184   }
185 }
186
187 void workScheduleSubmit( void* workUnit ) {
188   struct QI* item=RUNMALLOC(sizeof(struct QI));
189   item->value=workUnit;
190   item->next=NULL;
191   
192   pthread_mutex_lock  ( &systemLockIn );
193   tailqi->next=item;
194   tailqi=item;
195   pthread_mutex_unlock( &systemLockIn );
196 }
197
198
199 // really should be named "add original thread as a worker"
200 void workScheduleBegin() {
201   int i;
202
203   // space was saved for the original thread to become a
204   // worker after setup is complete
205   workerDataArray[numWorkers].id           = 1;
206   workerDataArray[numWorkers].workerThread = pthread_self();
207   ++numWorkers;
208
209   workerMain( &(workerDataArray[numWorkers-1]) );
210 }
211
212
213 // the above function does NOT naturally join all the worker
214 // threads at exit, once the main SESE/Rblock/Task completes
215 // we know all worker threads are finished executing other
216 // tasks so we can explicitly kill the workers, and therefore
217 // trigger any worker-specific cleanup (like coreprof!)
218 void workScheduleExit() {
219   int i;
220
221   // This is not working well--canceled threads don't run their
222   // thread-level exit routines?  Anyway, its not critical for
223   // coreprof but if we ever need a per-worker exit routine to
224   // run we'll have to look back into this.
225
226   //printf( "Thread %d performing schedule exit.\n", pthread_self() );
227   //
228   //for( i = 0; i < numWorkers; ++i ) {   
229   //  if( pthread_self() != workerDataArray[i].workerThread ) {
230   //    pthread_cancel( workerDataArray[i].workerThread );      
231   //  }
232   //}
233   //
234   //// how to let all the threads actually get canceled?  
235   //sleep( 2 );
236 }