From 0eb2cfd5d50e08dd5ed4be28bd0720bd249df68d Mon Sep 17 00:00:00 2001 From: bdemsky Date: Tue, 26 Oct 2010 06:53:10 +0000 Subject: [PATCH] optimize dispatch a little more --- Robust/src/IR/Flat/BuildCode.java | 33 ++++++----- Robust/src/Runtime/Queue.c | 5 ++ Robust/src/Runtime/Queue.h | 1 + Robust/src/Runtime/garbage.c | 2 +- Robust/src/Runtime/memPool.h | 5 +- Robust/src/Runtime/mlp_lock.h | 7 +++ Robust/src/Runtime/mlp_runtime.c | 1 + Robust/src/Runtime/mlp_runtime.h | 92 ++++++++++++++++++++++++------- Robust/src/Runtime/workschedule.c | 4 ++ 9 files changed, 111 insertions(+), 39 deletions(-) diff --git a/Robust/src/IR/Flat/BuildCode.java b/Robust/src/IR/Flat/BuildCode.java index b50fd27f..bdaec07a 100644 --- a/Robust/src/IR/Flat/BuildCode.java +++ b/Robust/src/IR/Flat/BuildCode.java @@ -2487,6 +2487,7 @@ public class BuildCode { output.println(" "); output.println(" // code of this task's body should use this to access the running task record"); output.println(" runningSESE = &(___params___->common);"); + output.println(" childSESE = 0;"); output.println(" "); // setup memory queue @@ -2535,7 +2536,7 @@ public class BuildCode { // don't bother if the task never has children (a leaf task) output.println( "#ifndef OOO_DISABLE_TASKMEMPOOL" ); if( !fsen.getIsLeafSESE() ) { - output.println(" runningSESE->taskRecordMemPool = poolcreate( "+ + output.println(" runningSESE->taskRecordMemPool = taskpoolcreate( "+ maxTaskRecSizeStr+" );"); } else { // make it clear we purposefully did not initialize this @@ -3900,8 +3901,8 @@ public class BuildCode { // before doing anything, lock your own record and increment the running children if( (state.MLP && fsen != mlpa.getMainSESE()) || (state.OOOJAVA && fsen != oooa.getMainSESE()) - ) { - output.println(" atomic_inc(&(runningSESE->numRunningChildren));"); + ) { + output.println(" childSESE++;"); } // allocate the space for this record @@ -3915,7 +3916,7 @@ public class BuildCode { ) { output.println(" "+ fsen.getSESErecordName()+"* seseToIssue = ("+ - fsen.getSESErecordName()+"*) poolalloc( runningSESE->taskRecordMemPool );"); + fsen.getSESErecordName()+"*) taskpoolalloc( runningSESE->taskRecordMemPool );"); } else { output.println(" "+ fsen.getSESErecordName()+"* seseToIssue = ("+ @@ -3968,12 +3969,14 @@ public class BuildCode { // fill in common data output.println(" int localCount=0;"); output.println(" seseToIssue->common.classID = "+fsen.getIdentifier()+";"); - output.println(" seseToIssue->common.parentsStallSem = NULL;"); - output.println(" seseToIssue->common.forwardList = createQueue();"); output.println(" seseToIssue->common.unresolvedDependencies = 10000;"); + output.println(" seseToIssue->common.parentsStallSem = NULL;"); + output.println(" initQueue(&seseToIssue->common.forwardList);"); output.println(" seseToIssue->common.doneExecuting = FALSE;"); - output.println(" pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );"); output.println(" seseToIssue->common.numRunningChildren = 0;"); + output.println( "#ifdef OOO_DISABLE_TASKMEMPOOL" ); + output.println(" pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );"); + output.println("#endif"); output.println(" seseToIssue->common.parent = runningSESE;"); // start with refCount = 2, one being the count that the child itself // will decrement when it retires, to say it is done using its own @@ -4017,8 +4020,9 @@ public class BuildCode { // before potentially adding this SESE to other forwarding lists, // create it's lock + output.println( "#ifdef OOO_DISABLE_TASKMEMPOOL" ); output.println(" pthread_mutex_init( &(seseToIssue->common.lock), NULL );"); - + output.println("#endif"); if( (state.MLP && fsen != mlpa.getMainSESE()) || (state.OOOJAVA && fsen != oooa.getMainSESE()) @@ -4032,7 +4036,7 @@ public class BuildCode { output.println(" pthread_mutex_lock( &(src->lock) );"); // FORWARD TODO output.println(" if( !src->doneExecuting ) {"); - output.println(" addNewItem( src->forwardList, seseToIssue );"); + output.println(" addNewItem( &src->forwardList, seseToIssue );"); output.println(" ++(localCount);"); output.println(" }"); output.println("#ifndef OOO_DISABLE_TASKMEMPOOL" ); @@ -4064,10 +4068,10 @@ public class BuildCode { // FORWARD TODO - output.println(" if( isEmpty( src->forwardList ) ||"); - output.println(" seseToIssue != peekItem( src->forwardList ) ) {"); + output.println(" if( isEmpty( &src->forwardList ) ||"); + output.println(" seseToIssue != peekItem( &src->forwardList ) ) {"); output.println(" if( !src->doneExecuting ) {"); - output.println(" addNewItem( src->forwardList, seseToIssue );"); + output.println(" addNewItem( &src->forwardList, seseToIssue );"); output.println(" ++(localCount);"); output.println(" }"); output.println(" }"); @@ -4490,6 +4494,7 @@ public class BuildCode { // this SESE cannot be done until all of its children are done // so grab your own lock with the condition variable for watching // that the number of your running children is greater than zero + output.println(" atomic_add(childSESE, &runningSESE->numRunningChildren);"); output.println(" pthread_mutex_lock( &(runningSESE->lock) );"); output.println(" if( runningSESE->numRunningChildren > 0 ) {"); output.println(" stopforgc( (struct garbagelist *)&___locals___ );"); @@ -4554,8 +4559,8 @@ public class BuildCode { // decrement dependency count for all SESE's on your forwarding list // FORWARD TODO - output.println(" while( !isEmpty( runningSESE->forwardList ) ) {"); - output.println(" SESEcommon* consumer = (SESEcommon*) getItem( runningSESE->forwardList );"); + output.println(" while( !isEmpty( &runningSESE->forwardList ) ) {"); + output.println(" SESEcommon* consumer = (SESEcommon*) getItem( &runningSESE->forwardList );"); output.println(" if(consumer->rentryIdx>0){"); diff --git a/Robust/src/Runtime/Queue.c b/Robust/src/Runtime/Queue.c index dfd6202f..5afcfbd6 100644 --- a/Robust/src/Runtime/Queue.c +++ b/Robust/src/Runtime/Queue.c @@ -16,6 +16,11 @@ struct Queue * createQueue() { return queue; } +void initQueue(struct Queue * q) { + q->head=NULL; + q->tail=NULL; +} + void freeQueue(struct Queue * q) { RUNFREE(q); } diff --git a/Robust/src/Runtime/Queue.h b/Robust/src/Runtime/Queue.h index 53cf3311..5c86662d 100644 --- a/Robust/src/Runtime/Queue.h +++ b/Robust/src/Runtime/Queue.h @@ -15,6 +15,7 @@ struct QueueItem { #define isEmpty(x) ((x)->head==NULL) +void initQueue(struct Queue *); struct Queue * createQueue(); void freeQueue(struct Queue * q); diff --git a/Robust/src/Runtime/garbage.c b/Robust/src/Runtime/garbage.c index dde8e846..75d85032 100644 --- a/Robust/src/Runtime/garbage.c +++ b/Robust/src/Runtime/garbage.c @@ -1051,7 +1051,7 @@ updateForwardList(struct Queue *forwardList, int prevUpdate){ gl=gl->next; } // iterate forwarding list of seseRec - struct Queue* fList=seseRec->forwardList; + struct Queue* fList=&seseRec->forwardList; updateForwardList(fList,prevUpdate); fqItem=getNextQueueItem(fqItem); } diff --git a/Robust/src/Runtime/memPool.h b/Robust/src/Runtime/memPool.h index 3087881a..f2217c17 100644 --- a/Robust/src/Runtime/memPool.h +++ b/Robust/src/Runtime/memPool.h @@ -22,6 +22,8 @@ #include "runtime.h" #include "mem.h" #include "mlp_lock.h" + + #define CACHELINESIZE 64 @@ -29,7 +31,6 @@ typedef struct MemPoolItem_t { void* next; } MemPoolItem; - typedef struct MemPool_t { int itemSize; MemPoolItem* head; @@ -95,8 +96,6 @@ static inline void poolfreeinto( MemPool* p, void* ptr ) { } } - - static inline void* poolalloc( MemPool* p ) { // to protect CAS in poolfree from dereferencing diff --git a/Robust/src/Runtime/mlp_lock.h b/Robust/src/Runtime/mlp_lock.h index c5f6ac0b..d462534b 100644 --- a/Robust/src/Runtime/mlp_lock.h +++ b/Robust/src/Runtime/mlp_lock.h @@ -43,6 +43,13 @@ static inline int atomic_sub_and_test(int i, volatile int *v) { return c; } + +static inline void atomic_add(int i, volatile int *v) { + __asm__ __volatile__ (LOCK_PREFIX "addl %1,%0" + : "+m" (*v) + : "ir" (i)); +} + static inline int LOCKXCHG32(volatile int* ptr, int val){ int retval; //note: xchgl always implies lock diff --git a/Robust/src/Runtime/mlp_runtime.c b/Robust/src/Runtime/mlp_runtime.c index 75a296d2..fb863e2b 100644 --- a/Robust/src/Runtime/mlp_runtime.c +++ b/Robust/src/Runtime/mlp_runtime.c @@ -11,6 +11,7 @@ __thread SESEcommon* runningSESE; +__thread int childSESE=0; __thread psemaphore runningSESEstallSem; diff --git a/Robust/src/Runtime/mlp_runtime.h b/Robust/src/Runtime/mlp_runtime.h index b7657e45..bf0bc137 100644 --- a/Robust/src/Runtime/mlp_runtime.h +++ b/Robust/src/Runtime/mlp_runtime.h @@ -56,8 +56,6 @@ #define OBJPTRPTR_2_OBJTYPE( opp ) ((int*)(opp))[0] #define OBJPTRPTR_2_OBJOID( opp ) ((int*)(opp))[1] - - // forwarding list elements is a linked // structure of arrays, should help task // dispatch because the first element is @@ -71,7 +69,7 @@ typedef struct ForwardingListElement_t { INTPTR items[FLIST_ITEMS_PER_ELEMENT]; } ForwardingListElement; - +struct MemPool_t; // these fields are common to any SESE, and casting the // generated SESE record to this can be used, because @@ -84,56 +82,52 @@ typedef struct SESEcommon_t { // IMPORTANT: the class ID must be the first field of // the task record so task dispatch works correctly! int classID; - + volatile int unresolvedDependencies; // a parent waits on this semaphore when stalling on // this child, the child gives it at its SESE exit psemaphore* parentsStallSem; - // the lock guards the following data SESE's - // use to coordinate with one another - pthread_mutex_t lock; - // NOTE: first element is embedded in the task // record, so don't free it! //ForwardingListElement forwardList; - struct Queue* forwardList; - - volatile int unresolvedDependencies; + struct Queue forwardList; - pthread_cond_t doneCond; volatile int doneExecuting; - - pthread_cond_t runningChildrenCond; - int numRunningChildren; + volatile int numRunningChildren; struct SESEcommon_t* parent; int numMemoryQueue; int rentryIdx; int unresolvedRentryIdx; + volatile int refCount; + int numDependentSESErecords; + int offsetToDepSESErecords; + struct MemPool_t * taskRecordMemPool; + struct MemoryQueue_t** memoryQueueArray; struct REntry_t* rentryArray[NUMRENTRY]; struct REntry_t* unresolvedRentryArray[NUMRENTRY]; - int numDependentSESErecords; - int offsetToDepSESErecords; + #ifdef RCR int offsetToParamRecords; volatile int rcrstatus; volatile int retired; #endif - // for determining when task records can be returned - // to the parent record's memory pool - MemPool* taskRecordMemPool; - volatile int refCount; + // the lock guards the following data SESE's + // use to coordinate with one another + pthread_mutex_t lock; + pthread_cond_t runningChildrenCond; } SESEcommon; // a thread-local var refers to the currently // running task extern __thread SESEcommon* runningSESE; +extern __thread int childSESE; // there only needs to be one stall semaphore // per thread, just give a reference to it to @@ -276,5 +270,61 @@ static inline void RELEASE_REFERENCE_TO( SESEcommon* seseRec ) { } } +static MemPool* taskpoolcreate( int itemSize ) { + MemPool* p = calloc( 1, sizeof( MemPool ) ); + SESEcommon *c = (SESEcommon *) p; + pthread_cond_init( &(c->runningChildrenCond), NULL ); + pthread_mutex_init( &(c->lock), NULL ); + + p->itemSize = itemSize; + p->head = calloc( 1, itemSize ); + p->head->next = NULL; + p->tail = p->head; + return p; +} + + +static inline void* taskpoolalloc( MemPool* p ) { + + // to protect CAS in poolfree from dereferencing + // null, treat the queue as empty when there is + // only one item. The dequeue operation is only + // executed by the thread that owns the pool, so + // it doesn't require an atomic op + MemPoolItem* headCurrent = p->head; + MemPoolItem* next=headCurrent->next; + int i; + if(next == NULL) { + // only one item, so don't take from pool + SESEcommon *c = (SESEcommon*) RUNMALLOC( p->itemSize ); + pthread_cond_init( &(c->runningChildrenCond), NULL ); + pthread_mutex_init( &(c->lock), NULL ); + return c; + } + + p->head = next; + + ////////////////////////////////////////////////////////// + // + // + // static inline void prefetch(void *x) + // { + // asm volatile("prefetcht0 %0" :: "m" (*(unsigned long *)x)); + // } + // + // + // but this built-in gcc one seems the most portable: + ////////////////////////////////////////////////////////// + //__builtin_prefetch( &(p->head->next) ); + asm volatile( "prefetcht0 (%0)" :: "r" (next)); + next=(MemPoolItem*)(((char *)next)+CACHELINESIZE); + asm volatile( "prefetcht0 (%0)" :: "r" (next)); + next=(MemPoolItem*)(((char *)next)+CACHELINESIZE); + asm volatile( "prefetcht0 (%0)" :: "r" (next)); + next=(MemPoolItem*)(((char *)next)+CACHELINESIZE); + asm volatile( "prefetcht0 (%0)" :: "r" (next)); + + return (void*)headCurrent; +} #endif /* __MLP_RUNTIME__ */ diff --git a/Robust/src/Runtime/workschedule.c b/Robust/src/Runtime/workschedule.c index f828a5e8..24c452f7 100644 --- a/Robust/src/Runtime/workschedule.c +++ b/Robust/src/Runtime/workschedule.c @@ -335,11 +335,15 @@ void workScheduleInit( int numProcessors, void workScheduleSubmit( void* workUnit ) { if( myWorkerID == workerID_NOTAWORKER ) { + CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN ); dqPushBottom( &(deques[0]), workUnit ); + CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END ); return; } + CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN ); dqPushBottom( &(deques[myWorkerID]), workUnit ); + CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END ); } -- 2.34.1