6 #include "workschedule.h"
7 #include "mlp_runtime.h"
8 #include "coreprof/coreprof.h"
10 #include "rcr_runtime.h"
13 // NOTE: Converting this from a work-stealing strategy
14 // to a single-queue thread pool protected by a single
15 // lock. This will not scale, but it will support
16 // development of the system for now
21 typedef struct Queue deq;

// Per-worker record passed to each worker thread at creation time.
// NOTE(review): the struct body is truncated in this excerpt -- the
// closing brace and remaining fields (e.g. the 'id' field written in
// workScheduleInit below) are not visible here.
23 typedef struct workerData_t{
24 pthread_t workerThread;

// Two-lock queue discipline: producers take systemLockIn (enqueue side),
// consumers take systemLockOut (dequeue side).
29 static pthread_mutex_t systemLockIn;
30 static pthread_mutex_t systemLockOut;

32 // implementation internal data
33 static WorkerData* workerDataArray;
34 static pthread_t* workerArray;

// Set once the pool has been started; presumably guards double-start.
36 static int systemStarted = 0;

// Workers park on this condition until the system is allowed to begin.
38 static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
// User-supplied callback invoked on every dequeued work unit.
39 static void(*workFunc)(void*);

// Signaled when new work is submitted (usage not visible in this excerpt).
41 static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;

// Garbage-collector coordination state, shared with the GC runtime.
// Non-static: referenced from other translation units.
46 pthread_mutex_t gclock;
47 pthread_mutex_t gclistlock;
48 pthread_cond_t gccond;

// GC-visible linked list of active threads; litem is this thread's node.
50 extern struct listitem * list;
51 extern __thread struct listitem litem;
52 extern __thread SESEcommon* seseCommon;

// Per-thread task record queue (allocated in workerMain, per the comment there).
58 __thread struct trqueue * TRqueue;
// Thread-cancellation cleanup hook (intended for pthread_cleanup_push;
// currently unused -- see the commented-out push/pop in workerMain).
// NOTE(review): function body is truncated in this excerpt; only the
// disabled debug printf is visible.
62 void workerExit( void* arg ) {
63 //printf( "Thread %d canceled.\n", pthread_self() );
// Worker thread entry point: performs once-per-thread setup, then loops
// dequeuing work units from the shared queue and dispatching them to
// workFunc, registering/deregistering with the GC thread list around
// each unit. NOTE(review): this excerpt elides many lines (loop headers,
// braces, several statements) -- comments below describe only what is
// visible.
67 void* workerMain( void* arg ) {

69 WorkerData* myData = (WorkerData*) arg;

73 // once-per-thread stuff

76 //pthread_cleanup_push( workerExit, NULL );

78 // ensure that object ID's start at 1 so that using
79 // oid with value 0 indicates an invalid object

83 //allocate task record queue

// NOTE(review): a worker creating another detached thread here is
// surprising -- the creation target and loop context are elided in this
// excerpt, so the intent can't be confirmed from what is visible.
86 pthread_attr_init(&nattr);
87 pthread_attr_setdetachstate(&nattr, PTHREAD_CREATE_DETACHED);

89 int status = pthread_create( &(workerDataArray[i].workerThread),

93 pthread_attr_destroy(&nattr);
94 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }

97 //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
98 //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, &oldState );

100 // then continue to process work

104 #ifdef CP_EVENTID_WORKSCHEDGRAB
105 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );

// Dequeue under the consumer-side lock. The queue uses a dummy head:
// the value taken is headqi->next->value, and the old head node is
// retired. If the queue looks empty, drop the lock (retry/wait logic is
// elided in this excerpt).
109 pthread_mutex_lock( &systemLockOut );
110 if( headqi->next == NULL ) {
111 pthread_mutex_unlock( &systemLockOut );

118 struct QI * tmp=headqi;
119 headqi = headqi->next;
120 workUnit = headqi->value;
121 pthread_mutex_unlock( &systemLockOut );
// NOTE(review): 'tmp' (the retired dummy node) is not freed in any
// visible line -- verify against the elided lines whether it leaks.

123 #ifdef CP_EVENTID_WORKSCHEDGRAB
124 CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );

// Register this thread's litem (pointing at the current work unit) on
// the GC thread list before running user code, so the collector can see
// this SESE record as a root.
127 pthread_mutex_lock(&gclistlock);
129 litem.seseCommon=(void*)workUnit;
135 seseCommon=(SESEcommon*)workUnit;
136 pthread_mutex_unlock(&gclistlock);

// Run the user-supplied task body.
138 workFunc( workUnit );

// Unlink litem from the doubly-linked GC list (head vs. interior cases).
140 pthread_mutex_lock(&gclistlock);
142 if (litem.prev==NULL) {
145 litem.prev->next=litem.next;
147 if (litem.next!=NULL) {
148 litem.next->prev=litem.prev;
150 pthread_mutex_unlock(&gclistlock);

153 //pthread_cleanup_pop( 0 );
// Initialize the scheduler: set up GC locks, allocate the work queue's
// dummy head node and the worker-data array, and spawn numProcessors+1
// worker threads that will run workerMain. 'func' is the callback
// invoked on each submitted work unit. NOTE(review): several lines
// (thread-start args, error paths) are elided in this excerpt.
158 void workScheduleInit( int numProcessors,
159 void(*func)(void*) ) {

162 // the original thread must call this now to
163 // protect memory allocation events coming, but it
164 // will also add itself to the worker pool and therefore
165 // try to call it again, CP_CREATE should just ignore

169 pthread_mutex_init(&gclock, NULL);
170 pthread_mutex_init(&gclistlock, NULL);
171 pthread_cond_init(&gccond, NULL);

173 numWorkers = numProcessors + 1;

// Dummy head node: queue is "empty" when head->next == NULL.
177 headqi=tailqi=RUNMALLOC(sizeof(struct QI));

// NOTE(review): the second init's status overwrites the first, and
// neither is checked before the pthread_create error check below.
180 status = pthread_mutex_init( &systemLockIn, NULL );
181 status = pthread_mutex_init( &systemLockOut, NULL );

183 // allocate space for one more--the original thread (running
184 // this code) will become a worker thread after setup
185 workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );

187 for( i = 0; i < numWorkers; ++i ) {

189 // the original thread is ID 1, start counting from there
190 workerDataArray[i].id = 2 + i;

192 status = pthread_create( &(workerDataArray[i].workerThread),
195 (void*) &(workerDataArray[i])

198 if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }

200 // yield and let all workers get to the begin
201 // condition variable, waiting--we have to hold them
202 // so they don't all see empty work queues right away
203 if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
// Enqueue one work unit: allocate a queue node outside the lock, then
// append it under the producer-side lock. NOTE(review): the actual
// tail-link statements between lock and unlock are elided in this
// excerpt; presumably tailqi->next = item; tailqi = item -- verify.
207 void workScheduleSubmit( void* workUnit ) {
208 struct QI* item=RUNMALLOC(sizeof(struct QI));
209 item->value=workUnit;

212 pthread_mutex_lock ( &systemLockIn );
215 pthread_mutex_unlock( &systemLockIn );
219 // really should be named "add original thread as a worker"
// Converts the calling (original) thread into worker ID 1 by filling in
// the extra slot reserved at workerDataArray[numWorkers] and entering
// the worker loop directly.
220 void workScheduleBegin() {

223 // space was saved for the original thread to become a
224 // worker after setup is complete
225 workerDataArray[numWorkers].id = 1;
226 workerDataArray[numWorkers].workerThread = pthread_self();

// NOTE(review): apparent off-by-one -- the slot initialized above is
// [numWorkers], but the call below passes [numWorkers-1], which is the
// record of an already-running worker thread. The elided lines 227-228
// are not visible here, so confirm which index is intended before fixing.
229 workerMain( &(workerDataArray[numWorkers-1]) );
233 // the above function does NOT naturally join all the worker
234 // threads at exit, once the main SESE/Rblock/Task completes
235 // we know all worker threads are finished executing other
236 // tasks so we can explicitly kill the workers, and therefore
237 // trigger any worker-specific cleanup (like coreprof!)
// Currently a no-op: the pthread_cancel-based teardown below is disabled
// because canceled threads were not reliably running their cleanup
// routines (see note). NOTE(review): function body and closing brace are
// partially elided in this excerpt.
238 void workScheduleExit() {

241 // This is not working well--canceled threads don't run their
242 // thread-level exit routines? Anyway, it's not critical for
243 // coreprof but if we ever need a per-worker exit routine to
244 // run we'll have to look back into this.

246 //printf( "Thread %d performing schedule exit.\n", pthread_self() );

248 //for( i = 0; i < numWorkers; ++i ) {
249 // if( pthread_self() != workerDataArray[i].workerThread ) {
250 // pthread_cancel( workerDataArray[i].workerThread );

254 //// how to let all the threads actually get canceled?