make sure mistakes are obvious with identifiable values
[IRC.git] / Robust / src / Runtime / deque.c
1 ////////////////////////////////////////////////////////////////
2 //
3 //  This is an implementation of the structure described in
4 //  A Dynamic-Sized Nonblocking Work Stealing Deque
5 //  Hendler, Lev, Moir, and Shavit
6 //   
7 //  The bottom and top values for the deque must be CAS-able
8 //  and fit into 64 bits.  Our strategy for this is:
9 //  
10 //    19-bit Tag    36-bit Node Pointer     9-bit Index
11 //   +-----------+-------------------------+------------+
12 //   | 63 ... 45 | 44 ...                9 | 8 ...    0 |
13 //   +-----------+-------------------------+------------+
14 //
15 //  Let's call the encoded info E.  To retrieve the values:  
16 //    tag = (0xffffe00000000000 & E) >> 45;
17 //    ptr = (0x00001ffffffffe00 & E) <<  3;
18 //    idx = (0x00000000000001ff & E);
19 //
20 //  Increment the tag without decrypting:
21 //    E = (0x00001fffffffffff | E) + 1;
22 //
23 //  Increment (decrement) the index when it is not equal to
24 //  MAXINDEX (0) with E++ (E--).
25 //
26 //  x86 64-bit processors currently only use the lowest 48 bits for
27 //  virtual addresses, source:
28 //  http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 //  And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 //  of a 64-bit pointer are always zero.  This means if we are only
31 //  alloted 36 bits to store a pointer to a Node we have 
32 //  48 - 3 - 36 = 9 bits that could be lost.  Instead of aligning Node
33 //  pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 //  sure the lower 12 bits of the address are zero.  THEREFORE:
35 //  Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 //  we can ecnode the rest in 36 bits without a loss of information.  
37 //
38 ////////////////////////////////////////////////////////////////
39
40 #ifdef DEBUG_DEQUE
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #endif
45
46 #include "deque.h"
47
48
49 void* DQ_POP_EMPTY = (void*)0x17;
50 void* DQ_POP_ABORT = (void*)0x33;
51
52
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
55 #define BOTTOM_NULL_TAG 0x40001
56
57
58
59 // the dequeNode struct must be 4096-byte aligned, 
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but gaurantees the address of the struct within
63 // that space is 4096-aligned
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
65
66 static inline dequeNode* dqGet4096aligned( void* fromAllocator ) { 
67   INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
68
69 #ifdef DEBUG_DEQUE
70   //printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
71   //printf( "aligned:        0x%08x to 0x%08x\n", aligned,               aligned               + sizeof( dequeNode )  );
72   memset( (void*) aligned, 0x9, sizeof( dequeNode ) );
73 #endif
74
75   return (dequeNode*) aligned;
76 }
77
78
79 static inline INTPTR dqEncode( int tag, dequeNode* ptr, int idx ) {
80   INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
81                  (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
82
83   INTPTR E =
84     (((INTPTR)tag) << 45) |
85     (ptrE)                |
86     ((INTPTR)idx);
87 #ifdef DEBUG_DEQUE
88   int tagOut = dqDecodeTag( E ); 
89   if( tag != tagOut ) { printf( "Lost tag information.\n" ); exit( -1 ); }
90
91   dequeNode* ptrOut = dqDecodePtr( E );
92   if( ptr != ptrOut ) { printf( "Lost ptr information.\n" ); exit( -1 ); }
93
94   int idxOut = dqDecodeIdx( E );
95   if( idx != idxOut ) { printf( "Lost idx information.\n" ); exit( -1 ); }
96 #endif
97   return E;
98 }
99
100
101 static inline int dqIndicateEmpty( INTPTR bottom, INTPTR top ) {
102   dequeNode* botNode = dqDecodePtr( bottom );
103   int        botIndx = dqDecodeIdx( bottom );
104   dequeNode* topNode = dqDecodePtr( top );
105   int        topIndx = dqDecodeIdx( top );  
106
107   if( (botNode == topNode) &&
108       (botIndx == topIndx || botIndx == (topIndx+1))
109       ) {
110     return 1;
111   }
112
113   if( (botNode == topNode->next) &&
114       (botIndx == 0)             &&
115       (topIndx == DQNODE_ARRAYSIZE - 1)
116       ) {
117     return 1;
118   }
119
120   return 0;
121 }
122
123
124
125 void dqInit( deque* dq ) {
126
127   dq->memPool = poolcreate( DQNODE_SIZETOREQUEST );
128
129   dequeNode* a = dqGet4096aligned( poolalloc( dq->memPool ) );
130   dequeNode* b = dqGet4096aligned( poolalloc( dq->memPool ) );
131   
132   a->next = b;
133   b->prev = a;
134
135   dq->bottom = dqEncode( BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1 );
136   dq->top    = dqEncode( 0,               a, DQNODE_ARRAYSIZE - 1 );
137 }
138
139
140 void dqPushBottom( deque* dq, void* item ) {
141
142 #ifdef DEBUG_DEQUE
143   if( item == 0x0 ) {
144     printf( "Pushing invalid work into the deque.\n" );
145   }
146 #endif
147
148   dequeNode* currNode = dqDecodePtr( dq->bottom );
149   int        currIndx = dqDecodeIdx( dq->bottom );
150
151   currNode->itsDataArr[currIndx] = item;
152
153   dequeNode* newNode;
154   int        newIndx;
155
156   if( currIndx != 0 ) {
157     newNode = currNode;
158     newIndx = currIndx - 1;
159
160   } else {
161     newNode        = dqGet4096aligned( poolalloc( dq->memPool ) );
162     newNode->next  = currNode;
163     currNode->prev = newNode;
164     newIndx        = DQNODE_ARRAYSIZE - 1;
165   }
166
167   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newNode, newIndx );
168 }
169
170
171 void* dqPopTop( deque* dq ) {
172
173   INTPTR currTop = dq->top;
174
175   int        currTopTag  = dqDecodeTag( currTop );
176   dequeNode* currTopNode = dqDecodePtr( currTop );
177   int        currTopIndx = dqDecodeIdx( currTop );
178
179
180   // read of top followed by read of bottom, algorithm
181   // says specifically must be in this order
182   BARRIER();
183   
184   INTPTR currBottom = dq->bottom;
185
186   if( dqIndicateEmpty( currBottom, currTop ) ) {
187     if( currTop == dq->top ) {
188       return DQ_POP_EMPTY;
189     } else {
190       return DQ_POP_ABORT;
191     }
192   }
193
194   dequeNode* nodeToFree;
195   int        newTopTag;
196   dequeNode* newTopNode;
197   int        newTopIndx;
198
199   if( currTopIndx != 0 ) {
200     nodeToFree = NULL;
201     newTopTag  = currTopTag;
202     newTopNode = currTopNode;
203     newTopIndx = currTopIndx - 1;
204
205   } else {
206     nodeToFree = currTopNode->next;
207     newTopTag  = currTopTag + 1;
208     newTopNode = currTopNode->prev;
209     newTopIndx = DQNODE_ARRAYSIZE - 1;
210   }
211
212   void* retVal = currTopNode->itsDataArr[currTopIndx];
213
214   INTPTR newTop = dqEncode( newTopTag, newTopNode, newTopIndx );
215
216   // algorithm states above should happen
217   // before attempting the CAS
218   BARRIER();
219
220   INTPTR actualTop = (INTPTR)
221     CAS( &(dq->top), // location
222          currTop,    // expected value
223          newTop );   // desired value
224
225   if( actualTop == currTop ) {
226     // CAS succeeded
227     if( nodeToFree != NULL ) {
228       poolfreeinto( dq->memPool, nodeToFree );
229     }
230     return retVal;
231
232   } else {
233     return DQ_POP_ABORT;
234   }
235 }
236
237
238 void* dqPopBottom ( deque* dq ) {
239
240   INTPTR oldBot = dq->bottom;
241
242   dequeNode* oldBotNode = dqDecodePtr( oldBot );
243   int        oldBotIndx = dqDecodeIdx( oldBot );
244   
245   dequeNode* newBotNode;
246   int        newBotIndx;
247
248   if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
249     newBotNode = oldBotNode;
250     newBotIndx = oldBotIndx + 1;
251
252   } else {
253     newBotNode = oldBotNode->next;
254     newBotIndx = 0;
255   }
256
257   void* retVal = newBotNode->itsDataArr[newBotIndx];
258
259   dq->bottom = dqEncode( BOTTOM_NULL_TAG, newBotNode, newBotIndx );
260
261   // algorithm states above should happen
262   // before attempting the CAS
263   BARRIER();
264
265   INTPTR currTop = dq->top;
266
267   int        currTopTag  = dqDecodeTag( currTop );
268   dequeNode* currTopNode = dqDecodePtr( currTop );
269   int        currTopIndx = dqDecodeIdx( currTop );
270
271   if( oldBotNode == currTopNode &&
272       oldBotIndx == currTopIndx ) {
273     dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );
274     return DQ_POP_EMPTY;
275
276   } else if( newBotNode == currTopNode &&
277              newBotIndx == currTopIndx ) {
278     INTPTR newTop = dqEncode( currTopTag + 1, currTopNode, currTopIndx );
279
280     INTPTR actualTop = (INTPTR)
281       CAS( &(dq->top), // location
282            currTop,    // expected value
283            newTop );   // desired value
284
285     if( actualTop == currTop ) {
286       // CAS succeeded
287       if( oldBotNode != newBotNode ) {
288         poolfreeinto( dq->memPool, oldBotNode );
289       }
290       return retVal;
291       
292     } else {
293       dq->bottom = dqEncode( BOTTOM_NULL_TAG, oldBotNode, oldBotIndx );      
294       return DQ_POP_EMPTY;
295     }
296     
297   } else {
298     if( oldBotNode != newBotNode ) {
299       poolfreeinto( dq->memPool, oldBotNode );
300     }
301     return retVal;    
302   }
303 }