implemented deque for work stealing
authorjjenista <jjenista>
Thu, 21 Oct 2010 23:24:47 +0000 (23:24 +0000)
committerjjenista <jjenista>
Thu, 21 Oct 2010 23:24:47 +0000 (23:24 +0000)
Robust/src/Benchmarks/oooJava/micro-master-makefile
Robust/src/Runtime/deque.c [new file with mode: 0644]
Robust/src/Runtime/deque.h [new file with mode: 0644]
Robust/src/Runtime/memPool.h
Robust/src/Runtime/runtime.h
Robust/src/Runtime/workschedule.c
Robust/src/Runtime/workschedule.h
Robust/src/Tests/deque/makefile [new file with mode: 0644]
Robust/src/Tests/deque/test-deque.c [new file with mode: 0644]

index 54fd6ae2c8aa94fb95040083c0be06c234e1457c..1097466a7999edbecf08f06259c5b11d3105e548 100644 (file)
@@ -14,20 +14,20 @@ SOURCE_FILES=test.java
 BUILDSCRIPT=../../../buildscript
 
 
-COREPROFOVERFLOW= #-coreprof-checkoverflow
+COREPROFOVERFLOW= -coreprof-checkoverflow
 USECOREPROF= -coreprof $(COREPROFOVERFLOW) \
        -coreprof-eventwords 1024*1024*512 \
        -coreprof-enable cpe_main \
        -coreprof-enable cpe_runmalloc \
        -coreprof-enable cpe_taskexecute \
        -coreprof-enable cpe_taskdispatch \
-       -coreprof-enable cpe_poolalloc
+       -coreprof-enable cpe_poolalloc \
+       -coreprof-enable cpe_taskretire \
+       -coreprof-enable cpe_workschedgrab
 #      -coreprof-enable cpe_preparememq
 #      -coreprof-enable cpe_runfree \
 #      -coreprof-enable cpe_count_poolalloc \
 #      -coreprof-enable cpe_count_poolreuse \
-#      -coreprof-enable cpe_workschedgrab \
-#      -coreprof-enable cpe_taskretire \
 #      -coreprof-enable cpe_taskstallvar \
 #      -coreprof-enable cpe_taskstallmem
 
