Changing OoOJava work scheduler to true work stealing, garbage collection can walk...
[IRC.git] / Robust / src / Runtime / workschedule.c
index 1ce7e925d3e1cea422f891be36057755bdedf53d..f828a5e82abe610803be44552024a7397a88d97c 100644 (file)
@@ -7,56 +7,79 @@
 #include "mlp_runtime.h"
 #include "psemaphore.h"
 #include "coreprof/coreprof.h"
+#include "deque.h"
 #ifdef RCR
 #include "rcr_runtime.h"
 #include "trqueue.h"
 #endif
 
-// NOTE: Converting this from a work-stealing strategy
-// to a single-queue thread pool protected by a single
-// lock.  This will not scale, but it will support
-// development of the system for now
 
+//////////////////////////////////////////////////
+//
+//  for coordination with the garbage collector
+//
+//////////////////////////////////////////////////
+int threadcount;
+pthread_mutex_t gclock;
+pthread_mutex_t gclistlock;
+pthread_cond_t gccond;
+
+// in garbage.h, listitem is a struct with a pointer
+// to a stack, objects, etc. such that the garbage
+// collector can find pointers for garbage collection
 
+// this is a global list of listitem structs that the
+// garbage collector uses to know about each thread
+extern struct listitem* list;
 
+// this is the local thread's item on the above list,
+// it should be added to the global list before a thread
+// starts doing work, and should be removed only when
+// the thread is completely finished--in OoOJava/MLP the
+// only thing hanging from this litem should be a single
+// task record that the worker thread is executing, if any!
+extern __thread struct listitem litem;
+//////////////////////////////////////////////////
+//
+//  end coordination with the garbage collector
+//
+//////////////////////////////////////////////////
 
 
 
-// for convenience
-typedef struct Queue deq;
 
