X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=Robust%2Fsrc%2FRuntime%2Fworkschedule.c;h=5b24b68d1769d46b4526ff948aacc88604a8c107;hb=296a232a015befc68bef08897bd7d3e45f1aca4f;hp=659280717815a458c1c7a80e7fc9aa626fd720be;hpb=edae4bedef87b02fa16acc12b68fc4727d90a6a4;p=IRC.git diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c index 65928071..5b24b68d 100644 --- a/Robust/src/Runtime/workschedule.c +++ b/Robust/src/Runtime/workschedule.c @@ -28,8 +28,12 @@ typedef struct workerData_t { int nextWorkerToLoad; } workerData; */ + + +static pthread_mutex_t systemLock; + // just one queue for everyone -static pthread_mutex_t dequeLock; +//static pthread_mutex_t dequeLock; static deq* dequeWorkUnits; @@ -38,12 +42,18 @@ static deq* dequeWorkUnits; static int numWorkers; //static workerData* workerDataArray; static pthread_t* workerArray; -static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER; + +static int systemStarted = 0; + +//static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER; +//static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER; +//static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER; static void(*workFunc)(void*); +static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER; + + /* // helper func @@ -144,35 +154,29 @@ void* workerMain( void* arg ) { void* workUnit; - int tries = 3; - // all workers wait until system is ready - pthread_mutex_lock ( &systemBeginLock ); - pthread_cond_wait ( &systemBeginCond, &systemBeginLock ); - pthread_mutex_unlock( &systemBeginLock ); - - while( tries > 0 ) { - - pthread_mutex_lock( &dequeLock ); + pthread_mutex_lock ( &systemLock ); + while( !systemStarted ) { + pthread_cond_wait( &systemBeginCond, &systemLock ); + } + pthread_mutex_unlock( &systemLock ); - // look in the queue for work - if( !isEmpty( dequeWorkUnits ) ) { - workUnit = getItem( dequeWorkUnits ); - } else { - workUnit = NULL; - } + while( 1 ) { - pthread_mutex_unlock( &dequeLock ); + pthread_mutex_lock( &systemLock ); + // wait for work + while( isEmpty( dequeWorkUnits ) ) { + pthread_cond_wait( &workAvailCond, &systemLock ); + } + workUnit = getItem( dequeWorkUnits ); + pthread_mutex_unlock( &systemLock ); - // yield processor before moving on + // yield processor before moving on, just to exercise + // system's out-of-order correctness if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); } - - if( workUnit != NULL ) { - workFunc( workUnit ); - tries = 3; - } else { - --tries; - } + if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); } + + workFunc( workUnit ); } return NULL; @@ -236,7 +240,7 @@ void workScheduleInit( int numProcessors, dequeWorkUnits = createQueue(); - status = pthread_mutex_init( &dequeLock, NULL ); + status = pthread_mutex_init( &systemLock, NULL ); if( status != 0 ) { printf( "Error\n" ); exit( -1 ); } workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers ); @@ -275,9 +279,10 @@ void workScheduleSubmit( void* workUnit ) { */ void workScheduleSubmit( void* workUnit ) { - pthread_mutex_lock ( &dequeLock ); + pthread_mutex_lock ( &systemLock ); addNewItemBack ( dequeWorkUnits, workUnit ); - pthread_mutex_unlock( &dequeLock ); + pthread_cond_signal( &workAvailCond ); + pthread_mutex_unlock( &systemLock ); } @@ -287,9 +292,10 @@ void workScheduleBegin() { int i; // tell all workers to begin - pthread_mutex_lock ( &systemBeginLock ); + pthread_mutex_lock ( &systemLock ); + systemStarted = 1; pthread_cond_broadcast( &systemBeginCond ); - pthread_mutex_unlock ( &systemBeginLock ); + pthread_mutex_unlock ( &systemLock ); for( i = 0; i < numWorkers; ++i ) { pthread_join( workerArray[i], NULL );