diff --git a/Robust/src/Runtime/deque.c b/Robust/src/Runtime/deque.c
new file mode 100644 (file)
index 0000000..1b461d0
--- /dev/null
@@ -0,0 +1,293 @@
+////////////////////////////////////////////////////////////////
+//
+//  This is an implementation of the structure described in
+//  A Dynamic-Sized Nonblocking Work Stealing Deque
+//  Hendler, Lev, Moir, and Shavit
+//   
+//  The bottom and top values for the deque must be CAS-able
+//  and fit into 64 bits.  Our strategy for this is:
+//  
+//    19-bit Tag    36-bit Node Pointer     9-bit Index
+//   +-----------+-------------------------+------------+
+//   | 63 ... 45 | 44 ...                9 | 8 ...    0 |
+//   +-----------+-------------------------+------------+
+//
+//  Let's call the encoded info E.  To retrieve the values:  
+//    tag = (0xffffe00000000000 & E) >> 45;
+//    ptr = (0x00001ffffffffe00 & E) <<  3;
+//    idx = (0x00000000000001ff & E);
+//
+//  Increment the tag without decrypting:
+//    E = (0x00001fffffffffff | E) + 1;
+//
+//  Increment (decrement) the index when it is not equal to
+//  MAXINDEX (0) with E++ (E--).
+//
+//  x86 64-bit processors currently only use the lowest 48 bits for
+//  virtual addresses, source:
+//  http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
+//  And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
+//  of a 64-bit pointer are always zero.  This means if we are only
+//  alloted 36 bits to store a pointer to a Node we have 
+//  48 - 3 - 36 = 9 bits that could be lost.  Instead of aligning Node
+//  pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
+//  sure the lower 12 bits of the address are zero.  THEREFORE:
+//  Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
+//  we can ecnode the rest in 36 bits without a loss of information.  
+//
+////////////////////////////////////////////////////////////////
+
+#ifdef DEBUG_DEQUE
+#include <stdlib.h>
+#include <stdio.h>
+#endif
+
+#include "deque.h"
+
+
+void* DQ_POP_EMPTY = (void*)0x1;
+void* DQ_POP_ABORT = (void*)0x3;
+
+
+// define a 19-bit dummy tag for the bottom
+// value with a pattern that will expose errors
+#define BOTTOM_NULL_TAG 0x40001
+
+
+// there are 9 bits for the index into a Node's array,
+// so 2^9 = 512 elements per node of the deque
+#define DQNODE_ARRAYSIZE 512
+
+
+typedef struct dequeNode_t {
+  void* itsDataArr[DQNODE_ARRAYSIZE];
+  struct dequeNode_t* next;
+  struct dequeNode_t* prev;
+} dequeNode;
+
+
+// the dequeNode struct must be 4096-byte aligned, 
+// see above, so use the following magic to ask
+// the allocator for a space that wastes 4095 bytes
+// but gaurantees the address of the struct within
+// that space is 4096-aligned
+const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
+
+static inline dequeNode* dqGet4096aligned( void* fromAllocator ) { 
+  return (dequeNode*) ( ((INTPTR)fromAllocator) & (~4095) );
+}
+
+
+
+static inline int        dqDecodeTag( INTPTR E ) { return (int)        ((0xffffe00000000000 & E) >> 45); }
+static inline dequeNode* dqDecodePtr( INTPTR E ) { return (dequeNode*) ((0x00001ffffffffe00 & E) <<  3); }
+static inline int        dqDecodeIdx( INTPTR E ) { return (int)        ((0x00000000000001ff & E)      ); }
+
+
+
+static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
+  INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
+                 (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
+
+  INTPTR E =
+    (((INTPTR)tag) << 45) |
+    (ptrE)                |
+    ((INTPTR)idx);
+#ifdef DEBUG_DEQUE
+  int tagOut = dqDecodeTag( E ); 
+  if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
+
+  dequeNode* ptrOut = dqDecodePtr( E );
+  if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
+
+  int idxOut = dqDecodeIdx( E );
+  if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
+#endif
+  return E;
+}
+
+
+static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
+  dequeNode* botNode = dqDecodePtr( bottom );
+  int        botIndx = dqDecodeIdx( bottom );
+  dequeNode* topNode = dqDecodePtr( top );
+  int        topIndx = dqDecodeIdx( top );  
+
+  if( (botNode == topNode) &&
+      (botIndx == topIndx || botIndx == (topIndx+1))
+      ) {
+    return 1;
+  }
+
+  if( (botNode == topNode->next) &&
+      (botIndx == 0)             &&
+      (topIndx == DQNODE_ARRAYSIZE - 1)
+      ) {
+    return 1;
+  }
+
+  return 0;
+}
+
+
+
+void dqInit( deque* dq ) {
+
+  dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
+
+  dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
+  dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
+  
+  a->next = b;
+  b->prev = a;
+
+  dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
+  dq->top    = dqEncode( 0,               a, DQNODE_ARRAYSIZE - 1 );
+}
+
+
+void dqPushBottom( deque* dq, void* item ) {
+
+  dequeNode* currNode = dqDecodePtr( dq->bottom );
+  int        currIndx = dqDecodeIdx( dq->bottom );
+
+  currNode->itsDataArr[currIndx] = item;
+
+  dequeNode* newNode;
+  int        newIndx;
+
+  if( currIndx != 0 ) {
+    newNode = currNode;
+    newIndx = currIndx - 1;
+
+  } else {
+    newNode        = dqGet4096aligned( poolalloc( dq->memPool ) );
+    newNode->next  = currNode;
+    currNode->prev = newNode;
+    newIndx        = DQNODE_ARRAYSIZE - 1;
+  }
+
+  dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
+}
+
+
+void* dqPopTop( deque* dq ) {
+
+  INTPTR currTop = dq->top;
+
+  int        currTopTag  = dqDecodeTag( currTop );
+  dequeNode* currTopNode = dqDecodePtr( currTop );
+  int        currTopIndx = dqDecodeIdx( currTop );
+  
+  INTPTR currBottom = dq->bottom;
+
+  if( dqIndicateEmpty( currBottom, currTop ) ) {
+    if( currTop == dq->top ) {
+      return DQ_POP_EMPTY;
+    } else {
+      return DQ_POP_ABORT;
+    }
+  }
+
+  dequeNode* nodeToFree;
+  int        newTopTag;
+  dequeNode* newTopNode;
+  int        newTopIndx;
+
+  if( currTopIndx != 0 ) {
+    nodeToFree = NULL;
+    newTopTag  = currTopTag;
+    newTopNode = currTopNode;
+    newTopIndx = currTopIndx - 1;
+
+  } else {
+    nodeToFree = currTopNode->next;
+    newTopTag  = currTopTag + 1;
+    newTopNode = currTopNode->prev;
+    newTopIndx = DQNODE_ARRAYSIZE - 1;
+  }
+
+  void* retVal = currTopNode->itsDataArr[currTopIndx];
+
+  INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
+
+  INTPTR actualTop = (INTPTR)
+    CAS( &(dq->top), // location
+         currTop,    // expected value
+         newTop );   // desired value
+
+  if( actualTop == currTop ) {
+    // CAS succeeded
+    if( nodeToFree != NULL ) {
+      poolfreeinto( dq->memPool, nodeToFree );
+    }
+    return retVal;
+
+  } else {
+    return DQ_POP_ABORT;
+  }
+}
+
+
+void* dqPopBottom ( deque* dq ) {
+
+  INTPTR oldBot = dq->bottom;
+
+  dequeNode* oldBotNode = dqDecodePtr( oldBot );
+  int        oldBotIndx = dqDecodeIdx( oldBot );
+  
+  dequeNode* newBotNode;
+  int        newBotIndx;
+
+  if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
+    newBotNode = oldBotNode;
+    newBotIndx = oldBotIndx + 1;
+
+  } else {
+    newBotNode = oldBotNode->next;
+    newBotIndx = 0;
+  }
+
+  void* retVal = newBotNode->itsDataArr[newBotIndx];
+
+  dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
+
+  INTPTR currTop = dq->top;
+
+  int        currTopTag  = dqDecodeTag( currTop );
+  dequeNode* currTopNode = dqDecodePtr( currTop );
+  int        currTopIndx = dqDecodeIdx( currTop );
+
+  if( oldBotNode == currTopNode &&
+      oldBotIndx == currTopIndx ) {
+    dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
+    return DQ_POP_EMPTY;
+
+  } else if( newBotNode == currTopNode &&
+             newBotIndx == currTopIndx ) {
+    INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
+
+    INTPTR actualTop = (INTPTR)
+      CAS( &(dq->top), // location
+           currTop,    // expected value
+           newTop );   // desired value
+
+    if( actualTop == currTop ) {
+      // CAS succeeded
+      if( oldBotNode != newBotNode ) {
+        poolfreeinto( dq->memPool, oldBotNode );
+      }
+      return retVal;
+      
+    } else {
+      dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );      
+      return DQ_POP_EMPTY;
+    }
+    
+  } else {
+    if( oldBotNode != newBotNode ) {
+      poolfreeinto( dq->memPool, oldBotNode );
+    }
+    return retVal;    
+  }
+}
diff --git a/Robust/src/Runtime/deque.h b/Robust/src/Runtime/deque.h
new file mode 100644 (file)
index 0000000..156226c
--- /dev/null
@@ -0,0 +1,36 @@
+#ifndef ___DEQUE_H__
+#define ___DEQUE_H__
+
+#include "runtime.h"
+#include "memPool.h"
+
+
+// the bottom and top 64-bit values encode
+// several sub-values, see deque.c
+typedef struct deque_t {
+  MemPool* memPool;
+  INTPTR   bottom;
+
+  // force bottom and top to different cache lines
+  char buffer[CACHELINESIZE];
+
+  INTPTR top;
+} deque;
+
+
+void  dqInit      ( deque* dq );
+void  dqPushBottom( deque* dq, void* item );
+void* dqPopTop    ( deque* dq );
+void* dqPopBottom ( deque* dq );
+
+
+// pop operations may return these values
+// instead of an item
+extern void* DQ_POP_EMPTY;
+extern void* DQ_POP_ABORT;
+
+
+//void dq_take ( deque* sem, struct garbagelist* gl );
+
+
+#endif // ___DEQUE_H__
index 9d2f21b3c79a66978f4e5a28b53f5401870e989f..17e8cc4212a944d59f5514efb73062711606342b 100644 (file)
 //////////////////////////////////////////////////////////
 
 #include <stdlib.h>
