switch to spaces only..
[IRC.git] / Robust / src / Runtime / deque.c
1 ////////////////////////////////////////////////////////////////
2 //
3 //  This is an implementation of the structure described in
4 //  A Dynamic-Sized Nonblocking Work Stealing Deque
5 //  Hendler, Lev, Moir, and Shavit
6 //
7 //  The bottom and top values for the deque must be CAS-able
8 //  and fit into 64 bits.  Our strategy for this is:
9 //
10 //    19-bit Tag    36-bit Node Pointer     9-bit Index
11 //   +-----------+-------------------------+------------+
12 //   | 63 ... 45 | 44 ...                9 | 8 ...    0 |
13 //   +-----------+-------------------------+------------+
14 //
15 //  Let's call the encoded info E.  To retrieve the values:
16 //    tag = (0xffffe00000000000 & E) >> 45;
17 //    ptr = (0x00001ffffffffe00 & E) <<  3;
18 //    idx = (0x00000000000001ff & E);
19 //
20 //  Increment the tag without decrypting:
21 //    E = (0x00001fffffffffff | E) + 1;
22 //
23 //  Increment (decrement) the index when it is not equal to
24 //  MAXINDEX (0) with E++ (E--).
25 //
26 //  x86 64-bit processors currently only use the lowest 48 bits for
27 //  virtual addresses, source:
28 //  http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
29 //  And 64-bit addresses are 2^3=8 byte aligned, so the lower 3 bits
30 //  of a 64-bit pointer are always zero.  This means if we are only
31 //  alloted 36 bits to store a pointer to a Node we have
32 //  48 - 3 - 36 = 9 bits that could be lost.  Instead of aligning Node
33 //  pointers to 8 bytes we can align them to 2^(3+9)=4096 bytes and be
34 //  sure the lower 12 bits of the address are zero.  THEREFORE:
35 //  Nodes must be 4096-byte aligned so the lower 12 bits are zeroes and
36 //  we can ecnode the rest in 36 bits without a loss of information.
37 //
38 ////////////////////////////////////////////////////////////////
39
40 #ifdef DEBUG_DEQUE
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #endif
45
46 #include "deque.h"
47
48
49 void* DQ_POP_EMPTY = (void*)0x17;
50 void* DQ_POP_ABORT = (void*)0x33;
51
52
53 // define a 19-bit dummy tag for the bottom
54 // value with a pattern that will expose errors
55 #define BOTTOM_NULL_TAG 0x40001
56
57
58
59 // the dequeNode struct must be 4096-byte aligned,
60 // see above, so use the following magic to ask
61 // the allocator for a space that wastes 4095 bytes
62 // but gaurantees the address of the struct within
63 // that space is 4096-aligned
64 const INTPTR DQNODE_SIZETOREQUEST = sizeof( dequeNode ) + 4095;
65
66 static inline dequeNode* dqGet4096aligned(void* fromAllocator) {
67   INTPTR aligned = ((INTPTR)fromAllocator + 4095) & (~4095);
68
69 #ifdef DEBUG_DEQUE
70   //printf( "from allocator: 0x%08x to 0x%08x\n", (INTPTR)fromAllocator, (INTPTR)fromAllocator + DQNODE_SIZETOREQUEST );
71   //printf( "aligned:        0x%08x to 0x%08x\n", aligned,               aligned               + sizeof( dequeNode )  );
72   memset( (void*) aligned, 0x9, sizeof( dequeNode ) );
73 #endif
74
75   return (dequeNode*) aligned;
76 }
77
78
79 static inline INTPTR dqEncode(int tag, dequeNode* ptr, int idx) {
80   INTPTR ptrE = (0x00001ffffffffe00 &  // second, mask off the addr's high-order 1's
81                  (((INTPTR)ptr) >> 3)); // first, shift down 8-byte alignment bits
82
83   INTPTR E =
84     (((INTPTR)tag) << 45) |
85     (ptrE)                |
86     ((INTPTR)idx);
87 #ifdef DEBUG_DEQUE
88   int tagOut = dqDecodeTag(E);
89   if( tag != tagOut ) {
90     printf("Lost tag information.\n"); exit(-1);
91   }
92
93   dequeNode* ptrOut = dqDecodePtr(E);
94   if( ptr != ptrOut ) {
95     printf("Lost ptr information.\n"); exit(-1);
96   }
97
98   int idxOut = dqDecodeIdx(E);
99   if( idx != idxOut ) {
100     printf("Lost idx information.\n"); exit(-1);
101   }
102 #endif
103   return E;
104 }
105
106
107 static inline int dqIndicateEmpty(INTPTR bottom, INTPTR top) {
108   dequeNode* botNode = dqDecodePtr(bottom);
109   int botIndx = dqDecodeIdx(bottom);
110   dequeNode* topNode = dqDecodePtr(top);
111   int topIndx = dqDecodeIdx(top);
112
113   if( (botNode == topNode) &&
114       (botIndx == topIndx || botIndx == (topIndx+1))
115       ) {
116     return 1;
117   }
118
119   if( (botNode == topNode->next) &&
120       (botIndx == 0)             &&
121       (topIndx == DQNODE_ARRAYSIZE - 1)
122       ) {
123     return 1;
124   }
125
126   return 0;
127 }
128
129
130
131 void dqInit(deque* dq) {
132
133   dq->memPool = poolcreate(DQNODE_SIZETOREQUEST, NULL);
134
135   dequeNode* a = dqGet4096aligned(poolalloc(dq->memPool) );
136   dequeNode* b = dqGet4096aligned(poolalloc(dq->memPool) );
137
138   a->next = b;
139   b->prev = a;
140
141   dq->bottom = dqEncode(BOTTOM_NULL_TAG, a, DQNODE_ARRAYSIZE - 1);
142   dq->top    = dqEncode(0,               a, DQNODE_ARRAYSIZE - 1);
143 }
144
145
146 void dqPushBottom(deque* dq, void* item) {
147
148 #ifdef DEBUG_DEQUE
149   if( item == 0x0 ) {
150     printf("Pushing invalid work into the deque.\n");
151   }
152 #endif
153
154   dequeNode* currNode = dqDecodePtr(dq->bottom);
155   int currIndx = dqDecodeIdx(dq->bottom);
156
157   currNode->itsDataArr[currIndx] = item;
158
159   dequeNode* newNode;
160   int newIndx;
161
162   if( currIndx != 0 ) {
163     newNode = currNode;
164     newIndx = currIndx - 1;
165
166   } else {
167     newNode        = dqGet4096aligned(poolalloc(dq->memPool) );
168     newNode->next  = currNode;
169     currNode->prev = newNode;
170     newIndx        = DQNODE_ARRAYSIZE - 1;
171   }
172
173   dq->bottom = dqEncode(BOTTOM_NULL_TAG, newNode, newIndx);
174 }
175
176
177 void* dqPopTop(deque* dq) {
178
179   INTPTR currTop = dq->top;
180
181   int currTopTag  = dqDecodeTag(currTop);
182   dequeNode* currTopNode = dqDecodePtr(currTop);
183   int currTopIndx = dqDecodeIdx(currTop);
184
185
186   // read of top followed by read of bottom, algorithm
187   // says specifically must be in this order
188   BARRIER();
189
190   INTPTR currBottom = dq->bottom;
191
192   if( dqIndicateEmpty(currBottom, currTop) ) {
193     if( currTop == dq->top ) {
194       return DQ_POP_EMPTY;
195     } else {
196       return DQ_POP_ABORT;
197     }
198   }
199
200   dequeNode* nodeToFree;
201   int newTopTag;
202   dequeNode* newTopNode;
203   int newTopIndx;
204
205   if( currTopIndx != 0 ) {
206     nodeToFree = NULL;
207     newTopTag  = currTopTag;
208     newTopNode = currTopNode;
209     newTopIndx = currTopIndx - 1;
210
211   } else {
212     nodeToFree = currTopNode->next;
213     newTopTag  = currTopTag + 1;
214     newTopNode = currTopNode->prev;
215     newTopIndx = DQNODE_ARRAYSIZE - 1;
216   }
217
218   void* retVal = currTopNode->itsDataArr[currTopIndx];
219
220   INTPTR newTop = dqEncode(newTopTag, newTopNode, newTopIndx);
221
222   // algorithm states above should happen
223   // before attempting the CAS
224   BARRIER();
225
226   INTPTR actualTop = (INTPTR)
227                      CAS(&(dq->top), // location
228                          currTop, // expected value
229                          newTop); // desired value
230
231   if( actualTop == currTop ) {
232     // CAS succeeded
233     if( nodeToFree != NULL ) {
234       poolfreeinto(dq->memPool, nodeToFree);
235     }
236     return retVal;
237
238   } else {
239     return DQ_POP_ABORT;
240   }
241 }
242
243
244 void* dqPopBottom(deque* dq) {
245
246   INTPTR oldBot = dq->bottom;
247
248   dequeNode* oldBotNode = dqDecodePtr(oldBot);
249   int oldBotIndx = dqDecodeIdx(oldBot);
250
251   dequeNode* newBotNode;
252   int newBotIndx;
253
254   if( oldBotIndx != DQNODE_ARRAYSIZE - 1 ) {
255     newBotNode = oldBotNode;
256     newBotIndx = oldBotIndx + 1;
257
258   } else {
259     newBotNode = oldBotNode->next;
260     newBotIndx = 0;
261   }
262
263   void* retVal = newBotNode->itsDataArr[newBotIndx];
264
265   dq->bottom = dqEncode(BOTTOM_NULL_TAG, newBotNode, newBotIndx);
266
267   // algorithm states above should happen
268   // before attempting the CAS
269   BARRIER();
270
271   INTPTR currTop = dq->top;
272
273   int currTopTag  = dqDecodeTag(currTop);
274   dequeNode* currTopNode = dqDecodePtr(currTop);
275   int currTopIndx = dqDecodeIdx(currTop);
276
277   if( oldBotNode == currTopNode &&
278       oldBotIndx == currTopIndx ) {
279     dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
280     return DQ_POP_EMPTY;
281
282   } else if( newBotNode == currTopNode &&
283              newBotIndx == currTopIndx ) {
284     INTPTR newTop = dqEncode(currTopTag + 1, currTopNode, currTopIndx);
285
286     INTPTR actualTop = (INTPTR)
287                        CAS(&(dq->top), // location
288                            currTop, // expected value
289                            newTop); // desired value
290
291     if( actualTop == currTop ) {
292       // CAS succeeded
293       if( oldBotNode != newBotNode ) {
294         poolfreeinto(dq->memPool, oldBotNode);
295       }
296       return retVal;
297
298     } else {
299       dq->bottom = dqEncode(BOTTOM_NULL_TAG, oldBotNode, oldBotIndx);
300       return DQ_POP_EMPTY;
301     }
302
303   } else {
304     if( oldBotNode != newBotNode ) {
305       poolfreeinto(dq->memPool, oldBotNode);
306     }
307     return retVal;
308   }
309 }