299e7a320533cf9b8362b6c812a57f63c7b6f9d6
[cdsspec-compiler.git] / benchmark / cliffc-hashtable / simplified_cliffc_hashtable.h
1 #ifndef SIMPLIFIED_CLIFFC_HASHTABLE_H
2 #define SIMPLIFIED_CLIFFC_HASHTABLE_H
3
4 #include <iostream>
5 #include <atomic>
6 #include <memory>
7 #include <assert.h>
8
9 using namespace std;
10
11
12
13 /**
14         This header file declares and defines a simplified version of Cliff Click's
 15         NonblockingHashMap. It contains all the necessary structures and main
16         functions. In simplified_cliffc_hashtable.cc file, it has the definition for
17         the static fields.
18 */
19
20 template<typename TypeK, typename TypeV>
21 class cliffc_hashtable;
22
23 /**
 24         Corresponds to the Object[] array in Cliff Click's Java implementation.
25         It keeps the first two slots for CHM (Hashtable control unit) and the hash
26         records (an array of hash used for fast negative key-equality check).
27 */
struct kvs_data {
	// Number of key/value pairs this table can hold.
	int _size;
	// Backing array: slot 0 holds the CHM control block, slot 1 holds the
	// hash-record array (int[_size]); pair k's key lives at index 2k+2 and
	// its value at 2k+3.
	std::atomic<void*> *_data;

	kvs_data(int sz) {
		_size = sz;
		// Two physical slots per pair plus the two header slots (CHM +
		// hashes).  The original computed "sizeof(atomic<void*>) * 2 + 2",
		// which only coincidentally equals sz * 2 + 2 for the default size 8
		// on 64-bit targets and under-allocates for any larger table.
		int real_size = sz * 2 + 2;
		_data = new std::atomic<void*>[real_size];
		// The control block (slot 0) is initialized by the caller/resize().
		// Init the hash record array (used for fast negative key checks).
		int *hashes = new int[_size];
		int i;
		for (i = 0; i < _size; i++) {
			hashes[i] = 0;
		}
		_data[1].store(hashes, std::memory_order_relaxed);
		// Null out every key/value slot.
		for (i = 2; i < real_size; i++) {
			_data[i].store(NULL, std::memory_order_relaxed);
		}
	}

	~kvs_data() {
		int *hashes = (int*) _data[1].load(std::memory_order_relaxed);
		// 'hashes' was allocated with new[]; scalar delete (as in the
		// original) is undefined behavior.
		delete[] hashes;
		delete[] _data;
	}
};
56
/**
	A single table entry: a type-erased shared pointer to a key or value,
	plus a "prime" flag marking entries that are mid-copy during a resize.
*/
struct slot {
	bool _prime;
	std::shared_ptr<void> _ptr;