+#include "runtime.h"
 #include "mem.h"
 #include "mlp_lock.h"
 
 
-// The cache line size is set for the AMD Opteron 6168 (dc-10)
-// that has L1 and L2 cache line sizes of 64 bytes.  Source:
-// http://www.cs.virginia.edu/~skadron/cs451/opteron/opteron.ppt
-#define CACHELINESIZE 64
-
 
 typedef struct MemPoolItem_t {
   void* next;
@@ -113,7 +109,7 @@ static inline void* poolalloc( MemPool* p ) {
 
   if(next == NULL) {
     // only one item, so don't take from pool
-    return RUNMALLOC( p->itemSize );
+    return (void*) RUNMALLOC( p->itemSize );
   }
  
   p->head = next;
@@ -132,7 +128,7 @@ static inline void* poolalloc( MemPool* p ) {
   //__builtin_prefetch( &(p->head->next) );
   asm volatile( "prefetcht0 (%0)" :: "r" (next));
 
-  return headCurrent;
+  return (void*)headCurrent;
 }
 
 
index f9090d483a5553b70e449f0bb39b230a4114dbf0..a0ad41f16e18e2c1219fb6c98dfa9631135c9e6d 100644 (file)
@@ -28,6 +28,13 @@ void set_affinity();
 #endif
 #endif
 
+#ifndef CACHELINESIZE
+// The L1 and L2 cache line size for the
+// AMD Opteron 6168 (dc-10) is 64 bytes.  Source:
+// http://www.cs.virginia.edu/~skadron/cs451/opteron/opteron.ppt
+#define CACHELINESIZE 64
+#endif
+
 extern void * curr_heapbase;
 extern void * curr_heaptop;
 
index 3595cb7e4a780282fa685b9e51588a055a5b7281..f35d94602dfef3f8c4b4e306a44de0495efb21d0 100644 (file)
@@ -19,6 +19,9 @@
 
 
 
+
+
+
 // for convenience
 typedef struct Queue deq;
 
@@ -71,7 +74,11 @@ void* workerMain( void* arg ) {
   WorkerData* myData = (WorkerData*) arg;
   int         oldState;
   int         haveWork;
-  struct garbagelist emptygarbagelist={0,NULL};
+
+  // the worker threads really have no context relevant to the
+  // user program, so build an empty garbage list struct to
+  // pass to the collector if collection occurs
+  struct garbagelist emptygarbagelist = { 0, NULL };
 
   // once-per-thread stuff
   CP_CREATE();
@@ -92,86 +99,101 @@ void* workerMain( void* arg ) {
   //allocate task record queue
   pthread_t thread;
   pthread_attr_t nattr;  
-  pthread_attr_init(&nattr);
-  pthread_attr_setdetachstate(&nattr, PTHREAD_CREATE_DETACHED);
-  if (TRqueue==NULL)
-    TRqueue=allocTR();
+  pthread_attr_init( &nattr );
+  pthread_attr_setdetachstate( &nattr, PTHREAD_CREATE_DETACHED );
+
+  if( TRqueue == NULL ) {
+    TRqueue = allocTR();
+  }
+
   int status = pthread_create( &thread,
                               NULL,
                               workerTR,
-                              (void*) TRqueue);
-  pthread_attr_destroy(&nattr);
+                              (void*) TRqueue );
+
+  pthread_attr_destroy( &nattr );
+
   if( status != 0 ) { printf( "Error\n" ); exit( -1 ); }
 #endif
 
+
   //pthread_setcanceltype ( PTHREAD_CANCEL_ASYNCHRONOUS, &oldState );
   //pthread_setcancelstate( PTHREAD_CANCEL_ENABLE,       &oldState );
 
-  // then continue to process work
-  //NOTE: ADD US TO THE GC LIST
-  
-  pthread_mutex_lock(&gclistlock);
-  threadcount++;
-  litem.prev=NULL;
-  litem.next=list;
-  if(list!=NULL)
-    list->prev=&litem;
-  list=&litem;
-  pthread_mutex_unlock(&gclistlock);
 
+  // Add this worker to the gc list
+  pthread_mutex_lock( &gclistlock );
+  threadcount++;
+  litem.prev = NULL;
+  litem.next = list;
+  if( list != NULL ) 
+    list->prev = &litem;
+  list = &litem;
+  pthread_mutex_unlock( &gclistlock );
 
-  //ALSO CREATE EMPTY GARBAGELIST TO PASS TO COLLECTOR
 
+  // then continue to process work
   while( 1 ) {
 
     // wait for work
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_BEGIN );
 #endif
+
     haveWork = FALSE;
     while( !haveWork ) {
+
       //NOTE...Fix these things...
       pthread_mutex_lock( &systemLockOut );
       if( headqi->next == NULL ) {
         pthread_mutex_unlock( &systemLockOut );
+
         //NOTE: Do a check to see if we need to collect..
-        if (unlikely(needtocollect)) checkcollect(&emptygarbagelist);
+        if( unlikely( needtocollect ) ) {
+          checkcollect( &emptygarbagelist );
+        }
+
         sched_yield();
         continue;
       } else {
         haveWork = TRUE;
       }
     }
-    struct QI * tmp=headqi;
-    headqi = headqi->next;
-    workUnit = headqi->value;
+
+    struct QI* tmp = headqi;
+    headqi         = headqi->next;
+    workUnit       = headqi->value;
     pthread_mutex_unlock( &systemLockOut );
     free( tmp );
+
 #ifdef CP_EVENTID_WORKSCHEDGRAB
     CP_LOGEVENT( CP_EVENTID_WORKSCHEDGRAB, CP_EVENTTYPE_END );
 #endif
     
-    //let GC see current work
-    litem.seseCommon=(void*)workUnit;
+    // let GC see current work
+    litem.seseCommon = (void*)workUnit;
 
-    //unclear how useful this is
-    if (unlikely(needtocollect)) checkcollect(&emptygarbagelist);
+    // unclear how useful this is
+    if( unlikely( needtocollect ) ) {
+      checkcollect( &emptygarbagelist );
+    }
 
     workFunc( workUnit );
-  }
+  } 
+
 
-  //NOTE: Remove from GC LIST DOWN HERE....
-  pthread_mutex_lock(&gclistlock);
+  // remove from GC list
+  pthread_mutex_lock( &gclistlock );
   threadcount--;
-  if (litem.prev==NULL) {
-    list=litem.next;
+  if( litem.prev == NULL ) {
+    list = litem.next;
   } else {
-    litem.prev->next=litem.next;
+    litem.prev->next = litem.next;
   }
-  if (litem.next!=NULL) {
-    litem.next->prev=litem.prev;
+  if( litem.next != NULL ) {
+    litem.next->prev = litem.prev;
   }
-  pthread_mutex_unlock(&gclistlock);
+  pthread_mutex_unlock( &gclistlock );
 
 
   //pthread_cleanup_pop( 0 );
@@ -234,6 +256,7 @@ void workScheduleInit( int numProcessors,
   }
 }
 
+
 void workScheduleSubmit( void* workUnit ) {
   struct QI* item=RUNMALLOC(sizeof(struct QI));
   item->value=workUnit;
index ac496da5bf8c9a8db47ad57650779eaa4bec4164..a36683bbaa98cab9f05999a38b84afcf046951eb 100644 (file)
@@ -28,10 +28,13 @@ void workScheduleSubmit( void* workUnit );
 // should not expect to return from this
 void workScheduleBegin();
 
+
+
+
 extern int threadcount;
 extern pthread_mutex_t gclock;
 extern pthread_mutex_t gclistlock;
-extern pthread_cond_t gccond;
+extern pthread_cond_t  gccond;
 
 struct QI {
   struct QI * next;
@@ -42,4 +45,6 @@ struct QI * headqi;
 struct QI * tailqi;
 
 
+
+
 #endif /* __WORK_SCHEDULE__ */
diff --git a/Robust/src/Tests/deque/makefile b/Robust/src/Tests/deque/makefile
new file mode 100644 (file)
index 0000000..8f9d951
--- /dev/null
@@ -0,0 +1,21 @@
+PROGRAM=test-deque
+
+DQDIR=../../Runtime
+DEFS= -D "RUNMALLOC=malloc" -D "RUNFREE=free" -DBIT64 -DDEBUG_DEQUE
+FLAGS= -m64 -g #-O3
+
+all: $(PROGRAM)
+
+$(PROGRAM): $(PROGRAM).o deque.o
+       gcc $(PROGRAM).o deque.o -lpthread -o $(PROGRAM)
+
+deque.o: $(DQDIR)/deque.h $(DQDIR)/deque.c
+       gcc -c $(FLAGS) -I$(DQDIR) $(DEFS) $(DQDIR)/deque.c -o deque.o
+
+$(PROGRAM).o: $(PROGRAM).c $(DQDIR)/deque.h
+       gcc -c $(FLAGS) -I$(DQDIR) $(DEFS) $(PROGRAM).c -o $(PROGRAM).o
+
+clean:
+       rm -f $(PROGRAM)
+       rm -f *.o
+       rm -f *~
diff --git a/Robust/src/Tests/deque/test-deque.c b/Robust/src/Tests/deque/test-deque.c
new file mode 100644 (file)
index 0000000..c60e9bb
--- /dev/null
@@ -0,0 +1,136 @@
+#include <stdlib.h>
+#include <stdio.h>
+
+#include <pthread.h>
+#include "deque.h"
+
+
+#define numThreads 24
+#define numCycles  1000
+#define retries    100000
+
+
+// global array of work-stealing deques
+// each thread knows its own index
+deque* deques;
+
+
+
+// for artificially keeping threads busy
+int spin( int n ) {
+  long x = 0;
+  int i, j;
+  //for( i = 0; i < n; ++i ) {
+  //  for( j = 0; j < numCycles; ++j ) {
+  //    x = (x * x) + (x + x) - (x * 1 * x * 2) + i;
+  //  }
+  // }
+  return x;
+}
+
+
+
+void* workerMain( void* arg ) {
+  INTPTR i = (INTPTR)arg;
+  
+  INTPTR w = 0;
+  int r = retries;
+
+  deque* dq = &(deques[i]);
+  
+  srand( i * 1777 );
+
+  int j;
+  for( j = 0; j < i; ++j ) {
+    int* one = malloc( sizeof( int ) );
+    *one = 1;
+    dqPushBottom( dq, one );
+    spin( i );
+  }
+
+  while( r > 0 ) {
+    void* num = dqPopBottom( dq );
+
+    if( num == DQ_POP_ABORT ) {
+      // another op is in progress, try again
+      continue;
+
+    } else if( num == DQ_POP_EMPTY ) {
+      
+      // IF YOU INSERT THIS (NEVER STEAL) THE AMOUNT
+      // OF WORK COMES OUT RIGHT?!?!?!
+      //pthread_exit( (void*)w );
+
+      // no work here, steal!
+      int v = rand() % numThreads;
+      num = dqPopTop( &(deques[v]) );
+      
+      if( num == DQ_POP_ABORT ) {
+        // another op in progress, try again later
+        continue;
+
+      } else if( num == DQ_POP_EMPTY ) {
+        // lose a retry
+        r--;
+        continue;
+
+      } else {
+        // STOLE WORK!
+        w += *((int*)num);
+        spin( w / (i+1) );
+        continue;
+      }
+
+    } else {
+      // grabbed work
+      w += *((int*)num);
+      spin( w / (i+1) );
+      continue;
+    }
+  }
+  printf( "I'm %d and I did %d many.\n", i, w );
+  pthread_exit( (void*)w );
+}
+
+
+int main() {
+  INTPTR i;
+  int    j;
+
+  pthread_t      threads[numThreads];
+  pthread_attr_t attr;
+
+  long total = 0;
+
+
+  pthread_attr_init( &attr );
+  pthread_attr_setdetachstate( &attr, 
+                               PTHREAD_CREATE_JOINABLE );
+
+  deques = malloc( sizeof( deque )*numThreads );
+
+  for( i = 0; i < numThreads; ++i ) {
+    dqInit( &(deques[i]) );
+  }
+
+  for( i = 0; i < numThreads; ++i ) {
+    pthread_create( &(threads[i]),
+                    &attr,
+                    workerMain,
+                    (void*)i );
+    printf( "." );
+  }
+  
+  printf( "\n" );
+
+  for( i = 0; i < numThreads; ++i ) {
+    long x;
+    pthread_join( threads[i],
+                  (void*)&x );
+    total += x;
+    printf( "+" );
+  }
+
+  printf( "\nTotal (expect 300)=%d\n", total+24 );
+  return 0;
+}