// This is an implementation of the structure described in
// A Dynamic-Sized Nonblocking Work Stealing Deque
// Hendler, Lev, Moir, and Shavit
-//
+//
// The bottom and top values for the deque must be CAS-able
// and fit into 64 bits. Our strategy for this is:
-//
+//
// 19-bit Tag 36-bit Node Pointer 9-bit Index
// +-----------+-------------------------+------------+
// | 63 ... 45 | 44 ... 9 | 8 ... 0 |
// +-----------+-------------------------+------------+
//
-// Let's call the encoded info E. To retrieve the values:
+// Let's call the encoded info E. To retrieve the values:
// tag = (0xffffe00000000000 & E) >> 45;
// ptr = (0x00001ffffffffe00 & E) << 3;
// idx = (0x00000000000001ff & E);
// http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
// And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
// of a 64-bit pointer are always zero. This means if we are only
-// alloted 36 bits to store a pointer to a Node we have
+// alloted 36 bits to store a pointer to a Node we have
// 48 - 3 - 36 = 9 bits that could be lost. Instead of aligning Node
// pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
// sure the lower 12 bits of the address are zero. THEREFORE:
// Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
-// we can ecnode the rest in 36 bits without a loss of information.
+// we can ecnode the rest in 36 bits without a loss of information.
//
////////////////////////////////////////////////////////////////
-// the dequeNode struct must be 4096-byte aligned,
+// the dequeNode struct must be 4096-byte aligned,
// see above, so use the following magic to ask
// the allocator for a space that wastes 4095 bytes
// but gaurantees the address of the struct within
// that space is 4096-aligned
const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
-static inline dequeNode* dqGet4096aligned( void* fromAllocator ) {
+static inline dequeNode* dqGet4096aligned(void* fromAllocator) {
INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
#ifdef DEBUG_DEQUE
}
-static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
+static inline INTPTR dqEncode(int tag, dequeNode* ptr, int idx) {
INTPTR ptrE = (0x00001ffffffffe00 & // second, mask off the addr's high-order 1's
(((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
(ptrE) |
((INTPTR)idx);
#ifdef DEBUG_DEQUE
- int tagOut = dqDecodeTag( E );
- if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
+ int tagOut = dqDecodeTag(E);
+ if( tag != tagOut ) {
+ printf("Lost tag information.\n"); exit(-1);
+ }
- dequeNode* ptrOut = dqDecodePtr( E );
- if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
+ dequeNode* ptrOut = dqDecodePtr(E);
+ if( ptr != ptrOut ) {
+ printf("Lost ptr information.\n"); exit(-1);
+ }
- int idxOut = dqDecodeIdx( E );
- if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
+ int idxOut = dqDecodeIdx(E);
+ if( idx != idxOut ) {
+ printf("Lost idx information.\n"); exit(-1);
+ }
#endif
return E;
}
-static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
- dequeNode* botNode = dqDecodePtr( bottom );
- int botIndx = dqDecodeIdx( bottom );
- dequeNode* topNode = dqDecodePtr( top );
- int topIndx = dqDecodeIdx( top );
+static inline int dqIndicateEmpty(INTPTR bottom, INTPTR top) {
+ dequeNode* botNode = dqDecodePtr(bottom);
+ int botIndx = dqDecodeIdx(bottom);
+ dequeNode* topNode = dqDecodePtr(top);
+ int topIndx = dqDecodeIdx(top);
if( (botNode == topNode) &&
(botIndx == topIndx || botIndx == (topIndx+1))
-void dqInit( deque* dq ) {
+void dqInit(deque* dq) {
+
+ dq->memPool = poolcreate(DQNODE_SIZETOREQUEST, NULL);
- dq->memPool = poolcreate( DQNODE_SIZETOREQUEST, NULL );
+ dequeNode* a = dqGet4096aligned(poolalloc(dq->memPool) );
+ dequeNode* b = dqGet4096aligned(poolalloc(dq->memPool) );
- dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
- dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
-
a->next = b;
b->prev = a;
- dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
- dq->top = dqEncode( 0, a, DQNODE_ARRAYSIZE - 1 );
+ dq->bottom = dqEncode(BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1);
+ dq->top = dqEncode(0, a, DQNODE_ARRAYSIZE - 1);
}
-void dqPushBottom( deque* dq, void* item ) {
+void dqPushBottom(deque* dq, void* item) {
#ifdef DEBUG_DEQUE
if( item == 0x0 ) {
- printf( "Pushing invalid work into the deque.\n" );
+ printf("Pushing invalid work into the deque.\n");
}
#endif
- dequeNode* currNode = dqDecodePtr( dq->bottom );
- int currIndx = dqDecodeIdx( dq->bottom );
+ dequeNode* currNode = dqDecodePtr(dq->bottom);
+ int currIndx = dqDecodeIdx(dq->bottom);
currNode->itsDataArr[currIndx] = item;
dequeNode* newNode;
- int newIndx;
+ int newIndx;
if( currIndx != 0 ) {
newNode = currNode;
newIndx = currIndx - 1;
} else {
- newNode = dqGet4096aligned( poolalloc( dq->memPool ) );
+ newNode = dqGet4096aligned(poolalloc(dq->memPool) );
newNode->next = currNode;
currNode->prev = newNode;
newIndx = DQNODE_ARRAYSIZE - 1;
}
- dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
+ dq->bottom = dqEncode(BOTTOM_NULL_TAG, newNode, newIndx);
}
-void* dqPopTop( deque* dq ) {
+void* dqPopTop(deque* dq) {
INTPTR currTop = dq->top;
- int currTopTag = dqDecodeTag( currTop );
- dequeNode* currTopNode = dqDecodePtr( currTop );
- int currTopIndx = dqDecodeIdx( currTop );
+ int currTopTag = dqDecodeTag(currTop);
+ dequeNode* currTopNode = dqDecodePtr(currTop);
+ int currTopIndx = dqDecodeIdx(currTop);
// read of top followed by read of bottom, algorithm
// says specifically must be in this order
BARRIER();
-
+
INTPTR currBottom = dq->bottom;
- if( dqIndicateEmpty( currBottom, currTop ) ) {
+ if( dqIndicateEmpty(currBottom, currTop) ) {
if( currTop == dq->top ) {
return DQ_POP_EMPTY;
} else {
}
dequeNode* nodeToFree;
- int newTopTag;
+ int newTopTag;
dequeNode* newTopNode;
- int newTopIndx;
+ int newTopIndx;
if( currTopIndx != 0 ) {
nodeToFree = NULL;
void* retVal = currTopNode->itsDataArr[currTopIndx];
- INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
+ INTPTR newTop = dqEncode(newTopTag, newTopNode, newTopIndx);
// algorithm states above should happen
// before attempting the CAS
BARRIER();
INTPTR actualTop = (INTPTR)
- CAS( &(dq->top), // location
- currTop, // expected value
- newTop ); // desired value
+ CAS(&(dq->top), // location
+ currTop, // expected value
+ newTop); // desired value
if( actualTop == currTop ) {
// CAS succeeded
if( nodeToFree != NULL ) {
- poolfreeinto( dq->memPool, nodeToFree );
+ poolfreeinto(dq->memPool, nodeToFree);
}
return retVal;
}
-void* dqPopBottom ( deque* dq ) {
+void* dqPopBottom(deque* dq) {
INTPTR oldBot = dq->bottom;
- dequeNode* oldBotNode = dqDecodePtr( oldBot );
- int oldBotIndx = dqDecodeIdx( oldBot );
-
+ dequeNode* oldBotNode = dqDecodePtr(oldBot);
+ int oldBotIndx = dqDecodeIdx(oldBot);
+
dequeNode* newBotNode;
- int newBotIndx;
+ int newBotIndx;
if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
newBotNode = oldBotNode;
void* retVal = newBotNode->itsDataArr[newBotIndx];
- dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
+ dq->bottom = dqEncode(BOTTOM_NULL_TAG, newBotNode, newBotIndx);
// algorithm states above should happen
// before attempting the CAS
INTPTR currTop = dq->top;
- int currTopTag = dqDecodeTag( currTop );
- dequeNode* currTopNode = dqDecodePtr( currTop );
- int currTopIndx = dqDecodeIdx( currTop );
+ int currTopTag = dqDecodeTag(currTop);
+ dequeNode* currTopNode = dqDecodePtr(currTop);
+ int currTopIndx = dqDecodeIdx(currTop);
if( oldBotNode == currTopNode &&
oldBotIndx == currTopIndx ) {
- dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
+ dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
return DQ_POP_EMPTY;
} else if( newBotNode == currTopNode &&
newBotIndx == currTopIndx ) {
- INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
+ INTPTR newTop = dqEncode(currTopTag + 1, currTopNode, currTopIndx);
INTPTR actualTop = (INTPTR)
- CAS( &(dq->top), // location
- currTop, // expected value
- newTop ); // desired value
+ CAS(&(dq->top), // location
+ currTop, // expected value
+ newTop); // desired value
if( actualTop == currTop ) {
// CAS succeeded
if( oldBotNode != newBotNode ) {
- poolfreeinto( dq->memPool, oldBotNode );
+ poolfreeinto(dq->memPool, oldBotNode);
}
return retVal;
-
+
} else {
- dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
+ dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
return DQ_POP_EMPTY;
}
-
+
} else {
if( oldBotNode != newBotNode ) {
- poolfreeinto( dq->memPool, oldBotNode );
+ poolfreeinto(dq->memPool, oldBotNode);
}
- return retVal;
+ return retVal;
}
}