-typedef struct workerData_t{
+typedef struct workerData_t {
   pthread_t workerThread;
-  int id;
+  int       id;
 } WorkerData;
 
+// a thread should know its worker id in any
+// functions below
+static __thread int myWorkerID;
 
-static pthread_mutex_t systemLockIn;
-static pthread_mutex_t systemLockOut;
+// the original thread starts up the work scheduler
+// and sleeps while it is running, it has no worker
+// ID so use this to realize that
+static const int workerID_NOTAWORKER = 0xffffff0;
 
-// implementation internal data
-static WorkerData*     workerDataArray;
-static pthread_t*      workerArray;
 
-static int systemStarted = 0;
+int numWorkSchedWorkers;
+static WorkerData*  workerDataArray;
+static pthread_t*   workerArray;
 
-static pthread_cond_t  systemBeginCond  = PTHREAD_COND_INITIALIZER;
 static void(*workFunc)(void*);
 
-static pthread_cond_t  workAvailCond  = PTHREAD_COND_INITIALIZER;
+// each thread can create objects but should assign
+// globally-unique object ID's (oid) so have threads
+// give out this as next id, then increment by number
+// of threads to ensure disjoint oid sets
+__thread int oid;
 
-int             numWorkers;
+// global array of work-stealing deques, where
+// each thread uses its ID as the index to its deque
+deque* deques;
 
-int threadcount;
-pthread_mutex_t gclock;
-pthread_mutex_t gclistlock;
-pthread_cond_t gccond;
 
-extern struct listitem * list;
-extern __thread struct listitem litem;
-extern __thread SESEcommon* seseCommon;
-
-__thread int oid;
 
 #ifdef RCR
 #include "trqueue.h"
@@ -64,27 +87,29 @@ __thread struct trQueue * TRqueue=NULL;
 #endif
 
 
-void workerExit( void* arg ) {
-  //printf( "Thread %d canceled.\n", pthread_self() );
-  CP_EXIT();
-}
+
+// this is a read-by-all and write-by-one variable
+// IT IS UNPROTECTED, BUT SAFE for all threads to
+// read it (periodically, only when they can find no work)
+// and only the worker that retires the main thread will
+// write it to 1, at which time other workers will see
+// that they should exit gracefully
+static volatile int mainTaskRetired = FALSE;
+
+
+
 
 void* workerMain( void* arg ) {
   void*       workUnit;
-  WorkerData* myData = (WorkerData*) arg;
-  int         oldState;
+  WorkerData* myData  = (WorkerData*) arg;
+  deque*      myDeque = &(deques[myData->id]);
+  int         keepRunning = TRUE;
   int         haveWork;
+  int         lastVictim = 0;
+  int         i;
 
-  // the worker threads really have no context relevant to the
-  // user program, so build an empty garbage list struct to
-  // pass to the collector if collection occurs
-  struct garbagelist emptygarbagelist = { 0, NULL };
-
-  // once-per-thread stuff
-  CP_CREATE();
+  myWorkerID = myData->id;
 
-  //pthread_cleanup_push( workerExit, NULL );  
-  
   // ensure that object ID's start at 1 so that using
   // oid with value 0 indicates an invalid object
   oid = myData->id + 1;
@@ -93,7 +118,11 @@ void* workerMain( void* arg ) {
   // task should hand off to children threads it is
   // going to stall on
   psem_init( &runningSESEstallSem );
-  
+
+  // the worker threads really have no context relevant to the
+  // user program, so build an empty garbage list struct to
+  // pass to the collector if collection occurs
+  struct garbagelist emptygarbagelist = { 0, NULL };
 
 #ifdef RCR
   //allocate task record queue
@@ -127,10 +156,6 @@ void* workerMain( void* arg ) {
 #endif
 
 
-  //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
-  //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
-
-
   // Add this worker to the gc list
   pthread_mutex_lock( &gclistlock );
   threadcount++;
@@ -142,56 +167,99 @@ void* workerMain( void* arg ) {
   pthread_mutex_unlock( &gclistlock );
 
 
+  // start timing events in this thread
+  CP_CREATE();
+
+
   // then continue to process work
-  while( 1 ) {
+  while( keepRunning ) {
 
     // wait for work
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
 #endif
-
+    workUnit = (void*) 0x5;
     haveWork = FALSE;
+
     while( !haveWork ) {
 
-      //NOTE...Fix these things...
-      pthread_mutex_lock( &systemLockOut );
-      if( headqi->next == NULL ) {
-        pthread_mutex_unlock( &systemLockOut );
+      workUnit = dqPopBottom( myDeque );
 
-        //NOTE: Do a check to see if we need to collect..
-        if( unlikely( needtocollect ) ) {
-          checkcollect( &emptygarbagelist );
+      if( workUnit != DQ_POP_EMPTY ) {
+        haveWork = TRUE;
+        break;
+        
+      } else {
+        // try to steal from another queue, starting
+        // with the last successful victim, don't check
+        // your own deque
+        for( i = 0; i < numWorkSchedWorkers - 1; ++i ) {
+          workUnit = dqPopTop( &(deques[lastVictim]) );
+          
+          if( workUnit != DQ_POP_ABORT &&
+              workUnit != DQ_POP_EMPTY ) {
+            // successful steal!
+            haveWork = TRUE;
+            break;
+          }
+       
+          // choose next victim
+          lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+          
+          if( lastVictim == myWorkerID ) {
+            lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+          }
         }
+        // end steal attempts
 
-        sched_yield();
-        continue;
-      } else {
-        haveWork = TRUE;
+
+        // if we successfully stole work, break out of the
+        // while-not-have-work loop, otherwise we looked
+        // everywhere, so drop down to "I'm idle" code below
+        if( haveWork ) {
+          break;
+        }
       }
-    }
 
-    struct QI* tmp = headqi;
-    headqi         = headqi->next;
-    workUnit       = headqi->value;
-    pthread_mutex_unlock( &systemLockOut );
-    free( tmp );
+      // if we drop down this far, we didn't find any work,
+      // so do a garbage collection, yield the processor,
+      // then check if the entire system is out of work
+      if( unlikely( needtocollect ) ) {
+        checkcollect( &emptygarbagelist );
+      }
+
+      sched_yield();
+
+      if( mainTaskRetired ) {
+        keepRunning = FALSE;
+        break;
+      }
+
+    } // end the while-not-have-work loop
 
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
 #endif
     
-    // let GC see current work
-    litem.seseCommon = (void*)workUnit;
+    // when is no work left we will pop out
+    // here, so only do work if any left
+    if( haveWork ) {
+      // let GC see current work
+      litem.seseCommon = (void*)workUnit;
+
+      // unclear how useful this is
+      if( unlikely( needtocollect ) ) {
+        checkcollect( &emptygarbagelist );
+      }
 
-    // unclear how useful this is
-    if( unlikely( needtocollect ) ) {
-      checkcollect( &emptygarbagelist );
+      workFunc( workUnit );
     }
-
-    workFunc( workUnit );
   } 
 
 
+  CP_EXIT();
+
+
   // remove from GC list
   pthread_mutex_lock( &gclistlock );
   threadcount--;
@@ -206,114 +274,95 @@ void* workerMain( void* arg ) {
   pthread_mutex_unlock( &gclistlock );
 
 
-  //pthread_cleanup_pop( 0 );
-
   return NULL;
 }
 
+
 void workScheduleInit( int numProcessors,
                        void(*func)(void*) ) {
   int i, status;
+  pthread_attr_t attr;
 
   // the original thread must call this now to
-  // protect memory allocation events coming, but it
-  // will also add itself to the worker pool and therefore
-  // try to call it again, CP_CREATE should just ignore
-  // duplicate calls
+  // protect memory allocation events coming
   CP_CREATE();
 
-  pthread_mutex_init(&gclock, NULL);
-  pthread_mutex_init(&gclistlock, NULL);
-  pthread_cond_init(&gccond, NULL);
+  // the original thread will not become a worker, remember
+  myWorkerID = workerID_NOTAWORKER;
 
-  numWorkers = numProcessors + 1;
 
-  workFunc   = func;
+  pthread_mutex_init( &gclock,     NULL );
+  pthread_mutex_init( &gclistlock, NULL );
+  pthread_cond_init ( &gccond,     NULL );
 
-  headqi=tailqi=RUNMALLOC(sizeof(struct QI));
-  headqi->next=NULL;
-  
-  status = pthread_mutex_init( &systemLockIn, NULL );
-  status = pthread_mutex_init( &systemLockOut, NULL );
 
-  // allocate space for one more--the original thread (running
-  // this code) will become a worker thread after setup
-  workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
+  numWorkSchedWorkers = numProcessors + 1;
+
+  workFunc = func;
+
+  deques          = RUNMALLOC( sizeof( deque      )*numWorkSchedWorkers );
+  workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
 
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+    dqInit( &(deques[i]) );
+  }
+  
 #ifdef RCR
   //make sure the queue is initialized
   if (TRqueue==NULL)
     TRqueue=allocTR();
 #endif
 
-  for( i = 0; i < numWorkers; ++i ) {
+  pthread_attr_init( &attr );
+  pthread_attr_setdetachstate( &attr, 
+                               PTHREAD_CREATE_JOINABLE );
 
-    // the original thread is ID 1, start counting from there
-    workerDataArray[i].id = 2 + i;
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+
+    workerDataArray[i].id = i;
 
     status = pthread_create( &(workerDataArray[i].workerThread), 
-                             NULL,
+                             &attr,
                              workerMain,
                              (void*) &(workerDataArray[i])
-                           );
+                             );
 
     if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
-
-    // yield and let all workers get to the begin
-    // condition variable, waiting--we have to hold them
-    // so they don't all see empty work queues right away
-    if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
   }
 }
 
 
 void workScheduleSubmit( void* workUnit ) {
-  struct QI* item=RUNMALLOC(sizeof(struct QI));
-  item->value=workUnit;
-  item->next=NULL;
-  
-  pthread_mutex_lock  ( &systemLockIn );
-  tailqi->next=item;
-  tailqi=item;
-  pthread_mutex_unlock( &systemLockIn );
+
+  if( myWorkerID == workerID_NOTAWORKER ) {
+    dqPushBottom( &(deques[0]), workUnit );
+    return;
+  }
+
+  dqPushBottom( &(deques[myWorkerID]), workUnit );
 }
 
 
-// really should be named "add original thread as a worker"
+// really should be named "wait for work in system to complete"
 void workScheduleBegin() {
   int i;
 
-  // space was saved for the original thread to become a
-  // worker after setup is complete
-  workerDataArray[numWorkers].id           = 1;
-  workerDataArray[numWorkers].workerThread = pthread_self();
-  ++numWorkers;
+  // wait for all workers to exit gracefully
+  for( i = 0; i < numWorkSchedWorkers; ++i ) {
+    pthread_join( workerDataArray[i].workerThread, NULL );
+  }
 
-  workerMain( &(workerDataArray[numWorkers-1]) );
+  // for the original, non-worker thread to close up its events
+  CP_EXIT();
+
+  // write all thread's events to disk
+  CP_DUMP();
 }
 
 
-// the above function does NOT naturally join all the worker
-// threads at exit, once the main SESE/Rblock/Task completes
-// we know all worker threads are finished executing other
-// tasks so we can explicitly kill the workers, and therefore
-// trigger any worker-specific cleanup (like coreprof!)
+// only the worker that executes and then retires
+// the main task should invoke this, which indicates to
+// all other workers they should exit gracefully
 void workScheduleExit() {
-  int i;
-
-  // This is not working well--canceled threads don't run their
-  // thread-level exit routines?  Anyway, its not critical for
-  // coreprof but if we ever need a per-worker exit routine to
-  // run we'll have to look back into this.
-
-  //printf( "Thread %d performing schedule exit.\n", pthread_self() );
-  //
-  //for( i = 0; i < numWorkers; ++i ) {   
-  //  if( pthread_self() != workerDataArray[i].workerThread ) {
-  //    pthread_cancel( workerDataArray[i].workerThread );      
-  //  }
-  //}
-  //
-  //// how to let all the threads actually get canceled?  
-  //sleep( 2 );
+  mainTaskRetired = 1;
 }