#ifdef DEBUG_DEQUE
#include <stdlib.h>
#include <stdio.h>
+#include <string.h>
#endif
#include "deque.h"
#define BOTTOM_NULL_TAG 0x40001
-// there are 9 bits for the index into a Node's array,
-// so 2^9 = 512 elements per node of the deque
-#define DQNODE_ARRAYSIZE 512
-
-
-typedef struct dequeNode_t {
- void* itsDataArr[DQNODE_ARRAYSIZE];
- struct dequeNode_t* next;
- struct dequeNode_t* prev;
-} dequeNode;
-
// the dequeNode struct must be 4096-byte aligned,
// see above, so use the following magic to ask
const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
- return (dequeNode*) ( ((INTPTR)fromAllocator) & (~4095) );
+ INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
+#ifdef DEBUG_DEQUE
+ printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
+ printf( "aligned: 0x%08x to 0x%08x\n", aligned, aligned + sizeof( dequeNode ) );
+ memset( (void*) aligned, 0, sizeof( dequeNode ) );
+#endif
+ return (dequeNode*) aligned;
}
-
-static inline int dqDecodeTag( INTPTR E ) { return (int) ((0xffffe00000000000 & E) >> 45); }
-static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) << 3); }
-static inline int dqDecodeIdx( INTPTR E ) { return (int) ((0x00000000000001ff & E) ); }
-
-
-
static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
(((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
int currTopTag = dqDecodeTag( currTop );
dequeNode* currTopNode = dqDecodePtr( currTop );
int currTopIndx = dqDecodeIdx( currTop );
+
+ // read of top followed by read of bottom, algorithm
+ // says specifically must be in this order
+ BARRIER();
INTPTR currBottom = dq->bottom;
INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
+ // algorithm states above should happen
+ // before attempting the CAS
+ BARRIER();
+
INTPTR actualTop = (INTPTR)
CAS( &(dq->top), // location
currTop, // expected value
dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
+ // algorithm states above should happen
+ // before attempting the CAS
+ BARRIER();
+
INTPTR currTop = dq->top;
int currTopTag = dqDecodeTag( currTop );
#include "mlp_runtime.h"
#include "psemaphore.h"
#include "coreprof/coreprof.h"
+#include "deque.h"
#ifdef RCR
#include "rcr_runtime.h"
#include "trqueue.h"
#endif
-// NOTE: Converting this from a work-stealing strategy
-// to a single-queue thread pool protected by a single
-// lock. This will not scale, but it will support
-// development of the system for now
+//////////////////////////////////////////////////
+//
+// for coordination with the garbage collector
+//
+//////////////////////////////////////////////////
+int threadcount;
+pthread_mutex_t gclock;
+pthread_mutex_t gclistlock;
+pthread_cond_t gccond;
+
+// in garbage.h, listitem is a struct with a pointer
+// to a stack, objects, etc. such that the garbage
+// collector can find pointers for garbage collection
+// this is a global list of listitem structs that the
+// garbage collector uses to know about each thread
+extern struct listitem* list;
+// this is the local thread's item on the above list,
+// it should be added to the global list before a thread
+// starts doing work, and should be removed only when
+// the thread is completely finished--in OoOJava/MLP the
+// only thing hanging from this litem should be a single
+// task record that the worker thread is executing, if any!
+extern __thread struct listitem litem;
+//////////////////////////////////////////////////
+//
+// end coordination with the garbage collector
+//
+//////////////////////////////////////////////////
-// for convenience
-typedef struct Queue deq;
-typedef struct workerData_t{
+typedef struct workerData_t {
pthread_t workerThread;
- int id;
+ int id;
} WorkerData;
+// a thread should know its worker id in any
+// functions below
+static __thread int myWorkerID;
-static pthread_mutex_t systemLockIn;
-static pthread_mutex_t systemLockOut;
+// the original thread starts up the work scheduler
+// and sleeps while it is running, it has no worker
+// ID so use this to realize that
+static const int workerID_NOTAWORKER = 0xffffff0;
-// implementation internal data
-static WorkerData* workerDataArray;
-static pthread_t* workerArray;
-static int systemStarted = 0;
+int numWorkSchedWorkers;
+static WorkerData* workerDataArray;
+static pthread_t* workerArray;
-static pthread_cond_t systemBeginCond = PTHREAD_COND_INITIALIZER;
static void(*workFunc)(void*);
-static pthread_cond_t workAvailCond = PTHREAD_COND_INITIALIZER;
+// each thread can create objects but should assign
+// globally-unique object ID's (oid) so have threads
+// give out this as next id, then increment by number
+// of threads to ensure disjoint oid sets
+__thread int oid;
-int numWorkers;
+// global array of work-stealing deques, where
+// each thread uses its ID as the index to its deque
+deque* deques;
-int threadcount;
-pthread_mutex_t gclock;
-pthread_mutex_t gclistlock;
-pthread_cond_t gccond;
-extern struct listitem * list;
-extern __thread struct listitem litem;
-extern __thread SESEcommon* seseCommon;
-
-__thread int oid;
#ifdef RCR
#include "trqueue.h"
#endif
-void workerExit( void* arg ) {
- //printf( "Thread %d canceled.\n", pthread_self() );
- CP_EXIT();
-}
+
+// this is a read-by-all and write-by-one variable
+// IT IS UNPROTECTED, BUT SAFE for all threads to
+// read it (periodically, only when they can find no work)
+// and only the worker that retires the main thread will
+// write it to 1, at which time other workers will see
+// that they should exit gracefully
+static volatile int mainTaskRetired = FALSE;
+
+
+
void* workerMain( void* arg ) {
void* workUnit;
- WorkerData* myData = (WorkerData*) arg;
- int oldState;
+ WorkerData* myData = (WorkerData*) arg;
+ deque* myDeque = &(deques[myData->id]);
+ int keepRunning = TRUE;
int haveWork;
+ int lastVictim = 0;
+ int i;
- // the worker threads really have no context relevant to the
- // user program, so build an empty garbage list struct to
- // pass to the collector if collection occurs
- struct garbagelist emptygarbagelist = { 0, NULL };
-
- // once-per-thread stuff
- CP_CREATE();
+ myWorkerID = myData->id;
- //pthread_cleanup_push( workerExit, NULL );
-
// ensure that object ID's start at 1 so that using
// oid with value 0 indicates an invalid object
oid = myData->id + 1;
// task should hand off to children threads it is
// going to stall on
psem_init( &runningSESEstallSem );
-
+
+ // the worker threads really have no context relevant to the
+ // user program, so build an empty garbage list struct to
+ // pass to the collector if collection occurs
+ struct garbagelist emptygarbagelist = { 0, NULL };
#ifdef RCR
//allocate task record queue
#endif
- //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
- //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, &oldState );
-
-
// Add this worker to the gc list
pthread_mutex_lock( &gclistlock );
threadcount++;
pthread_mutex_unlock( &gclistlock );
+ // start timing events in this thread
+ CP_CREATE();
+
+
// then continue to process work
- while( 1 ) {
+ while( keepRunning ) {
// wait for work
#ifdef CP_EVENTID_WORKSCHEDGRAB
CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
#endif
-
+ workUnit = (void*) 0x5;
haveWork = FALSE;
+
while( !haveWork ) {
- //NOTE...Fix these things...
- pthread_mutex_lock( &systemLockOut );
- if( headqi->next == NULL ) {
- pthread_mutex_unlock( &systemLockOut );
+ workUnit = dqPopBottom( myDeque );
- //NOTE: Do a check to see if we need to collect..
- if( unlikely( needtocollect ) ) {
- checkcollect( &emptygarbagelist );
+ if( workUnit != DQ_POP_EMPTY ) {
+ haveWork = TRUE;
+ break;
+
+ } else {
+ // try to steal from another queue, starting
+ // with the last successful victim, don't check
+ // your own deque
+ for( i = 0; i < numWorkSchedWorkers - 1; ++i ) {
+ workUnit = dqPopTop( &(deques[lastVictim]) );
+
+ if( workUnit != DQ_POP_ABORT &&
+ workUnit != DQ_POP_EMPTY ) {
+ // successful steal!
+ haveWork = TRUE;
+ break;
+ }
+
+ // choose next victim
+ lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+
+ if( lastVictim == myWorkerID ) {
+ lastVictim++; if( lastVictim == numWorkSchedWorkers ) { lastVictim = 0; }
+ }
}
+ // end steal attempts
- sched_yield();
- continue;
- } else {
- haveWork = TRUE;
+
+ // if we successfully stole work, break out of the
+ // while-not-have-work loop, otherwise we looked
+ // everywhere, so drop down to "I'm idle" code below
+ if( haveWork ) {
+ break;
+ }
}
- }
- struct QI* tmp = headqi;
- headqi = headqi->next;
- workUnit = headqi->value;
- pthread_mutex_unlock( &systemLockOut );
- free( tmp );
+ // if we drop down this far, we didn't find any work,
+ // so do a garbage collection, yield the processor,
+ // then check if the entire system is out of work
+ if( unlikely( needtocollect ) ) {
+ checkcollect( &emptygarbagelist );
+ }
+
+ sched_yield();
+
+ if( mainTaskRetired ) {
+ keepRunning = FALSE;
+ break;
+ }
+
+ } // end the while-not-have-work loop
#ifdef CP_EVENTID_WORKSCHEDGRAB
CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
#endif
- // let GC see current work
- litem.seseCommon = (void*)workUnit;
+ // when is no work left we will pop out
+ // here, so only do work if any left
+ if( haveWork ) {
+ // let GC see current work
+ litem.seseCommon = (void*)workUnit;
+
+ // unclear how useful this is
+ if( unlikely( needtocollect ) ) {
+ checkcollect( &emptygarbagelist );
+ }
- // unclear how useful this is
- if( unlikely( needtocollect ) ) {
- checkcollect( &emptygarbagelist );
+ workFunc( workUnit );
}
-
- workFunc( workUnit );
}
+ CP_EXIT();
+
+
// remove from GC list
pthread_mutex_lock( &gclistlock );
threadcount--;
pthread_mutex_unlock( &gclistlock );
- //pthread_cleanup_pop( 0 );
-
return NULL;
}
+
void workScheduleInit( int numProcessors,
void(*func)(void*) ) {
int i, status;
+ pthread_attr_t attr;
// the original thread must call this now to
- // protect memory allocation events coming, but it
- // will also add itself to the worker pool and therefore
- // try to call it again, CP_CREATE should just ignore
- // duplicate calls
+ // protect memory allocation events coming
CP_CREATE();
- pthread_mutex_init(&gclock, NULL);
- pthread_mutex_init(&gclistlock, NULL);
- pthread_cond_init(&gccond, NULL);
+ // the original thread will not become a worker, remember
+ myWorkerID = workerID_NOTAWORKER;
- numWorkers = numProcessors + 1;
- workFunc = func;
+ pthread_mutex_init( &gclock, NULL );
+ pthread_mutex_init( &gclistlock, NULL );
+ pthread_cond_init ( &gccond, NULL );
- headqi=tailqi=RUNMALLOC(sizeof(struct QI));
- headqi->next=NULL;
-
- status = pthread_mutex_init( &systemLockIn, NULL );
- status = pthread_mutex_init( &systemLockOut, NULL );
- // allocate space for one more--the original thread (running
- // this code) will become a worker thread after setup
- workerDataArray = RUNMALLOC( sizeof( WorkerData ) * (numWorkers+1) );
+ numWorkSchedWorkers = numProcessors + 1;
+
+ workFunc = func;
+
+ deques = RUNMALLOC( sizeof( deque )*numWorkSchedWorkers );
+ workerDataArray = RUNMALLOC( sizeof( WorkerData )*numWorkSchedWorkers );
+ for( i = 0; i < numWorkSchedWorkers; ++i ) {
+ dqInit( &(deques[i]) );
+ }
+
#ifdef RCR
//make sure the queue is initialized
if (TRqueue==NULL)
TRqueue=allocTR();
#endif
- for( i = 0; i < numWorkers; ++i ) {
+ pthread_attr_init( &attr );
+ pthread_attr_setdetachstate( &attr,
+ PTHREAD_CREATE_JOINABLE );
- // the original thread is ID 1, start counting from there
- workerDataArray[i].id = 2 + i;
+ for( i = 0; i < numWorkSchedWorkers; ++i ) {
+
+ workerDataArray[i].id = i;
status = pthread_create( &(workerDataArray[i].workerThread),
- NULL,
+ &attr,
workerMain,
(void*) &(workerDataArray[i])
- );
+ );
if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
-
- // yield and let all workers get to the begin
- // condition variable, waiting--we have to hold them
- // so they don't all see empty work queues right away
- if( sched_yield() == -1 ) { printf( "Error thread trying to yield.\n" ); exit( -1 ); }
}
}
void workScheduleSubmit( void* workUnit ) {
- struct QI* item=RUNMALLOC(sizeof(struct QI));
- item->value=workUnit;
- item->next=NULL;
-
- pthread_mutex_lock ( &systemLockIn );
- tailqi->next=item;
- tailqi=item;
- pthread_mutex_unlock( &systemLockIn );
+
+ if( myWorkerID == workerID_NOTAWORKER ) {
+ dqPushBottom( &(deques[0]), workUnit );
+ return;
+ }
+
+ dqPushBottom( &(deques[myWorkerID]), workUnit );
}
-// really should be named "add original thread as a worker"
+// really should be named "wait for work in system to complete"
void workScheduleBegin() {
int i;
- // space was saved for the original thread to become a
- // worker after setup is complete
- workerDataArray[numWorkers].id = 1;
- workerDataArray[numWorkers].workerThread = pthread_self();
- ++numWorkers;
+ // wait for all workers to exit gracefully
+ for( i = 0; i < numWorkSchedWorkers; ++i ) {
+ pthread_join( workerDataArray[i].workerThread, NULL );
+ }
- workerMain( &(workerDataArray[numWorkers-1]) );
+ // for the original, non-worker thread to close up its events
+ CP_EXIT();
+
+ // write all thread's events to disk
+ CP_DUMP();
}
-// the above function does NOT naturally join all the worker
-// threads at exit, once the main SESE/Rblock/Task completes
-// we know all worker threads are finished executing other
-// tasks so we can explicitly kill the workers, and therefore
-// trigger any worker-specific cleanup (like coreprof!)
+// only the worker that executes and then retires
+// the main task should invoke this, which indicates to
+// all other workers they should exit gracefully
void workScheduleExit() {
- int i;
-
- // This is not working well--canceled threads don't run their
- // thread-level exit routines? Anyway, its not critical for
- // coreprof but if we ever need a per-worker exit routine to
- // run we'll have to look back into this.
-
- //printf( "Thread %d performing schedule exit.\n", pthread_self() );
- //
- //for( i = 0; i < numWorkers; ++i ) {
- // if( pthread_self() != workerDataArray[i].workerThread ) {
- // pthread_cancel( workerDataArray[i].workerThread );
- // }
- //}
- //
- //// how to let all the threads actually get canceled?
- //sleep( 2 );
+ mainTaskRetired = 1;
}