changes...plus a little optimization
[IRC.git] / Robust / src / Runtime / workschedule.c
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <pthread.h>
4
5 #include "mem.h"
6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "coreprof/coreprof.h"
9 #ifdef RCR
10 #include "rcr_runtime.h"
11 #endif
12
13 // NOTE: Converting this from a work-stealing strategy
14 // to a single-queue thread pool protected by a single
15 // lock.  This will not scale, but it will support
16 // development of the system for now
17
18
19
20 // for convenience
21 typedef struct Queue deq;
22
23 typedef struct workerData_t{
24   pthread_t workerThread;
25   int id;
26 } WorkerData;
27
28
29 static pthread_mutex_t systemLockIn;
30 static pthread_mutex_t systemLockOut;
31
32 // implementation internal data
33 static WorkerData*     workerDataArray;
34 static pthread_t*      workerArray;
35
36 static int systemStarted = 0;
37
38 static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
39 static void(*workFunc)(void*);
40
41 static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
42
43 int             numWorkers;
44
45 int threadcount;
46 pthread_mutex_t gclock;
47 pthread_mutex_t gclistlock;
48 pthread_cond_t gccond;
49
50 extern struct listitem * list;
51 extern __thread struct listitem litem;
52 extern __thread SESEcommon* seseCommon;
53
54 __thread int oid;
55
56 #ifdef RCR
57 #include "trqueue.h"
58 __thread struct trqueue * TRqueue;
59 #endif
60
61
62 void workerExit( void* arg ) {
63   //printf( "Thread %d canceled.\n", pthread_self() );
64   CP_EXIT();
65 }
66
67 void* workerMain( void* arg ) {
68   void*       workUnit;
69   WorkerData* myData = (WorkerData*) arg;
70   int         oldState;
71   int         haveWork;
72
73   // once-per-thread stuff
74   CP_CREATE();
75
76   //pthread_cleanup_push( workerExit, NULL );  
77   
78   // ensure that object ID's start at 1 so that using
79   // oid with value 0 indicates an invalid object
80   oid = myData->id + 1;
81
82 #ifdef RCR
83   //allocate task record queue
84   pthread_t thread;
85   pthread_attr_t nattr;  
86   pthread_attr_init(&nattr);
87   pthread_attr_setdetachstate(&nattr, PTHREAD_CREATE_DETACHED);
88   TRqueue=allocTR();
89   int status = pthread_create( &(workerDataArray[i].workerThread), 
90                                NULL,
91                                workerTR,
92                                (void*) TRqueue);
93   pthread_attr_destroy(&nattr);
94   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
95 #endif
96
97   //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
98   //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
99
100   // then continue to process work
101   while( 1 ) {
102
103     // wait for work
104 #ifdef CP_EVENTID_WORKSCHEDGRAB
105     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
106 #endif
107     haveWork = FALSE;
108     while( !haveWork ) {
109       pthread_mutex_lock( &systemLockOut );
110       if( headqi->next == NULL ) {
111         pthread_mutex_unlock( &systemLockOut );
112         sched_yield();
113         continue;
114       } else {
115         haveWork = TRUE;
116       }
117     }
118     struct QI * tmp=headqi;
119     headqi = headqi->next;
120     workUnit = headqi->value;
121     pthread_mutex_unlock( &systemLockOut );
122     free( tmp );
123 #ifdef CP_EVENTID_WORKSCHEDGRAB
124     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
125 #endif
126     
127     pthread_mutex_lock(&gclistlock);
128     threadcount++;
129     litem.seseCommon=(void*)workUnit;
130     litem.prev=NULL;
131     litem.next=list;
132     if(list!=NULL)
133       list->prev=&litem;
134     list=&litem;
135     seseCommon=(SESEcommon*)workUnit;   
136     pthread_mutex_unlock(&gclistlock);
137
138     workFunc( workUnit );
139     
140     pthread_mutex_lock(&gclistlock);
141     threadcount--;
142     if (litem.prev==NULL) {
143       list=litem.next;
144     } else {
145       litem.prev->next=litem.next;
146     }
147     if (litem.next!=NULL) {
148       litem.next->prev=litem.prev;
149     }
150     pthread_mutex_unlock(&gclistlock);
151   }
152
153   //pthread_cleanup_pop( 0 );
154
155   return NULL;
156 }
157
158 void workScheduleInit( int numProcessors,
159                        void(*func)(void*) ) {
160   int i, status;
161
162   // the original thread must call this now to
163   // protect memory allocation events coming, but it
164   // will also add itself to the worker pool and therefore
165   // try to call it again, CP_CREATE should just ignore
166   // duplicate calls
167   CP_CREATE();
168
169   pthread_mutex_init(&gclock, NULL);
170   pthread_mutex_init(&gclistlock, NULL);
171   pthread_cond_init(&gccond, NULL);
172
173   numWorkers = numProcessors + 1;
174
175   workFunc   = func;
176
177   headqi=tailqi=RUNMALLOC(sizeof(struct QI));
178   headqi->next=NULL;
179   
180   status = pthread_mutex_init( &systemLockIn, NULL );
181   status = pthread_mutex_init( &systemLockOut, NULL );
182
183   // allocate space for one more--the original thread (running
184   // this code) will become a worker thread after setup
185   workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
186
187   for( i = 0; i < numWorkers; ++i ) {
188
189     // the original thread is ID 1, start counting from there
190     workerDataArray[i].id = 2 + i;
191
192     status = pthread_create( &(workerDataArray[i].workerThread), 
193                              NULL,
194                              workerMain,
195                              (void*) &(workerDataArray[i])
196                            );
197
198     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
199
200     // yield and let all workers get to the begin
201     // condition variable, waiting--we have to hold them
202     // so they don't all see empty work queues right away
203     if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
204   }
205 }
206
207 void workScheduleSubmit( void* workUnit ) {
208   struct QI* item=RUNMALLOC(sizeof(struct QI));
209   item->value=workUnit;
210   item->next=NULL;
211   
212   pthread_mutex_lock  ( &systemLockIn );
213   tailqi->next=item;
214   tailqi=item;
215   pthread_mutex_unlock( &systemLockIn );
216 }
217
218
219 // really should be named "add original thread as a worker"
220 void workScheduleBegin() {
221   int i;
222
223   // space was saved for the original thread to become a
224   // worker after setup is complete
225   workerDataArray[numWorkers].id           = 1;
226   workerDataArray[numWorkers].workerThread = pthread_self();
227   ++numWorkers;
228
229   workerMain( &(workerDataArray[numWorkers-1]) );
230 }
231
232
233 // the above function does NOT naturally join all the worker
234 // threads at exit, once the main SESE/Rblock/Task completes
235 // we know all worker threads are finished executing other
236 // tasks so we can explicitly kill the workers, and therefore
237 // trigger any worker-specific cleanup (like coreprof!)
238 void workScheduleExit() {
239   int i;
240
241   // This is not working well--canceled threads don't run their
242   // thread-level exit routines?  Anyway, its not critical for
243   // coreprof but if we ever need a per-worker exit routine to
244   // run we'll have to look back into this.
245
246   //printf( "Thread %d performing schedule exit.\n", pthread_self() );
247   //
248   //for( i = 0; i < numWorkers; ++i ) {   
249   //  if( pthread_self() != workerDataArray[i].workerThread ) {
250   //    pthread_cancel( workerDataArray[i].workerThread );      
251   //  }
252   //}
253   //
254   //// how to let all the threads actually get canceled?  
255   //sleep( 2 );
256 }