	slot(bool prime, std::shared_ptr<void> ptr)
		: _prime(prime), _ptr(ptr) {
	}
};
67
68
69 /**
70         TypeK must have defined function "int hashCode()" which return the hash
71         code for the its object, and "int equals(TypeK anotherKey)" which is
72         used to judge equality.
73         TypeK and TypeV should define their own copy constructor.
74         To make the memory management safe and similar to Cliff Click's Java
75         implementation, we use shared_ptr instead of normal pointer in terms of the
76         pointers that point to TypeK and TypeV.
77 */
78 template<typename TypeK, typename TypeV>
79 class cliffc_hashtable {
80         /**
81                 # The synchronization we have for the hashtable gives us the property of
82                 # serializability, so we should have a sequential hashtable when we check the
83                 # correctness. The key thing is to identify all the commit point.
84         
85                 @Begin
86                 @Options:
87                         LANG = C;
88                         CLASS = cliffc_hashtable;
89                 @Global_define:
90                         @DeclareVar:
91                         spec_hashtable<TypeK, TypeV*> map;
92                         spec_hashtable<TypeK, Tag> id_map;
93                         Tag tag;
94                         @InitVar:
95                                 map = spec_hashtable<TypeK, TypeV*>();
96                                 id_map = spec_hashtable<TypeK, TypeV*>();
97                                 tag = Tag();
98                         @DefineFunc:
99                         static bool equals_val(TypeV *ptr1, TypeV *ptr2) {
100                                 // ...
101                         }
102                         
103                         # Update the tag for the current key slot if the corresponding tag
104                         # is NULL, otherwise just return that tag. It will update the next
105                         # available tag too if it requires a new tag for that key slot.
106                         static Tag getKeyTag(TypeK &key) {
107                                 if (id_map.get(key) == NULL) {
108                                         Tag cur_tag = tag.current();
109                                         id_map.put(key, cur_tag);
110                                         tag.next();
111                                         return cur_tag;
112                                 } else {
113                                         return id_map.get(key);
114                                 }
115                         }
116                 
117                 @Interface_cluster:
118                         Read_interface = {
119                                 Get,
120                                 PutIfAbsent,
121                                 RemoveAny,
122                                 RemoveIfMatch,
123                                 ReplaceAny,
124                                 ReplaceIfMatch
125                         }
126                         
127                         Write_interface = {
128                                 Put,
129                                 PutIfAbsent(COND_PutIfAbsentSucc),
130                                 RemoveAny,
131                                 RemoveIfMatch(COND_RemoveIfMatchSucc),
132                                 ReplaceAny,
133                                 ReplaceIfMatch(COND_ReplaceIfMatchSucc)
134                         }
135                 @Happens_before:
136                         Write_interface -> Read_interface
137                 @End
138         */
139
140 friend class CHM;
141         /**
142                 The control structure for the hashtable
143         */
144         private:
145         class CHM {
146                 friend class cliffc_hashtable;
147                 private:
148                 atomic<kvs_data*> _newkvs;
149                 
150                 // Size of active K,V pairs
151                 atomic_int _size;
152         
153                 // Count of used slots
154                 atomic_int _slots;
155                 
156                 // The next part of the table to copy
157                 atomic_int _copy_idx;
158                 
159                 // Work-done reporting
160                 atomic_int _copy_done;
161         
162                 public:
163                 CHM(int size) {
164                         _size.store(size, memory_order_relaxed);
165                         _slots.store(0, memory_order_relaxed);
166         
167                         _copy_idx.store(0, memory_order_relaxed);
168                         _copy_done.store(0, memory_order_release);
169                 }
170         
171                 ~CHM() {}
172                 
173                 private:
174                         
175                 // Heuristic to decide if the table is too full
176                 bool table_full(int reprobe_cnt, int len) {
177                         return
178                                 reprobe_cnt >= REPROBE_LIMIT &&
179                                 _slots.load(memory_order_relaxed) >= reprobe_limit(len);
180                 }
181         
182                 kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
183                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
184                         if (newkvs != NULL)
185                                 return newkvs;
186         
187                         // No copy in-progress, start one; Only double the table size
188                         int oldlen = kvs->_size;
189                         int sz = _size.load(memory_order_relaxed);
190                         int newsz = sz;
191                         
192                         // Just follow Cliff Click's heuristic to decide the new size
193                         if (sz >= (oldlen >> 2)) { // If we are 25% full
194                                 newsz = oldlen << 1; // Double size
195                                 if (sz >= (oldlen >> 1))
196                                         newsz = oldlen << 2; // Double double size
197                         }
198         
199                         // We do not record the record timestamp
200                         if (newsz <= oldlen) newsz = oldlen << 1;
201                         // Do not shrink ever
202                         if (newsz < oldlen) newsz = oldlen;
203         
204                         // Last check cause the 'new' below is expensive
205                         newkvs = _newkvs.load(memory_order_acquire);
206                         if (newkvs != NULL) return newkvs;
207         
208                         newkvs = new kvs_data(newsz);
209                         void *chm = (void*) new CHM(sz);
210                         newkvs->_data[0].store(chm, memory_order_relaxed);
211         
212                         kvs_data *cur_newkvs; 
213                         // Another check after the slow allocation
214                         if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
215                                 return cur_newkvs;
216                         // CAS the _newkvs to the allocated table
217                         kvs_data *desired = (kvs_data*) NULL;
218                         kvs_data *expected = (kvs_data*) newkvs; 
219                         if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
220                                         memory_order_release)) {
221                                 // Should clean the allocated area
222                                 delete newkvs;
223                                 newkvs = _newkvs.load(memory_order_acquire);
224                         }
225                         return newkvs;
226                 }
227         
228                 void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
229                         bool copy_all) {
230                         assert (get_chm(oldkvs) == this);
231                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
232                         int oldlen = oldkvs->_size;
233                         int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
234                 
235                         // Just follow Cliff Click's code here
236                         int panic_start = -1;
237                         int copyidx;
238                         while (_copy_done.load(memory_order_acquire) < oldlen) {
239                                 copyidx = _copy_idx.load(memory_order_acquire);
240                                 if (panic_start == -1) { // No painc
241                                         copyidx = _copy_idx.load(memory_order_acquire);
242                                         while (copyidx < (oldlen << 1) &&
243                                                 !_copy_idx.compare_exchange_strong(copyidx, copyidx +
244                                                         min_copy_work, memory_order_release, memory_order_release))
245                                                 copyidx = _copy_idx.load(memory_order_relaxed);
246                                         if (!(copyidx < (oldlen << 1)))
247                                                 panic_start = copyidx;
248                                 }
249         
250                                 // Now copy the chunk of work we claimed
251                                 int workdone = 0;
252                                 for (int i = 0; i < min_copy_work; i++)
253                                         if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
254                                                 newkvs))
255                                                 workdone++;
256                                 if (workdone > 0)
257                                         copy_check_and_promote(topmap, oldkvs, workdone);
258         
259                                 copyidx += min_copy_work;
260                                 if (!copy_all && panic_start == -1)
261                                         return; // We are done with the work we claim
262                         }
263                         copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
264                 }
265         
266                 kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
267                         *oldkvs, int idx, void *should_help) {
268                         kvs_data *newkvs = _newkvs.load(memory_order_acquire);
269                         // We're only here cause the caller saw a Prime
270                         if (copy_slot(topmap, idx, oldkvs, _newkvs))
271                                 copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
272                         return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
273                 }
274         
275                 void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
276                         oldkvs, int workdone) {
277                         int oldlen = oldkvs->_size;
278                         int copyDone = _copy_done.load(memory_order_relaxed);
279                         if (workdone > 0) {
280                                 while (true) {
281                                         copyDone = _copy_done.load(memory_order_relaxed);
282                                         if (_copy_done.compare_exchange_weak(copyDone, copyDone +
283                                                 workdone, memory_order_relaxed, memory_order_relaxed))
284                                                 break;
285                                 }
286                         }
287         
288                         // Promote the new table to the current table
289                         if (copyDone + workdone == oldlen &&
290                                 topmap->_kvs.load(memory_order_acquire) == oldkvs)
291                                 topmap->_kvs.compare_exchange_strong(oldkvs, _newkvs, memory_order_release,
292                                         memory_order_release);
293                 }
294         
295                 bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
296                         kvs_data *newkvs) {
297                         slot *key_slot;
298                         while ((key_slot = key(oldkvs, idx)) == NULL)
299                                 CAS_key(oldkvs, idx, NULL, TOMBSTONE);
300         
301                         // First CAS old to Prime
302                         slot *oldval = val(oldkvs, idx, NULL);
303                         while (!is_prime(oldval)) {
304                                 slot *box = (oldval == NULL || oldval == TOMBSTONE)
305                                         ? TOMBPRIME : new slot(true, oldval->_ptr);
306                                 if (CAS_val(oldkvs, idx, oldval, box)) {
307                                         if (box == TOMBPRIME)
308                                                 return 1; // Copy done
309                                         // Otherwise we CAS'd the box
310                                         oldval = box; // Record updated oldval
311                                         break;
312                                 }
313                                 oldval = val(oldkvs, idx, NULL); // Else re-try
314                         }
315         
316                         if (oldval == TOMBPRIME) return false; // Copy already completed here
317         
318                         slot *old_unboxed = new slot(false, oldval->_ptr);
319                         int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
320                                 NULL) == NULL);
321         
322                         // Old value is exposed in the new table
323                         while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
324                                 oldval = val(oldkvs, idx, NULL);
325         
326                         return copied_into_new;
327                 }
328         };
329
330         
331
332         private:
333         static const int Default_Init_Size = 8; // Intial table size
334
335         static slot* const MATCH_ANY;
336         static slot* const NO_MATCH_OLD;
337
338         static slot* const TOMBPRIME;
339         static slot* const TOMBSTONE;
340
341         static const int REPROBE_LIMIT = 10; // Forces a table-resize
342
343         atomic<kvs_data*> _kvs;
344
345         public:
346         cliffc_hashtable() {
347                 // Should initialize the CHM for the construction of the table
348                 // For other CHM in kvs_data, they should be initialzed in resize()
349                 // because the size is determined dynamically
350                 kvs_data *kvs = new kvs_data(Default_Init_Size);
351                 void *chm = (void*) new CHM(0);
352                 kvs->_data[0].store(chm, memory_order_relaxed);
353                 _kvs.store(kvs, memory_order_release);
354         }
355
356         cliffc_hashtable(int init_size) {
357                 // Should initialize the CHM for the construction of the table
358                 // For other CHM in kvs_data, they should be initialzed in resize()
359                 // because the size is determined dynamically
360                 kvs_data *kvs = new kvs_data(init_size);
361                 void *chm = (void*) new CHM(0);
362                 kvs->_data[0].store(chm, memory_order_relaxed);
363                 _kvs.store(kvs, memory_order_release);
364         }
365
366         /**
367                 @Begin
368                 @Interface: Get
369                 @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
370                 @ID: __sequential.getKeyTag(key)
371                 @Action:
372                         @DefineVar: TypeV *_Old_Val = __sequential.map.get(key)
373                 @Post_check:
374                         __sequential.equals_val(_Old_Val, __RET__)
375                 @End
376         */
377         shared_ptr<TypeV> get(TypeK& key) {
378                 void *key_ptr = (void*) new TypeK(key);
379                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
380                 int fullhash = hash(key_slot);
381                 slot *V = get_impl(this, _kvs, key_slot, fullhash);
382                 if (V == NULL) return NULL;
383                 assert (!is_prime(V));
384                 return static_pointer_cast<TypeV>(V->_ptr);
385         }
386
387         /**
388                 @Begin
389                 @Interface: Put
390                 @Commit_point_set: Write_Val_Point
391                 @ID: __sequential.getKeyTag(key)
392                 @Action:
393                         # Remember this old value at checking point
394                         @DefineVar: TypeV *_Old_Val = __sequential.map.get(key)
395                         @Code: __sequential.map.put(key, &val);
396                 @Post_check:
397                         __sequential.equals_val(__RET__, _Old_Val)
398                 @End
399         */
400         shared_ptr<TypeV> put(TypeK& key, TypeV& val) {
401                 return putIfMatch(key, val, NO_MATCH_OLD);
402         }
403
404         /**
405                 @Begin
406                 @Interface: PutIfAbsent
407                 @Commit_point_set:
408                         Write_Val_Point | PutIfAbsent_Fail_Point
409                 @Condition: __sequential.map.get(key) == NULL
410                 @HB_condition:
411                         COND_PutIfAbsentSucc :: __RET__ == NULL
412                 @ID: __sequential.getKeyTag(key)
413                 @Action:
414                         @DefineVar: TypeV *_Old_Val = __sequential.map.get(key)
415                         @Code:
416                         if (__COND_SAT__)
417                                 __sequential.map.put(key, &value);
418                 @Post_check:
419                         __COND_SAT__ ? __RET__ == NULL : __sequential.equals_val(_Old_Val, __RET__) 
420                 @End
421         */
422         shared_ptr<TypeV> putIfAbsent(TypeK& key, TypeV& value) {
423                 return putIfMatch(key, val, TOMBSTONE);
424         }
425
426         /**
427                 @Begin
428                 @Interface: RemoveAny
429                 @Commit_point_set: Write_Val_Point
430                 @ID: __sequential.getKeyTag(key)
431                 @Action:
432                         @DefineVar: TypeV *_Old_Val = __sequential.map.get(key)
433                         @Code: __sequential.map.put(key, NULL);
434                 @Post_check:
435                         __sequential.equals_val(__RET__, _Old_Val)
436                 @End
437         */
438         shared_ptr<TypeV> remove(TypeK& key) {
439                 return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
440         }
441
442         /**
443                 @Begin
444                 @Interface: RemoveIfMatch
445                 @Commit_point_set:
446                         Write_Val_Point | RemoveIfMatch_Fail_Point
447                 @Condition:
448                         __sequential.equals_val(__sequential.map.get(key), &val)
449                 @HB_condition:
450                         COND_RemoveIfMatchSucc :: __RET__ == true
451                 @ID: __sequential.getKeyTag(key)
452                 @Action:
453                         @Code:
454                         if (__COND_SAY__)
455                                 __sequential.map.put(key, NULL);
456                 @Post_check:
457                         __COND_SAY__ ? __RET__ : !__RET__
458                 @End
459         */
460         bool remove(TypeK& key, TypeV& val) {
461                 slot *val_slot = val == NULL ? NULL : new slot(false, val);
462                 return putIfMatch(key, TOMBSTONE, val) == val;
463
464         }
465
466         /**
467                 @Begin
468                 @Interface: ReplaceAny
469                 @Commit_point_set:
470                         Write_Val_Point
471                 @ID: __sequential.getKeyTag(key)
472                 @Action:
473                         @DefineVar: TypeV *_Old_Val = __sequential.map.get(key)
474                 @Post_check:
475                         __sequential.equals_val(__RET__, _Old_Val)
476                 @End
477         */
478         shared_ptr<TypeV> replace(TypeK& key, TypeV& val) {
479                 return putIfMatch(key, val, MATCH_ANY);
480         }
481
482         /**
483                 @Begin
484                 @Interface: ReplaceIfMatch
485                 @Commit_point_set:
486                         Write_Val_Point | ReplaceIfMatch_Fail_Point
487                 @Condition:
488                         __sequential.equals_val(__sequential.map.get(key), &oldval)
489                 @HB_condition:
490                         COND_ReplaceIfMatchSucc :: __RET__ == true
491                 @ID: __sequential.getKeyTag(key)
492                 @Action:
493                         @Code:
494                         if (__COND_SAY__)
495                                 __sequential.map.put(key, &newval);
496                 @Post_check:
497                         __COND_SAY__ ? __RET__ : !__RET__
498                 @End
499         */
500         bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
501                 return putIfMatch(key, newval, oldval) == oldval;
502         }
503
504         private:
505         static CHM* get_chm(kvs_data* kvs) {
506                 return (CHM*) kvs->_data[0].load(memory_order_relaxed);
507         }
508
509         static int* get_hashes(kvs_data *kvs) {
510                 return (int *) kvs->_data[1].load(memory_order_relaxed);
511         }
512         
513         // Preserve happens-before semantics on newly inserted keys
514         static inline slot* key(kvs_data *kvs, int idx) {
515                 assert (idx >= 0 && idx < kvs->_size);
516                 // Corresponding to the volatile read in get_impl() and putIfMatch in
517                 // Cliff Click's Java implementation
518                 return (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
519         }
520
521         /**
522                 The atomic operation in val() function is a "potential" commit point,
523                 which means in some case it is a real commit point while it is not for
524                 some other cases. This so happens because the val() function is such a
525                 fundamental function that many internal operation will call. Our
526                 strategy is that we label any potential commit points and check if they
527                 really are the commit points later.
528         */
529         // Preserve happens-before semantics on newly inserted values
	static inline slot* val(kvs_data *kvs, int idx) {
		// Returns the value slot of pair 'idx' (array index 2*idx + 3); may
		// be NULL when the pair has no value yet.
		assert (idx >= 0 && idx < kvs->_size);
		// Acquire load corresponding to the volatile read in get_impl() and
		// putIfMatch() in Cliff Click's Java implementation
		slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
		/**
			@Begin
			# This is a complicated potential commit point since many many functions are
			# calling val().
			@Potential_commit_point_define: true
			@Label: Read_Val_Point
			@End
		*/
		return res;


	}
547
548         static int hash(slot *key_slot) {
549                 assert(key_slot != NULL && key_slot->_ptr != NULL);
550                 shared_ptr<TypeK> key = static_pointer_cast<TypeK>(key_slot->_ptr);
551                 int h = key->hashCode();
552                 // Spread bits according to Cliff Click's code
553                 h += (h << 15) ^ 0xffffcd7d;
554                 h ^= (h >> 10);
555                 h += (h << 3);
556                 h ^= (h >> 6);
557                 h += (h << 2) + (h << 14);
558                 return h ^ (h >> 16);
559         }
560         
561         // Heuristic to decide if reprobed too many times. 
562         // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
563         // put it triggers a table resize. Several places MUST have exact agreement.
564         static int reprobe_limit(int len) {
565                 return REPROBE_LIMIT + (len >> 2);
566         }
567         
568         static inline bool is_prime(slot *val) {
569                 return (val != NULL) && val->_prime;
570         }
571
	// Check for key equality. Try direct pointer comparison first (fast
	// negative test) and then the full 'equals' call.
	// K        - key slot read from the table (caller guarantees non-NULL)
	// key_slot - the probe key supplied by the caller
	// hashes   - cached hash array of the table; hashes[hash] == 0 means
	//            "not recorded yet"
	// hash     - probe index into 'hashes' (callers pass idx here)
	// fullhash - spread hash of the probe key
	static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
		int fullhash) {
		// Caller should've checked this.
		assert (K != NULL);
		shared_ptr<TypeK> key_ptr = static_pointer_cast<TypeK>(key_slot->_ptr);
		// NOTE(review): equals() is handed the raw shared_ptr<void> member,
		// not a TypeK - confirm TypeK::equals accepts that argument.
		return
			K == key_slot ||
				((hashes[hash] == 0 || hashes[hash] == fullhash) &&
				K != TOMBSTONE &&
				key_ptr->equals(K->_ptr));
	}
