optimize dispatch a little more
authorbdemsky <bdemsky>
Tue, 26 Oct 2010 06:53:10 +0000 (06:53 +0000)
committerbdemsky <bdemsky>
Tue, 26 Oct 2010 06:53:10 +0000 (06:53 +0000)
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/Queue.c
Robust/src/Runtime/Queue.h
Robust/src/Runtime/garbage.c
Robust/src/Runtime/memPool.h
Robust/src/Runtime/mlp_lock.h
Robust/src/Runtime/mlp_runtime.c
Robust/src/Runtime/mlp_runtime.h
Robust/src/Runtime/workschedule.c

index b50fd27f7c3959b7c25cfc520f73c42beabfad29..bdaec07ac9aa688a3de714fa2751e99005cc0d90 100644 (file)
@@ -2487,6 +2487,7 @@ public class BuildCode {
     output.println("   ");
     output.println("   // code of this task's body should use this to access the running task record");
     output.println("   runningSESE = &(___params___->common);");
+    output.println("   childSESE = 0;");
     output.println("   ");
     
     // setup memory queue
@@ -2535,7 +2536,7 @@ public class BuildCode {
     // don't bother if the task never has children (a leaf task)
     output.println( "#ifndef OOO_DISABLE_TASKMEMPOOL" );
     if( !fsen.getIsLeafSESE() ) {
-      output.println("   runningSESE->taskRecordMemPool = poolcreate( "+
+      output.println("   runningSESE->taskRecordMemPool = taskpoolcreate( "+
                      maxTaskRecSizeStr+" );");
     } else {
       // make it clear we purposefully did not initialize this
@@ -3900,8 +3901,8 @@ public class BuildCode {
     // before doing anything, lock your own record and increment the running children
     if( (state.MLP     && fsen != mlpa.getMainSESE()) || 
         (state.OOOJAVA && fsen != oooa.getMainSESE())
-    ) {      
-      output.println("     atomic_inc(&(runningSESE->numRunningChildren));");
+    ) {
+       output.println("     childSESE++;");
     }
 
     // allocate the space for this record
@@ -3915,7 +3916,7 @@ public class BuildCode {
         ) {
       output.println("     "+
                      fsen.getSESErecordName()+"* seseToIssue = ("+
-                     fsen.getSESErecordName()+"*) poolalloc( runningSESE->taskRecordMemPool );");
+                     fsen.getSESErecordName()+"*) taskpoolalloc( runningSESE->taskRecordMemPool );");
     } else {
       output.println("     "+
                      fsen.getSESErecordName()+"* seseToIssue = ("+
@@ -3968,12 +3969,14 @@ public class BuildCode {
     // fill in common data
     output.println("     int localCount=0;");
     output.println("     seseToIssue->common.classID = "+fsen.getIdentifier()+";");
-    output.println("     seseToIssue->common.parentsStallSem = NULL;");
-    output.println("     seseToIssue->common.forwardList = createQueue();");
     output.println("     seseToIssue->common.unresolvedDependencies = 10000;");
+    output.println("     seseToIssue->common.parentsStallSem = NULL;");
+    output.println("     initQueue(&seseToIssue->common.forwardList);");
     output.println("     seseToIssue->common.doneExecuting = FALSE;");    
-    output.println("     pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );");
     output.println("     seseToIssue->common.numRunningChildren = 0;");
+    output.println( "#ifdef OOO_DISABLE_TASKMEMPOOL" );
+    output.println("     pthread_cond_init( &(seseToIssue->common.runningChildrenCond), NULL );");
+    output.println("#endif");
     output.println("     seseToIssue->common.parent = runningSESE;");
     // start with refCount = 2, one being the count that the child itself
     // will decrement when it retires, to say it is done using its own
@@ -4017,8 +4020,9 @@ public class BuildCode {
     
     // before potentially adding this SESE to other forwarding lists,
     // create it's lock
+    output.println( "#ifdef OOO_DISABLE_TASKMEMPOOL" );
     output.println("     pthread_mutex_init( &(seseToIssue->common.lock), NULL );");
-
+    output.println("#endif");
   
     if( (state.MLP && fsen != mlpa.getMainSESE()) ||
         (state.OOOJAVA && fsen != oooa.getMainSESE())    
@@ -4032,7 +4036,7 @@ public class BuildCode {
        output.println("       pthread_mutex_lock( &(src->lock) );");
         // FORWARD TODO
        output.println("       if( !src->doneExecuting ) {");
-        output.println("         addNewItem( src->forwardList, seseToIssue );");       
+        output.println("         addNewItem( &src->forwardList, seseToIssue );");      
        output.println("         ++(localCount);");
        output.println("       }");
         output.println("#ifndef OOO_DISABLE_TASKMEMPOOL" );
@@ -4064,10 +4068,10 @@ public class BuildCode {
 
         // FORWARD TODO
 
-       output.println("         if( isEmpty( src->forwardList ) ||");
-       output.println("             seseToIssue != peekItem( src->forwardList ) ) {");
+       output.println("         if( isEmpty( &src->forwardList ) ||");
+       output.println("             seseToIssue != peekItem( &src->forwardList ) ) {");
        output.println("           if( !src->doneExecuting ) {");
-       output.println("             addNewItem( src->forwardList, seseToIssue );");
+       output.println("             addNewItem( &src->forwardList, seseToIssue );");
        output.println("             ++(localCount);");
        output.println("           }");
        output.println("         }");
@@ -4490,6 +4494,7 @@ public class BuildCode {
     // this SESE cannot be done until all of its children are done
     // so grab your own lock with the condition variable for watching
     // that the number of your running children is greater than zero    
+    output.println("   atomic_add(childSESE, &runningSESE->numRunningChildren);");
     output.println("   pthread_mutex_lock( &(runningSESE->lock) );");
     output.println("   if( runningSESE->numRunningChildren > 0 ) {");
     output.println("     stopforgc( (struct garbagelist *)&___locals___ );");
@@ -4554,8 +4559,8 @@ public class BuildCode {
     // decrement dependency count for all SESE's on your forwarding list
 
     // FORWARD TODO
-    output.println("   while( !isEmpty( runningSESE->forwardList ) ) {");
-    output.println("     SESEcommon* consumer = (SESEcommon*) getItem( runningSESE->forwardList );");
+    output.println("   while( !isEmpty( &runningSESE->forwardList ) ) {");
+    output.println("     SESEcommon* consumer = (SESEcommon*) getItem( &runningSESE->forwardList );");
     
    
     output.println("     if(consumer->rentryIdx>0){");
index dfd6202fa9683b41de07576159d0820c2b5692d0..5afcfbd6867e3beb40b57d48917d2eca60b8be88 100644 (file)
@@ -16,6 +16,11 @@ struct Queue * createQueue() {
   return queue;
 }
 
+void initQueue(struct Queue * q) {
+  q->head=NULL;
+  q->tail=NULL;
+}
+
 void freeQueue(struct Queue * q) {
   RUNFREE(q);
 }
index 53cf33111682f094a0c1c5f8dfa60ae4388fe01c..5c86662d4d7a4c35bc5bd78912a53f8d770742fa 100644 (file)
@@ -15,6 +15,7 @@ struct QueueItem {
 
 #define isEmpty(x) ((x)->head==NULL)
 
+void initQueue(struct Queue *);
 struct Queue * createQueue();
 void freeQueue(struct Queue * q);
 
index dde8e84653e89f9665587e80473cce03209f22e0..75d8503281e564da5bca58e68183fd1dcbd55da2 100644 (file)
@@ -1051,7 +1051,7 @@ updateForwardList(struct Queue *forwardList, int prevUpdate){
       gl=gl->next;
     }    
     // iterate forwarding list of seseRec
-    struct Queue* fList=seseRec->forwardList;
+    struct Queue* fList=&seseRec->forwardList;
     updateForwardList(fList,prevUpdate);   
     fqItem=getNextQueueItem(fqItem);
   }   
index 3087881aee088c4b976aa3cc5fa130e39acf5c08..f2217c1791dc847bd2068fc00a81b4342beffbeb 100644 (file)
@@ -22,6 +22,8 @@
 #include "runtime.h"
 #include "mem.h"
 #include "mlp_lock.h"
+
+
 #define CACHELINESIZE 64
 
 
@@ -29,7 +31,6 @@ typedef struct MemPoolItem_t {
   void* next;
 } MemPoolItem;
 
-
 typedef struct MemPool_t {
   int itemSize;
   MemPoolItem* head;
@@ -95,8 +96,6 @@ static inline void poolfreeinto( MemPool* p, void* ptr ) {
   }
 }
 
-
-
 static inline void* poolalloc( MemPool* p ) {
 
   // to protect CAS in poolfree from dereferencing
index c5f6ac0b297c67a8fe759009b33fd192d5c4fe1a..d462534bf934c7c6ada01d29c03cfcb2bd20c1cf 100644 (file)
@@ -43,6 +43,13 @@ static inline int atomic_sub_and_test(int i, volatile int *v) {
   return c;
 }
 
+
+static inline void atomic_add(int i, volatile int *v) {
+  __asm__ __volatile__ (LOCK_PREFIX "addl %1,%0"
+                        : "+m" (*v)
+                        : "ir" (i));
+}
+
 static inline int LOCKXCHG32(volatile int* ptr, int val){
   int retval;
   //note: xchgl always implies lock 
index 75a296d2563c3c2d4ec83949698158ce032e1189..fb863e2b682c63b39d3e13e92fe0086325451e41 100644 (file)
@@ -11,6 +11,7 @@
 
 
 __thread SESEcommon* runningSESE;
+__thread int childSESE=0;
 
 __thread psemaphore runningSESEstallSem;
 
index b7657e45951701fbb9a1a7e9da6bc4641e0fe002..bf0bc137068be2d0a788cc7bc1eaf3c51e47a3eb 100644 (file)
@@ -56,8 +56,6 @@
 #define OBJPTRPTR_2_OBJTYPE( opp ) ((int*)(opp))[0]
 #define OBJPTRPTR_2_OBJOID(  opp ) ((int*)(opp))[1]
 
-
-
 // forwarding list elements is a linked
 // structure of arrays, should help task
 // dispatch because the first element is
@@ -71,7 +69,7 @@ typedef struct ForwardingListElement_t {
   INTPTR                          items[FLIST_ITEMS_PER_ELEMENT];
 } ForwardingListElement;
 
-
+struct MemPool_t;
 
 // these fields are common to any SESE, and casting the
 // generated SESE record to this can be used, because
@@ -84,56 +82,52 @@ typedef struct SESEcommon_t {
   // IMPORTANT: the class ID must be the first field of
   // the task record so task dispatch works correctly!
   int classID;
-
+  volatile int    unresolvedDependencies;
 
   // a parent waits on this semaphore when stalling on
   // this child, the child gives it at its SESE exit
   psemaphore* parentsStallSem;
 
   
-  // the lock guards the following data SESE's
-  // use to coordinate with one another
-  pthread_mutex_t lock;
-
   // NOTE: first element is embedded in the task
   // record, so don't free it!
   //ForwardingListElement forwardList;
-  struct Queue* forwardList;
-
-  volatile int    unresolvedDependencies;
+  struct Queue forwardList;
 
-  pthread_cond_t  doneCond;
   volatile int             doneExecuting;
-
-  pthread_cond_t  runningChildrenCond;
-  int             numRunningChildren;
+  volatile int             numRunningChildren;
 
   struct SESEcommon_t*   parent;
   
   int numMemoryQueue;
   int rentryIdx;
   int unresolvedRentryIdx;
+  volatile int refCount;
+  int numDependentSESErecords;
+  int offsetToDepSESErecords;
+  struct MemPool_t *     taskRecordMemPool;
+
   struct MemoryQueue_t** memoryQueueArray;
   struct REntry_t* rentryArray[NUMRENTRY];
   struct REntry_t* unresolvedRentryArray[NUMRENTRY];
 
-  int numDependentSESErecords;
-  int offsetToDepSESErecords;
+
 #ifdef RCR
   int offsetToParamRecords;
   volatile int rcrstatus;
   volatile int retired;
 #endif
 
-  // for determining when task records can be returned
-  // to the parent record's memory pool
-  MemPool*     taskRecordMemPool;
-  volatile int refCount;
+  // the lock guards the following data SESE's
+  // use to coordinate with one another
+  pthread_mutex_t lock;
+  pthread_cond_t  runningChildrenCond;
 } SESEcommon;
 
 // a thread-local var refers to the currently
 // running task
 extern __thread SESEcommon* runningSESE;
+extern __thread int childSESE;
 
 // there only needs to be one stall semaphore
 // per thread, just give a reference to it to
@@ -276,5 +270,61 @@ static inline void RELEASE_REFERENCE_TO( SESEcommon* seseRec ) {
   }
 }
 
+static MemPool* taskpoolcreate( int itemSize ) {
+  MemPool* p    = calloc( 1, sizeof( MemPool ) );
+  SESEcommon *c = (SESEcommon *) p;
+  pthread_cond_init( &(c->runningChildrenCond), NULL );
+  pthread_mutex_init( &(c->lock), NULL );
+
+  p->itemSize   = itemSize;
+  p->head       = calloc( 1, itemSize );
+  p->head->next = NULL;
+  p->tail       = p->head;
+  return p;
+}
+
+
+static inline void* taskpoolalloc( MemPool* p ) {
+
+  // to protect CAS in poolfree from dereferencing
+  // null, treat the queue as empty when there is
+  // only one item.  The dequeue operation is only
+  // executed by the thread that owns the pool, so
+  // it doesn't require an atomic op
+  MemPoolItem* headCurrent = p->head;
+  MemPoolItem* next=headCurrent->next;
+  int i;
+  if(next == NULL) {
+    // only one item, so don't take from pool
+    SESEcommon *c = (SESEcommon*) RUNMALLOC( p->itemSize );
+    pthread_cond_init( &(c->runningChildrenCond), NULL );
+    pthread_mutex_init( &(c->lock), NULL );
+    return c;
+  }
+  p->head = next;
+
+  //////////////////////////////////////////////////////////
+  //
+  //
+  //  static inline void prefetch(void *x) 
+  //  { 
+  //    asm volatile("prefetcht0 %0" :: "m" (*(unsigned long *)x));
+  //  } 
+  //
+  //
+  //  but this built-in gcc one seems the most portable:
+  //////////////////////////////////////////////////////////
+  //__builtin_prefetch( &(p->head->next) );
+  asm volatile( "prefetcht0 (%0)" :: "r" (next));
+  next=(MemPoolItem*)(((char *)next)+CACHELINESIZE);
+  asm volatile( "prefetcht0 (%0)" :: "r" (next));
+  next=(MemPoolItem*)(((char *)next)+CACHELINESIZE);
+  asm volatile( "prefetcht0 (%0)" :: "r" (next));
+  next=(MemPoolItem*)(((char *)next)+CACHELINESIZE);
+  asm volatile( "prefetcht0 (%0)" :: "r" (next));
+
+  return (void*)headCurrent;
+}
 
 #endif /* __MLP_RUNTIME__ */
index f828a5e82abe610803be44552024a7397a88d97c..24c452f740cab7be255a3d69fe97c92bc05dc16d 100644 (file)
@@ -335,11 +335,15 @@ void workScheduleInit( int numProcessors,
 void workScheduleSubmit( void* workUnit ) {
 
   if( myWorkerID == workerID_NOTAWORKER ) {
+    CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN );
     dqPushBottom( &(deques[0]), workUnit );
+    CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END );
     return;
   }
 
+    CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_BEGIN );
   dqPushBottom( &(deques[myWorkerID]), workUnit );
+    CP_LOGEVENT( CP_EVENTID_DEBUG_A, CP_EVENTTYPE_END );
 }