Changing OoOJava work scheduler to true work stealing, garbage collection can walk...
authorjjenista <jjenista>
Mon, 25 Oct 2010 20:09:31 +0000 (20:09 +0000)
committerjjenista <jjenista>
Mon, 25 Oct 2010 20:09:31 +0000 (20:09 +0000)
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/deque.c
Robust/src/Runtime/deque.h
Robust/src/Runtime/garbage.c
Robust/src/Runtime/workschedule.c
Robust/src/Runtime/workschedule.h

index e14ff60a4c7b607e712de8ad4eb04b34c11e83e2..09352df46bdbf30312244d2763c4f85db4a8f85f 100644 (file)
@@ -1251,7 +1251,7 @@ public class BuildCode {
     outclassdefs.print("int numprefetchsites = " + pa.prefetchsiteid + ";\n");
     if(this.state.MLP || state.OOOJAVA ){
        outclassdefs.print("extern __thread int oid;\n");
-       outclassdefs.print("extern int numWorkers;\n");
+       outclassdefs.print("extern int numWorkSchedWorkers;\n");
     }
 
     Iterator it=state.getClassSymbolTable().getDescriptorsIterator();
@@ -2668,11 +2668,7 @@ public class BuildCode {
       if( (state.MLP && fsen.equals( mlpa.getMainSESE() )) || 
           (state.OOOJAVA && fsen.equals( oooa.getMainSESE() ))
       ) {
-       outmethod.println(  "      /* work scheduler works forever, explicitly exit */");
-        outmethod.println(  "      CP_EXIT();");
-        outmethod.println(  "      CP_DUMP();");       
         outmethod.println(  "      workScheduleExit();");
-       outmethod.println(  "      exit( 0 );");
       }
 
       outmethod.println(    "      break;");
@@ -5406,7 +5402,7 @@ public class BuildCode {
       } else if ((GENERATEPRECISEGC) || (this.state.MULTICOREGC)) {
          if(this.state.MLP || state.OOOJAVA){
             output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarray_mlp("+localsprefixaddr+", "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+", oid, "+oooa.getDisjointAnalysis().getAllocationSiteFromFlatNew(fn).getUniqueAllocSiteID()+");");
-       output.println("    oid += numWorkers;");
+       output.println("    oid += numWorkSchedWorkers;");
          }else{
     output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarray("+localsprefixaddr+", "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");");                     
          }
@@ -5419,7 +5415,7 @@ public class BuildCode {
       } else if ((GENERATEPRECISEGC) || (this.state.MULTICOREGC)) {
          if (this.state.MLP || state.OOOJAVA){
        output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_new_mlp("+localsprefixaddr+", "+fn.getType().getClassDesc().getId()+", oid, "+oooa.getDisjointAnalysis().getAllocationSiteFromFlatNew(fn).getUniqueAllocSiteID()+");");
-       output.println("    oid += numWorkers;");
+       output.println("    oid += numWorkSchedWorkers;");
          } else {
     output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_new("+localsprefixaddr+", "+fn.getType().getClassDesc().getId()+");");                     
          }
index 1b461d0586c4563769b43718a1a9641b75b64677..ca3f7521e59efe5f3e7d196761b93498ac481bae 100644 (file)
@@ -40,6 +40,7 @@
 #ifdef DEBUG_DEQUE
 #include <stdlib.h>
 #include <stdio.h>
+#include <string.h>
 #endif
 
 #include "deque.h"
@@ -54,17 +55,6 @@ void* DQ_POP_ABORT = (void*)0x3;
 #define BOTTOM_NULL_TAG 0x40001
 
 
-// there are 9 bits for the index into a Node's array,
-// so 2^9 = 512 elements per node of the deque
-#define DQNODE_ARRAYSIZE 512
-
-
-typedef struct dequeNode_t {
-  void* itsDataArr[DQNODE_ARRAYSIZE];
-  struct dequeNode_t* next;
-  struct dequeNode_t* prev;
-} dequeNode;
-
 
 // the dequeNode struct must be 4096-byte aligned, 
 // see above, so use the following magic to ask
@@ -74,17 +64,16 @@ typedef struct dequeNode_t {
 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
 
 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) { 
-  return (dequeNode*) ( ((INTPTR)fromAllocator) & (~4095) );
+  INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
+#ifdef DEBUG_DEQUE
+  printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
+  printf( "aligned:        0x%08x to 0x%08x\n", aligned,               aligned               + sizeof( dequeNode )  );
+  memset( (void*) aligned, 0, sizeof( dequeNode ) );
+#endif
+  return (dequeNode*) aligned;
 }
 
 
-
-static inline int        dqDecodeTag( INTPTR E ) { return (int)        ((0xffffe00000000000 & E) >> 45); }
-static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) <<  3); }
-static inline int        dqDecodeIdx( INTPTR E ) { return (int)        ((0x00000000000001ff & E)      ); }
-
-
-
 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
   INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
                  (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
@@ -178,6 +167,10 @@ void* dqPopTop( deque* dq ) {
   int        currTopTag  = dqDecodeTag( currTop );
   dequeNode* currTopNode = dqDecodePtr( currTop );
   int        currTopIndx = dqDecodeIdx( currTop );
+
+  // read of top followed by read of bottom, algorithm
+  // says specifically must be in this order
+  BARRIER();
   
   INTPTR currBottom = dq->bottom;
 
@@ -211,6 +204,10 @@ void* dqPopTop( deque* dq ) {
 
   INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
 
+  // algorithm states above should happen
+  // before attempting the CAS
+  BARRIER();
+
   INTPTR actualTop = (INTPTR)
     CAS( &(dq->top), // location
          currTop,    // expected value
@@ -252,6 +249,10 @@ void* dqPopBottom ( deque* dq ) {
 
   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
 
+  // algorithm states above should happen
+  // before attempting the CAS
+  BARRIER();
+
   INTPTR currTop = dq->top;
 
   int        currTopTag  = dqDecodeTag( currTop );
index 156226cc101fbae80517c2a6b682d43f118f640b..7e310e43594ab71051290b2a476796918e55a85c 100644 (file)
@@ -5,16 +5,17 @@
 #include "memPool.h"
 
 
+
 // the bottom and top 64-bit values encode
 // several sub-values, see deque.c
 typedef struct deque_t {
-  MemPool* memPool;
-  INTPTR   bottom;
+  MemPool*        memPool;
+  volatile INTPTR bottom;
 
   // force bottom and top to different cache lines
   char buffer[CACHELINESIZE];
 
-  INTPTR top;
+  volatile INTPTR top;
 } deque;
 
 
@@ -30,7 +31,21 @@ extern void* DQ_POP_EMPTY;
 extern void* DQ_POP_ABORT;
 
 
-//void dq_take ( deque* sem, struct garbagelist* gl );
+// there are 9 bits for the index into a Node's array,
+// so 2^9 = 512 elements per node of the deque
+#define DQNODE_ARRAYSIZE 512
+
+
+typedef struct dequeNode_t {
+  void* itsDataArr[DQNODE_ARRAYSIZE];
+  struct dequeNode_t* next;
+  struct dequeNode_t* prev;
+} dequeNode;
+
+
+static inline int        dqDecodeTag( INTPTR E ) { return (int)        ((0xffffe00000000000 & E) >> 45); }
+static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) <<  3); }
+static inline int        dqDecodeIdx( INTPTR E ) { return (int)        ((0x00000000000001ff & E)      ); }
 
 
 #endif // ___DEQUE_H__
index ae1f65f095ca622818aac2ca861368f0d70b2169..dde8e84653e89f9665587e80473cce03209f22e0 100644 (file)
 #include "thread.h"
 #endif
 #ifdef MLP
+#include "deque.h"
 #include "workschedule.h"
-extern struct QI * headqi;
-extern struct QI * tailqi;
+extern int    numWorkSchedWorkers;
+extern deque* deques;
 #endif
 
 #ifdef DMALLOC
@@ -541,31 +542,69 @@ void collect(struct garbagelist * stackptr) {
 
 #ifdef MLP
   {
+    int        i;
+    deque*     dq;
+    dequeNode* botNode;
+    int        botIndx;
+    dequeNode* topNode;
+    int        topIndx;
+    dequeNode* n;
+    int        j;
+    int        jLo;
+    int        jHi;
+
     // goes over ready-to-run SESEs
-    struct QI * qitem = headqi;
-    while(qitem!=NULL){
-      SESEcommon* common=(SESEcommon*)qitem->value;
-      if(common==seseCommon){
-       // skip the current running SESE
-       qitem=qitem->next;
-       continue;
-      }
-      SESEcommon* seseRec = (SESEcommon*)(qitem->value);
-      struct garbagelist * gl=(struct garbagelist *)&(seseRec[1]);
-      struct garbagelist * glroot=gl;
-      // update its ascendant SESEs 
-      updateAscendantSESE(seseRec);    
+    for( i = 0; i < numWorkSchedWorkers; ++i ) {
+      dq = &(deques[i]);
+
+      botNode = dqDecodePtr( dq->bottom );
+      botIndx = dqDecodeIdx( dq->bottom );
+      
+      topNode = dqDecodePtr( dq->top );
+      topIndx = dqDecodeIdx( dq->top );
+
+      n = botNode;
+      do {
+        // check all the relevant indices of this
+        // node in the deque, noting if we are in
+        // the top/bottom node which can be partially
+        // full
+        if( n == botNode ) { jLo = botIndx; } else { jLo = 0; }
+        if( n == topNode ) { jHi = topIndx; } else { jHi = DQNODE_ARRAYSIZE; }
+        
+        for( j = jLo; j < jHi; ++j ) {
+          // WHAT? 
+          //SESEcommon* common = (SESEcommon*) n->itsDataArr[j];
+          //if(common==seseCommon){
+          // skip the current running SESE
+          //  continue;
+          //}
+
+          SESEcommon* seseRec = (SESEcommon*) n->itsDataArr[j];
+          struct garbagelist* gl     = (struct garbagelist*) &(seseRec[1]);
+          struct garbagelist* glroot = gl;
+
+          updateAscendantSESE( seseRec );
   
-      while(gl!=NULL) {
-       int i;
-       for(i=0; i<gl->size; i++) {
-         void * orig=gl->array[i];
-         ENQUEUE(orig, gl->array[i]);
-       }
-       gl=gl->next;
-      } 
-      qitem=qitem->next;
+          while( gl != NULL ) {
+            int k;
+            for( k = 0; k < gl->size; k++ ) {
+              void* orig = gl->array[k];
+              ENQUEUE( orig, gl->array[k] );
+            }
+            gl = gl->next;
+          } 
+        }
+        
+        // we only have to move across the nodes
+        // of the deque if the top and bottom are
+        // not the same already
+        if( botNode != topNode ) {
+          n = n->next;
+        }
+      } while( n != topNode );
     }
+
   }    
 #endif
 
index 1ce7e925d3e1cea422f891be36057755bdedf53d..f828a5e82abe610803be44552024a7397a88d97c 100644 (file)
@@ -7,56 +7,79 @@
 #include "mlp_runtime.h"
 #include "psemaphore.h"
 #include "coreprof/coreprof.h"
+#include "deque.h"
 #ifdef RCR
 #include "rcr_runtime.h"
 #include "trqueue.h"
 #endif
 
-// NOTE: Converting this from a work-stealing strategy
-// to a single-queue thread pool protected by a single
-// lock.  This will not scale, but it will support
-// development of the system for now
 
+//////////////////////////////////////////////////
+//
+//  for coordination with the garbage collector
+//
+//////////////////////////////////////////////////
+int threadcount;
+pthread_mutex_t gclock;
+pthread_mutex_t gclistlock;
+pthread_cond_t gccond;
+
+// in garbage.h, listitem is a struct with a pointer
+// to a stack, objects, etc. such that the garbage
+// collector can find pointers for garbage collection
 
+// this is a global list of listitem structs that the
+// garbage collector uses to know about each thread
+extern struct listitem* list;
 
+// this is the local thread's item on the above list,
+// it should be added to the global list before a thread
+// starts doing work, and should be removed only when
+// the thread is completely finished--in OoOJava/MLP the
+// only thing hanging from this litem should be a single
+// task record that the worker thread is executing, if any!
+extern __thread struct listitem litem;
+//////////////////////////////////////////////////
+//
+//  end coordination with the garbage collector
+//
+//////////////////////////////////////////////////
 
 
 
-// for convenience
-typedef struct Queue deq;
 
-typedef struct workerData_t{
+typedef struct workerData_t {
   pthread_t workerThread;
-  int id;
+  int       id;
 } WorkerData;
 
+// a thread should know its worker id in any
+// functions below
+static __thread int myWorkerID;
 
-static pthread_mutex_t systemLockIn;
-static pthread_mutex_t systemLockOut;
+// the original thread starts up the work scheduler
+// and sleeps while it is running, it has no worker
+// ID so use this to realize that
+static const int workerID_NOTAWORKER = 0xffffff0;
 
-// implementation internal data
-static WorkerData*     workerDataArray;
-static pthread_t*      workerArray;
 
-static int systemStarted = 0;
+int numWorkSchedWorkers;
+static WorkerData*  workerDataArray;
+static pthread_t*   workerArray;
 
-static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
 static void(*workFunc)(void*);
 
-static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
+// each thread can create objects but should assign
+// globally-unique object ID's (oid) so have threads
+// give out this as next id, then increment by number
+// of threads to ensure disjoint oid sets
+__thread int oid;
 
-int             numWorkers;
+// global array of work-stealing deques, where
+// each thread uses its ID as the index to its deque
+deque* deques;
 
-int threadcount;
-pthread_mutex_t gclock;
-pthread_mutex_t gclistlock;
-pthread_cond_t gccond;
 
-extern struct listitem * list;
-extern __thread struct listitem litem;
-extern __thread SESEcommon* seseCommon;
-
-__thread int oid;
 
 #ifdef RCR
 #include "trqueue.h"
@@ -64,27 +87,29 @@ __thread struct trQueue * TRqueue=NULL;
 #endif
 
 
-void workerExit( void* arg ) {
-  //printf( "Thread %d canceled.\n", pthread_self() );
-  CP_EXIT();
-}
+
+// this is a read-by-all and write-by-one variable
+// IT IS UNPROTECTED, BUT SAFE for all threads to
+// read it (periodically, only when they can find no work)
+// and only the worker that retires the main thread will
+// write it to 1, at which time other workers will see
+// that they should exit gracefully
+static volatile int mainTaskRetired = FALSE;
+
+
+
 
 void* workerMain( void* arg ) {
   void*       workUnit;
-  WorkerData* myData = (WorkerData*) arg;
-  int         oldState;
+  WorkerData* myData  = (WorkerData*) arg;
+  deque*      myDeque = &(deques[myData->id]);
+  int         keepRunning = TRUE;
   int         haveWork;
+  int         lastVictim = 0;
+  int         i;
 
-  // the worker threads really have no context relevant to the
-  // user program, so build an empty garbage list struct to
-  // pass to the collector if collection occurs
-  struct garbagelist emptygarbagelist = { 0, NULL };
-
-  // once-per-thread stuff
-  CP_CREATE();
+  myWorkerID = myData->id;
 
-  //pthread_cleanup_push( workerExit, NULL );  
-  
   // ensure that object ID's start at 1 so that using
   // oid with value 0 indicates an invalid object
   oid = myData->id + 1;
@@ -93,7 +118,11 @@ void* workerMain( void* arg ) {
   // task should hand off to children threads it is
   // going to stall on
   psem_init( &runningSESEstallSem );
-  
+
+  // the worker threads really have no context relevant to the
+  // user program, so build an empty garbage list struct to
+  // pass to the collector if collection occurs
+  struct garbagelist emptygarbagelist = { 0, NULL };
 
 #ifdef RCR
   //allocate task record queue
@@ -127,10 +156,6 @@ void* workerMain( void* arg ) {
 #endif
 
 
-  //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
-  //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
-
-
   // Add this worker to the gc list
   pthread_mutex_lock( &gclistlock );
   threadcount++;
@@ -142,56 +167,99 @@ void* workerMain( void* arg ) {
   pthread_mutex_unlock( &gclistlock );
 
 
+  // start timing events in this thread
+  CP_CREATE();
+
+
   // then continue to process work
-  while( 1 ) {
+  while( keepRunning ) {
 
     // wait for work
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
 #endif
-
+    workUnit = (void*) 0x5;
     haveWork = FALSE;
+
     while( !haveWork ) {
 
-      //NOTE...Fix these things...
-      pthread_mutex_lock( &systemLockOut );
-      if( headqi->next == NULL ) {
-        pthread_mutex_unlock( &systemLockOut );
+      workUnit = dqPopBottom( myDeque );
 
-        //NOTE: Do a check to see if we need to collect..
-        if( unlikely( needtocollect ) ) {
-          checkcollect( &emptygarbagelist );
+      if( workUnit != DQ_POP_EMPTY ) {
+        haveWork = TRUE;
+        break;
+        
+      } else {
+        // try to steal from another queue, starting
+        // with the last successful victim, don't check
+        // your own deque
+        for( i = 0; i < numWorkSchedWorkers - 1; ++i ) {
+          workUnit = dqPopTop( &(deques[lastVictim]) );
+          
+          if( workUnit != DQ_POP_ABORT &&
+              workUnit != DQ_POP_EMPTY ) {
+            // successful steal!
+            haveWork = TRUE;
+            break;
+          }
+       
+          // choose next victim
+          lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+          
+          if( lastVictim == myWorkerID ) {
+            lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+          }
         }
+        // end steal attempts
 
-        sched_yield();
-        continue;
-      } else {
-        haveWork = TRUE;
+
+        // if we successfully stole work, break out of the
+        // while-not-have-work loop, otherwise we looked
+        // everywhere, so drop down to "I'm idle" code below
+        if( haveWork ) {
+          break;
+        }
       }
-    }
 
-    struct QI* tmp = headqi;
-    headqi         = headqi->next;
-    workUnit       = headqi->value;
-    pthread_mutex_unlock( &systemLockOut );
-    free( tmp );
+      // if we drop down this far, we didn't find any work,
+      // so do a garbage collection, yield the processor,
+      // then check if the entire system is out of work
+      if( unlikely( needtocollect ) ) {
+        checkcollect( &emptygarbagelist );
+      }
+
+      sched_yield();
+
+      if( mainTaskRetired ) {
+        keepRunning = FALSE;
+        break;
+      }
+
+    } // end the while-not-have-work loop
 
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
 #endif
     
-    // let GC see current work
-    litem.seseCommon = (void*)workUnit;
+    // when is no work left we will pop out
+    // here, so only do work if any left
+    if( haveWork ) {
+      // let GC see current work
+      litem.seseCommon = (void*)workUnit;
+
+      // unclear how useful this is
+      if( unlikely( needtocollect ) ) {
+        checkcollect( &emptygarbagelist );
+      }
 
-    // unclear how useful this is
-    if( unlikely( needtocollect ) ) {
-      checkcollect( &emptygarbagelist );
+      workFunc( workUnit );
     }
-
-    workFunc( workUnit );
   } 
 
 
+  CP_EXIT();
+
+
   // remove from GC list
   pthread_mutex_lock( &gclistlock );
   threadcount--;
@@ -206,114 +274,95 @@ void* workerMain( void* arg ) {
   pthread_mutex_unlock( &gclistlock );
 
 
-  //pthread_cleanup_pop( 0 );
-
   return NULL;
 }
 
+
 void workScheduleInit( int numProcessors,
                        void(*func)(void*) ) {
   int i, status;
+  pthread_attr_t attr;
 
   // the original thread must call this now to
-  // protect memory allocation events coming, but it
-  // will also add itself to the worker pool and therefore
-  // try to call it again, CP_CREATE should just ignore
-  // duplicate calls
+  // protect memory allocation events coming
   CP_CREATE();
 
-  pthread_mutex_init(&gclock, NULL);
-  pthread_mutex_init(&gclistlock, NULL);
-  pthread_cond_init(&gccond, NULL);
+  // the original thread will not become a worker, remember
+  myWorkerID = workerID_NOTAWORKER;
 
-  numWorkers = numProcessors + 1;
 
-  workFunc   = func;
+  pthread_mutex_init( &gclock,     NULL );
+  pthread_mutex_init( &gclistlock, NULL );
+  pthread_cond_init ( &gccond,     NULL );
 
-  headqi=tailqi=RUNMALLOC(sizeof(struct QI));
-  headqi->next=NULL;
-  
-  status = pthread_mutex_init( &systemLockIn, NULL );
-  status = pthread_mutex_init( &systemLockOut, NULL );
 
-  // allocate space for one more--the original thread (running
-  // this code) will become a worker thread after setup
-  workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
+  numWorkSchedWorkers = numProcessors + 1;
+
+  workFunc = func;
+
+  deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers );
+  workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
 
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+    dqInit( &(deques[i]) );
+  }
+  
 #ifdef RCR
   //make sure the queue is initialized
   if (TRqueue==NULL)
     TRqueue=allocTR();
 #endif
 
-  for( i = 0; i < numWorkers; ++i ) {
+  pthread_attr_init( &attr );
+  pthread_attr_setdetachstate( &attr, 
+                               PTHREAD_CREATE_JOINABLE );
 
-    // the original thread is ID 1, start counting from there
-    workerDataArray[i].id = 2 + i;
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+
+    workerDataArray[i].id = i;
 
     status = pthread_create( &(workerDataArray[i].workerThread), 
-                             NULL,
+                             &attr,
                              workerMain,
                              (void*) &(workerDataArray[i])
-                           );
+                             );
 
     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
-
-    // yield and let all workers get to the begin
-    // condition variable, waiting--we have to hold them
-    // so they don't all see empty work queues right away
-    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
   }
 }
 
 
 void workScheduleSubmit( void* workUnit ) {
-  struct QI* item=RUNMALLOC(sizeof(struct QI));
-  item->value=workUnit;
-  item->next=NULL;
-  
-  pthread_mutex_lock  ( &systemLockIn );
-  tailqi->next=item;
-  tailqi=item;
-  pthread_mutex_unlock( &systemLockIn );
+
+  if( myWorkerID == workerID_NOTAWORKER ) {
+    dqPushBottom( &(deques[0]), workUnit );
+    return;
+  }
+
+  dqPushBottom( &(deques[myWorkerID]), workUnit );
 }
 
 
-// really should be named "add original thread as a worker"
+// really should be named "wait for work in system to complete"
 void workScheduleBegin() {
   int i;
 
-  // space was saved for the original thread to become a
-  // worker after setup is complete
-  workerDataArray[numWorkers].id           = 1;
-  workerDataArray[numWorkers].workerThread = pthread_self();
-  ++numWorkers;
+  // wait for all workers to exit gracefully
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+    pthread_join( workerDataArray[i].workerThread, NULL );
+  }
 
-  workerMain( &(workerDataArray[numWorkers-1]) );
+  // for the original, non-worker thread to close up its events
+  CP_EXIT();
+
+  // write all thread's events to disk
+  CP_DUMP();
 }
 
 
-// the above function does NOT naturally join all the worker
-// threads at exit, once the main SESE/Rblock/Task completes
-// we know all worker threads are finished executing other
-// tasks so we can explicitly kill the workers, and therefore
-// trigger any worker-specific cleanup (like coreprof!)
+// only the worker that executes and then retires
+// the main task should invoke this, which indicates to
+// all other workers they should exit gracefully
 void workScheduleExit() {
-  int i;
-
-  // This is not working well--canceled threads don't run their
-  // thread-level exit routines?  Anyway, its not critical for
-  // coreprof but if we ever need a per-worker exit routine to
-  // run we'll have to look back into this.
-
-  //printf( "Thread %d performing schedule exit.\n", pthread_self() );
-  //
-  //for( i = 0; i < numWorkers; ++i ) {   
-  //  if( pthread_self() != workerDataArray[i].workerThread ) {
-  //    pthread_cancel( workerDataArray[i].workerThread );      
-  //  }
-  //}
-  //
-  //// how to let all the threads actually get canceled?  
-  //sleep( 2 );
+  mainTaskRetired = 1;
 }
index a36683bbaa98cab9f05999a38b84afcf046951eb..9cc20f11ce447645241b9128f22cac56966c1a88 100644 (file)
@@ -30,21 +30,17 @@ void workScheduleBegin();
 
 
 
-
+// this is a pattern where whatever the
+// driving runtime component is for your compiler
+// mode should provide the following synchronization
+// variables to share with the garbage collector
+// (so thread.c has them and uses them, too)
+// and the key is you must coordinate with the garbage
+// collector when the runtime wants to block threads
 extern int threadcount;
 extern pthread_mutex_t gclock;
 extern pthread_mutex_t gclistlock;
 extern pthread_cond_t  gccond;
 
-struct QI {
-  struct QI * next;
-  void * value;
-};
-
-struct QI * headqi;
-struct QI * tailqi;
-
-
-
 
 #endif /* __WORK_SCHEDULE__ */