585
586         static bool valeq(slot *val_slot1, slot *val_slot2) {
587                 assert (val_slot1 != NULL);
588                 shared_ptr<TypeK> ptr1 = static_pointer_cast<TypeV>(val_slot1->_ptr);
589                 if (val_slot2 == NULL || ptr1 == NULL) return false;
590                 return ptr1->equals(val_slot2->_ptr);
591         }
592         
593         // Together with key() preserve the happens-before relationship on newly
594         // inserted keys
595         static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
596                 return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
597                         desired, memory_order_release, memory_order_release);
598         }
599
600         /**
601                 Same as the val() function, we only label the CAS operation as the
602                 potential commit point.
603         */
604         // Together with val() preserve the happens-before relationship on newly
605         // inserted values
606         static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
607                 *desired) {
608                 bool res =  kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
609                         desired, memory_order_release, memory_order_release);
610                 /**
611                         # If it is a successful put instead of a copy or any other internal
612                         # operantions, expected != NULL
613                         @Begin
614                         @Potential_commit_point_define: __ATOMIC_RET__ == true
615                         @Label: Write_Val_Point
616                         @End
617                 */
618                 return res;
619         }
620
	// Probe table 'kvs' for 'key_slot'; returns the value slot on a hit,
	// NULL on a miss, and recurses into the newer table when the slot being
	// read is mid-copy (Prime) or the probe budget is exhausted.
	slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
		fullhash) {
		int len = kvs->_size;
		CHM *chm = get_chm(kvs);
		int *hashes = get_hashes(kvs);

		int idx = fullhash & (len - 1);
		int reprobe_cnt = 0;
		while (true) {
			slot *K = key(kvs, idx);
			slot *V = val(kvs, idx);
			/**
				@Begin
				@Commit_point_define: V == NULL
				@Potential_commit_point_label: Read_Val_Point
				@Label: Get_Success_Point_1
				@End
			*/

			if (V == NULL) return NULL; // A miss
			
			if (keyeq(K, key_slot, hashes, idx, fullhash)) {
				// Key hit! Check if table-resize in progress
				if (!is_prime(V)) {
					/**
						@Begin
						@Commit_point_define: true
						@Potential_commit_point_label: Read_Val_Point
						@Label: Get_Success_Point_2
						@End
					*/
					return (V == TOMBSTONE) ? NULL : V; // Return this value
				}
				// Otherwise, finish the copy & retry in the new table
				return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, key_slot), key_slot, fullhash);
			}

			// NOTE(review): this compares the caller's probe key against
			// TOMBSTONE (a fresh probe slot can never be TOMBSTONE), and
			// uses the fixed REPROBE_LIMIT rather than reprobe_limit(len);
			// confirm whether 'K == TOMBSTONE' / reprobe_limit(len) were
			// intended, per Cliff Click's algorithm.
			if (++reprobe_cnt >= REPROBE_LIMIT ||
				key_slot == TOMBSTONE) {
				// Retry in new table
				// Atomic read (acquire) can be here 
				kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
				/**
					@Begin
					@Commit_point_define_check: newkvs == NULL
					@Label: Get_Success_Point_3
					@End
				*/
				return newkvs == NULL ? NULL : get_impl(topmap,
					topmap->help_copy(newkvs), key_slot, fullhash);
			}

			idx = (idx + 1) & (len - 1); // Reprobe by 1
		}
	}
