projects
/
IRC.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
a better workscheduler--still has a deficiency because it does not dynamically create...
[IRC.git]
/
Robust
/
src
/
Runtime
/
workschedule.c
diff --git
a/Robust/src/Runtime/workschedule.c
b/Robust/src/Runtime/workschedule.c
index 659280717815a458c1c7a80e7fc9aa626fd720be..5b24b68d1769d46b4526ff948aacc88604a8c107 100644
(file)
--- a/
Robust/src/Runtime/workschedule.c
+++ b/
Robust/src/Runtime/workschedule.c
@@
-28,8
+28,12
@@
typedef struct workerData_t {
int nextWorkerToLoad;
} workerData;
*/
int nextWorkerToLoad;
} workerData;
*/
+
+
+static pthread_mutex_t systemLock;
+
// just one queue for everyone
// just one queue for everyone
-static pthread_mutex_t dequeLock;
+
//
static pthread_mutex_t dequeLock;
static deq* dequeWorkUnits;
static deq* dequeWorkUnits;
@@
-38,12
+42,18
@@
static deq* dequeWorkUnits;
static int numWorkers;
//static workerData* workerDataArray;
static pthread_t* workerArray;
static int numWorkers;
//static workerData* workerDataArray;
static pthread_t* workerArray;
-static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
+
+static int systemStarted = 0;
+
+//static pthread_mutex_t systemBeginLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
+
//
static pthread_mutex_t systemReturnLock = PTHREAD_MUTEX_INITIALIZER;
+
//
static pthread_cond_t systemReturnCond = PTHREAD_COND_INITIALIZER;
static void(*workFunc)(void*);
static void(*workFunc)(void*);
+static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
+
+
/*
// helper func
/*
// helper func
@@
-144,35
+154,29
@@
void* workerMain( void* arg ) {
void* workUnit;
void* workUnit;
- int tries = 3;
-
// all workers wait until system is ready
// all workers wait until system is ready
- pthread_mutex_lock ( &systemBeginLock );
- pthread_cond_wait ( &systemBeginCond, &systemBeginLock );
- pthread_mutex_unlock( &systemBeginLock );
-
- while( tries > 0 ) {
-
- pthread_mutex_lock( &dequeLock );
+ pthread_mutex_lock ( &systemLock );
+ while( !systemStarted ) {
+ pthread_cond_wait( &systemBeginCond, &systemLock );
+ }
+ pthread_mutex_unlock( &systemLock );
- // look in the queue for work
- if( !isEmpty( dequeWorkUnits ) ) {
- workUnit = getItem( dequeWorkUnits );
- } else {
- workUnit = NULL;
- }
+ while( 1 ) {
- pthread_mutex_unlock( &dequeLock );
+ pthread_mutex_lock( &systemLock );
+ // wait for work
+ while( isEmpty( dequeWorkUnits ) ) {
+ pthread_cond_wait( &workAvailCond, &systemLock );
+ }
+ workUnit = getItem( dequeWorkUnits );
+ pthread_mutex_unlock( &systemLock );
- // yield processor before moving on
+ // yield processor before moving on, just to exercise
+ // system's out-of-order correctness
if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
-
- if( workUnit != NULL ) {
- workFunc( workUnit );
- tries = 3;
- } else {
- --tries;
- }
+ if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
+
+ workFunc( workUnit );
}
return NULL;
}
return NULL;
@@
-236,7
+240,7
@@
void workScheduleInit( int numProcessors,
dequeWorkUnits = createQueue();
dequeWorkUnits = createQueue();
- status = pthread_mutex_init( &
deque
Lock, NULL );
+ status = pthread_mutex_init( &
system
Lock, NULL );
if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
workerArray = RUNMALLOC( sizeof( pthread_t ) * numWorkers );
@@
-275,9
+279,10
@@
void workScheduleSubmit( void* workUnit ) {
*/
void workScheduleSubmit( void* workUnit ) {
*/
void workScheduleSubmit( void* workUnit ) {
- pthread_mutex_lock ( &
deque
Lock );
+ pthread_mutex_lock ( &
system
Lock );
addNewItemBack ( dequeWorkUnits, workUnit );
addNewItemBack ( dequeWorkUnits, workUnit );
- pthread_mutex_unlock( &dequeLock );
+ pthread_cond_signal( &workAvailCond );
+ pthread_mutex_unlock( &systemLock );
}
}
@@
-287,9
+292,10
@@
void workScheduleBegin() {
int i;
// tell all workers to begin
int i;
// tell all workers to begin
- pthread_mutex_lock ( &systemBeginLock );
+ pthread_mutex_lock ( &systemLock );
+ systemStarted = 1;
pthread_cond_broadcast( &systemBeginCond );
pthread_cond_broadcast( &systemBeginCond );
- pthread_mutex_unlock ( &system
Begin
Lock );
+ pthread_mutex_unlock ( &systemLock );
for( i = 0; i < numWorkers; ++i ) {
pthread_join( workerArray[i], NULL );
for( i = 0; i < numWorkers; ++i ) {
pthread_join( workerArray[i], NULL );