677
678         // A wrapper of the essential function putIfMatch()
679         shared_ptr<TypeV> putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
680                 // TODO: Should throw an exception rather return NULL
681                 if (old_val == NULL) {
682                         return NULL;
683                 }
684                 void *key_ptr = (void*) new TypeK(key);
685                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
686
687                 void *val_ptr = (void*) new TypeV(value);
688                 slot *value_slot = new slot(false, shared_ptr<void>(val_ptr));
689                 slot *res = putIfMatch(this, _kvs, key_slot, value_slot, old_val);
690                 // Only when copy_slot() call putIfMatch() will it return NULL
691                 assert (res != NULL); 
692                 assert (!is_prime(res));
693                 return res == TOMBSTONE ? NULL : static_pointer_cast<TypeV>(res->_ptr);
694         }
695
	/**
		Put, Remove, PutIfAbsent, etc will call this function. Return the old
		value. If the returned value is equals to the expVal (or expVal is
		NO_MATCH_OLD), then this function puts the val_slot to the table 'kvs'.
		Only copy_slot will pass a NULL expVal, and putIfMatch only returns a
		NULL if passed a NULL expVal.

		Overall shape:
		  1) Reprobe until we either claim a free key slot or find the key.
		  2) If the probe chain is exhausted (or a copy is underway), restart
		     the whole operation in the next-generation table.
		  3) Otherwise CAS the value slot in place, retrying until the CAS
		     succeeds or the expected-value match fails.
	*/
	static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
		*key_slot, slot *val_slot, slot *expVal) {
		// Callers never store NULL nor a primed (copy-marked) value directly.
		assert (val_slot != NULL);
		assert (!is_prime(val_slot));
		assert (!is_prime(expVal));

		int fullhash = hash(key_slot);
		int len = kvs->_size; // (len - 1) is used as a mask below, so len is presumably a power of two
		CHM *chm = get_chm(kvs); // control unit of this table
		int *hashes = get_hashes(kvs); // hash record array for fast negative key checks
		int idx = fullhash & (len - 1); // first probe position

		// Claim a key slot
		int reprobe_cnt = 0;
		slot *K;
		slot *V;
		kvs_data *newkvs;
		
		while (true) { // Spin till we get a key slot
			K = key(kvs, idx);
			V = val(kvs, idx, NULL);
			if (K == NULL) { // Get a free slot
				// Removing a key that was never inserted is a no-op.
				if (val_slot == TOMBSTONE) return val_slot;
				// Claim the null key-slot
				if (CAS_key(kvs, idx, NULL, key_slot)) {
					chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
					hashes[idx] = fullhash; // Memorize full hash
					break;
				}
				K = key(kvs, idx); // CAS failed, get updated value
				assert (K != NULL); // a lost CAS race means someone else claimed the key
			}

			// Key slot not null, there exists a Key here
			if (keyeq(K, key_slot, hashes, idx, fullhash))
				break; // Got it
			
			// Notice that the logic here should be consistent with that of get.
			// The first predicate means too many reprobes means nothing in the
			// old table.
			if (++reprobe_cnt >= reprobe_limit(len) ||
				K == TOMBSTONE) { // Found a Tombstone key, no more keys
				newkvs = chm->resize(topmap, kvs);
				// Help along an existing copy
				if (expVal != NULL) topmap->help_copy(newkvs);
				// Redo the whole put against the new table.
				return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
			}

			idx = (idx + 1) & (len - 1); // Reprobe
		} // End of spinning till we get a Key slot

		if (val_slot == V) return V; // Fast cutout for no-change
	
		// Here it tries to resize cause it doesn't want other threads to stop
		// its progress (eagerly try to resize soon)
		newkvs = chm->_newkvs.load(memory_order_acquire);
		// Start a resize if none exists yet but this table is overfull, or the
		// value we read is already primed (mid-copy).
		if (newkvs == NULL &&
			((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
			newkvs = chm->resize(topmap, kvs); // Force the copy to start
		
		// Finish the copy and then put it in the new table
		if (newkvs != NULL)
			return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
				expVal), key_slot, val_slot, expVal);
		
		// Decided to update the existing table
		while (true) {
			assert (!is_prime(V));

			// Match predicate: refuse the update (returning the current value)
			// unless expVal is the NO_MATCH_OLD wildcard, or V is identical to
			// expVal, or MATCH_ANY matched a live value, or an expected
			// TOMBSTONE matched an empty slot, or the two values compare equal
			// via valeq().
			if (expVal != NO_MATCH_OLD &&
				V != expVal &&
				(expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
				!(V == NULL && expVal == TOMBSTONE) &&
				(expVal == NULL || !valeq(expVal, V))) {
				/**
					@Begin
					@Commit_point_define: expVal == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: PutIfAbsent_Fail_Point
						# This is a check for the PutIfAbsent() when the value
						# is not absent
					@End
				*/
				/**
					@Begin
					@Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: RemoveIfMatch_Fail_Point
					@End
				*/
				/**
					@Begin
					@Commit_point_define: !valeq(expVal, V)
					@Potential_commit_point_label: Read_Val_Point
					@Label: ReplaceIfMatch_Fail_Point
					@End
				*/
				return V; // Do not update!
			}

			if (CAS_val(kvs, idx, V, val_slot)) {
				/**
					@Begin
					# The only point where a successful put happens
					@Commit_point_define: true
					@Potential_commit_point_label: Write_Val_Point
					@Label: Write_Success_Point
					@End
				*/
				if (expVal != NULL) { // Not called by a table-copy
					// CAS succeeded, should adjust size
					// Both normal put's and table-copy calls putIfMatch, but
					// table-copy does not increase the number of live K/V pairs
					if ((V == NULL || V == TOMBSTONE) &&
						val_slot != TOMBSTONE)
						chm->_size.fetch_add(1, memory_order_relaxed);
					if (!(V == NULL || V == TOMBSTONE) &&
						val_slot == TOMBSTONE)
						chm->_size.fetch_add(-1, memory_order_relaxed);
				}
				// Report an empty slot as TOMBSTONE to normal callers so that a
				// NULL return stays reserved for the copy_slot() path (expVal == NULL).
				return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
			}
			// Else CAS failed
			V = val(kvs, idx, NULL); // re-read the fresh value and retry
			// A primed value means a copier beat us to this slot: finish its
			// copy and retry the put in the new table.
			if (is_prime(V))
				return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, expVal), key_slot, val_slot, expVal);
		}
	}
832
833         // Help along an existing table-resize. This is a fast cut-out wrapper.
834         kvs_data* help_copy(kvs_data *helper) {
835                 kvs_data *topkvs = _kvs.load(memory_order_acquire);
836                 CHM *topchm = get_chm(topkvs);
837                 // No cpy in progress
838                 if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
839                 topchm->help_copy_impl(this, topkvs, false);
840                 return helper;
841         }
842 };
843
844 #endif