more bug fix
[cdsspec-compiler.git] / benchmark / cliffc-hashtable / simplified_cliffc_hashtable.h
1 #ifndef SIMPLIFIED_CLIFFC_HASHTABLE_H
2 #define SIMPLIFIED_CLIFFC_HASHTABLE_H
3
4 #include <iostream>
5 #include <atomic>
6 #include <memory>
7 #include <assert.h>
8
9 using namespace std;
10
11
12
13 /**
14         This header file declares and defines a simplified version of Cliff Click's
	NonblockingHashMap. It contains all the necessary structures and main
16         functions. In simplified_cliffc_hashtable.cc file, it has the definition for
17         the static fields.
18 */
19
// Forward declaration: kvs_data and CHM-related code below take pointers to
// the table type before its full definition appears.
template<typename TypeK, typename TypeV>
class cliffc_hashtable;
22
23 /**
	Corresponding to the Object[] array in Cliff Click's Java implementation.
25         It keeps the first two slots for CHM (Hashtable control unit) and the hash
26         records (an array of hash used for fast negative key-equality check).
27 */
struct kvs_data {
	int _size;                 // Number of key/value pairs this table can hold
	std::atomic<void*> *_data; // Backing array: [CHM, hashes, k0, v0, k1, v1, ...]

	/**
		Allocate a table of 'sz' key/value pair slots.
		Layout of _data: slot 0 holds the CHM control block (installed by
		the caller or by resize()), slot 1 holds the cached int[] of key
		hashes, and slots 2 .. 2*sz+1 hold alternating key/value entries.
	*/
	kvs_data(int sz) {
		_size = sz;
		// Fix: the array length is an element count -- two slots per pair
		// plus the 2-slot header. The old computation
		// sizeof(atomic<void*>) * 2 + 2 is a byte count that only happened
		// to equal 2*8+2 for the default size on 64-bit platforms and
		// under-allocates for any larger table.
		int real_size = sz * 2 + 2;
		_data = new std::atomic<void*>[real_size];
		// The control block (slot 0) should be initialized in resize()
		// Init the hash record array (slot 1)
		int *hashes = new int[_size];
		for (int i = 0; i < _size; i++)
			hashes[i] = 0;
		_data[1].store(hashes, std::memory_order_relaxed);
		// Init the data slots to the NULL (empty) slot
		for (int i = 2; i < real_size; i++)
			_data[i].store(NULL, std::memory_order_relaxed);
	}

	~kvs_data() {
		int *hashes = (int*) _data[1].load(std::memory_order_relaxed);
		// Fix: memory obtained with new[] must be released with delete[];
		// plain 'delete hashes' is undefined behavior.
		delete[] hashes;
		delete[] _data;
	}
};
56
/**
	One key or value cell of the table. '_prime' marks a value that has
	been boxed during a table copy; '_ptr' shares ownership of the stored
	key/value object.
*/
struct slot {
	bool _prime;
	std::shared_ptr<void> _ptr;

	// Record the primed flag and take shared ownership of the object.
	slot(bool prime, std::shared_ptr<void> ptr)
		: _prime(prime), _ptr(ptr) {
	}
};
67
68
69 /**
70         TypeK must have defined function "int hashCode()" which return the hash
71         code for the its object, and "int equals(TypeK anotherKey)" which is
72         used to judge equality.
73         TypeK and TypeV should define their own copy constructor.
74         To make the memory management safe and similar to Cliff Click's Java
75         implementation, we use shared_ptr instead of normal pointer in terms of the
76         pointers that point to TypeK and TypeV.
77 */
78 template<typename TypeK, typename TypeV>
79 class cliffc_hashtable {
80         /**
81                 # The synchronization we have for the hashtable gives us the property of
82                 # serializability, so we should have a sequential hashtable when we check the
83                 # correctness. The key thing is to identify all the commit point.
84         
85                 @Begin
86                 @Options:
87                         LANG = C;
88                         CLASS = cliffc_hashtable;
89                 @Global_define:
90                         @DeclareVar:
91                         spec_hashtable<TypeK, TypeV*> map;
92                         spec_hashtable<TypeK, Tag> id_map;
93                         Tag tag;
94                         @InitVar:
95                                 map = spec_hashtable<TypeK, TypeV*>();
96                                 id_map = spec_hashtable<TypeK, TypeV*>();
97                                 tag = Tag();
98                         static bool equals_val(TypeV *ptr1, TypeV *ptr2) {
99                                 // ...
100                         }
101                         
102                         # Update the tag for the current key slot if the corresponding tag
103                         # is NULL, otherwise just return that tag. It will update the next
104                         # available tag too if it requires a new tag for that key slot.
105                         static Tag getKeyTag(TypeK &key) {
106                                 if (id_map.get(key) == NULL) {
107                                         Tag cur_tag = tag.current();
108                                         id_map.put(key, cur_tag);
109                                         tag.next();
110                                         return cur_tag;
111                                 } else {
112                                         return id_map.get(key);
113                                 }
114                         }
115                 
116                 @Interface_cluster:
117                         Read_interface = {
118                                 Get,
119                                 PutIfAbsent,
120                                 RemoveAny,
121                                 RemoveIfMatch,
122                                 ReplaceAny,
123                                 ReplaceIfMatch
124                         }
125                         
126                         Write_interface = {
127                                 Put,
128                                 PutIfAbsent(COND_PutIfAbsentSucc),
129                                 RemoveAny,
130                                 RemoveIfMatch(COND_RemoveIfMatchSucc),
131                                 ReplaceAny,
132                                 ReplaceIfMatch(COND_ReplaceIfMatchSucc)
133                         }
134                 @Happens_before:
135                         Write_interface -> Read_interface
136                 @End
137         */
138
139 friend class CHM;
140         /**
141                 The control structure for the hashtable
142         */
143         private:
	class CHM {
		friend class cliffc_hashtable;
		private:
		// The next (larger) table during a resize; NULL until the winning
		// CAS in resize() installs one.
		atomic<kvs_data*> _newkvs;
		
		// Size of active K,V pairs
		atomic_int _size;
	
		// Count of used slots
		atomic_int _slots;
		
		// The next part of the table to copy
		atomic_int _copy_idx;
		
		// Work-done reporting
		atomic_int _copy_done;
	
		public:
		// 'size' seeds the active-pair count; all copy bookkeeping starts
		// at zero.
		CHM(int size) {
			_size.store(size, memory_order_relaxed);
			_slots.store(0, memory_order_relaxed);
	
			_copy_idx.store(0, memory_order_relaxed);
			_copy_done.store(0, memory_order_release);
		}
	
		~CHM() {}
		
		private:
			
		// Heuristic to decide if the table is too full
		bool table_full(int reprobe_cnt, int len) {
			return
				reprobe_cnt >= REPROBE_LIMIT &&
				_slots.load(memory_order_relaxed) >= reprobe_limit(len);
		}
	
		// Create (or return the already-created) next table for a resize.
		// Exactly one thread wins the CAS on _newkvs; losers discard their
		// speculative allocation and adopt the winner's table.
		kvs_data* resize(cliffc_hashtable *topmap, kvs_data *kvs) {
			kvs_data *newkvs = _newkvs.load(memory_order_acquire);
			if (newkvs != NULL)
				return newkvs;
	
			// No copy in-progress, start one; Only double the table size
			int oldlen = kvs->_size;
			int sz = _size.load(memory_order_relaxed);
			int newsz = sz;
			
			// Just follow Cliff Click's heuristic to decide the new size
			if (sz >= (oldlen >> 2)) { // If we are 25% full
				newsz = oldlen << 1; // Double size
				if (sz >= (oldlen >> 1))
					newsz = oldlen << 2; // Double double size
			}
	
			// We do not record the record timestamp
			if (newsz <= oldlen) newsz = oldlen << 1;
			// Do not shrink ever
			if (newsz < oldlen) newsz = oldlen;
	
			// Last check cause the 'new' below is expensive
			newkvs = _newkvs.load(memory_order_acquire);
			if (newkvs != NULL) return newkvs;
	
			newkvs = new kvs_data(newsz);
			void *chm = (void*) new CHM(sz);
			newkvs->_data[0].store(chm, memory_order_relaxed);
	
			kvs_data *cur_newkvs; 
			// Another check after the slow allocation
			if ((cur_newkvs = _newkvs.load(memory_order_acquire)) != NULL)
				return cur_newkvs;
			// CAS the _newkvs to the allocated table
			// NOTE(review): the local names are swapped relative to their
			// roles -- 'desired' (NULL) is passed as the *expected* value
			// and 'expected' (newkvs) as the *desired* value. The call
			// behaves correctly (CAS NULL -> newkvs) but the names mislead.
			kvs_data *desired = (kvs_data*) NULL;
			kvs_data *expected = (kvs_data*) newkvs; 
			// NOTE(review): memory_order_release is not a valid *failure*
			// ordering for compare_exchange per the C++ standard -- confirm
			// whether the targeted model checker accepts it.
			if (!_newkvs.compare_exchange_strong(desired, expected, memory_order_release,
					memory_order_release)) {
				// Should clean the allocated area
				// NOTE(review): this frees the kvs_data but leaks the CHM
				// stored in its slot 0 (~kvs_data does not delete it).
				delete newkvs;
				newkvs = _newkvs.load(memory_order_acquire);
			}
			return newkvs;
		}
	
		// Copy part (or, when copy_all, all) of 'oldkvs' into _newkvs.
		// Threads claim chunks of min_copy_work slots by CAS'ing _copy_idx.
		void help_copy_impl(cliffc_hashtable *topmap, kvs_data *oldkvs,
			bool copy_all) {
			assert (get_chm(oldkvs) == this);
			kvs_data *newkvs = _newkvs.load(memory_order_acquire);
			int oldlen = oldkvs->_size;
			int min_copy_work = oldlen > 1024 ? 1024 : oldlen;
		
			// Just follow Cliff Click's code here
			int panic_start = -1;
			int copyidx;
			while (_copy_done.load(memory_order_acquire) < oldlen) {
				copyidx = _copy_idx.load(memory_order_acquire);
				if (panic_start == -1) { // No panic
					copyidx = _copy_idx.load(memory_order_acquire);
					// Claim a chunk by advancing _copy_idx; give up and
					// "panic" (copy everything) once the index runs past
					// 2*oldlen.
					while (copyidx < (oldlen << 1) &&
						!_copy_idx.compare_exchange_strong(copyidx, copyidx +
							min_copy_work, memory_order_release, memory_order_release))
						copyidx = _copy_idx.load(memory_order_relaxed);
					if (!(copyidx < (oldlen << 1)))
						panic_start = copyidx;
				}
	
				// Now copy the chunk of work we claimed
				int workdone = 0;
				for (int i = 0; i < min_copy_work; i++)
					if (copy_slot(topmap, (copyidx + i) & (oldlen - 1), oldkvs,
						newkvs))
						workdone++;
				if (workdone > 0)
					copy_check_and_promote(topmap, oldkvs, workdone);
	
				copyidx += min_copy_work;
				if (!copy_all && panic_start == -1)
					return; // We are done with the work we claim
			}
			copy_check_and_promote(topmap, oldkvs, 0); // See if we can promote
		}
	
		// Finish copying the one slot the caller stumbled on, then either
		// return the new table or keep helping the copy along.
		kvs_data* copy_slot_and_check(cliffc_hashtable *topmap, kvs_data
			*oldkvs, int idx, void *should_help) {
			kvs_data *newkvs = _newkvs.load(memory_order_acquire);
			// We're only here cause the caller saw a Prime
			// Note: passing the atomic '_newkvs' itself relies on
			// atomic<T*>'s implicit conversion (an extra seq_cst load).
			if (copy_slot(topmap, idx, oldkvs, _newkvs))
				copy_check_and_promote(topmap, oldkvs, 1); // Record the slot copied
			return (should_help == NULL) ? newkvs : topmap->help_copy(newkvs);
		}
	
		// Fold 'workdone' freshly copied slots into _copy_done and, once the
		// whole old table is copied, CAS the top-level _kvs to the new table.
		void copy_check_and_promote(cliffc_hashtable *topmap, kvs_data*
			oldkvs, int workdone) {
			int oldlen = oldkvs->_size;
			int copyDone = _copy_done.load(memory_order_relaxed);
			if (workdone > 0) {
				while (true) {
					copyDone = _copy_done.load(memory_order_relaxed);
					if (_copy_done.compare_exchange_weak(copyDone, copyDone +
						workdone, memory_order_relaxed, memory_order_relaxed))
						break;
				}
			}
	
			// Promote the new table to the current table
			if (copyDone + workdone == oldlen &&
				topmap->_kvs.load(memory_order_acquire) == oldkvs)
				topmap->_kvs.compare_exchange_strong(oldkvs, _newkvs, memory_order_release,
					memory_order_release);
		}
	
		// Copy one slot from the old table to the new one. Returns true if
		// this call performed the copy (so the caller can count the work).
		bool copy_slot(cliffc_hashtable *topmap, int idx, kvs_data *oldkvs,
			kvs_data *newkvs) {
			slot *key_slot;
			// A NULL key is dead: box it to TOMBSTONE so no late insert can
			// land in the old table.
			while ((key_slot = key(oldkvs, idx)) == NULL)
				CAS_key(oldkvs, idx, NULL, TOMBSTONE);
	
			// First CAS old to Prime
			slot *oldval = val(oldkvs, idx, NULL);
			while (!is_prime(oldval)) {
				slot *box = (oldval == NULL || oldval == TOMBSTONE)
					? TOMBPRIME : new slot(true, oldval->_ptr);
				if (CAS_val(oldkvs, idx, oldval, box)) {
					if (box == TOMBPRIME)
						return 1; // Copy done
					// Otherwise we CAS'd the box
					oldval = box; // Record updated oldval
					break;
				}
				oldval = val(oldkvs, idx, NULL); // Else re-try
			}
	
			if (oldval == TOMBPRIME) return false; // Copy already completed here
	
			// Insert the unboxed value into the new table; a NULL expVal
			// means "only if absent" (see putIfMatch's contract comment).
			slot *old_unboxed = new slot(false, oldval->_ptr);
			int copied_into_new = (putIfMatch(topmap, newkvs, key_slot, old_unboxed,
				NULL) == NULL);
	
			// Old value is exposed in the new table
			while (!CAS_val(oldkvs, idx, oldval, TOMBPRIME))
				oldval = val(oldkvs, idx, NULL);
	
			return copied_into_new;
		}
	};
328
329         
330
	private:
	static const int Default_Init_Size = 8; // Initial table size

	// Sentinel slot values; storage is defined in simplified_cliffc_hashtable.cc.
	// MATCH_ANY    -- expected-old-value wildcard used by replace()
	// NO_MATCH_OLD -- "no expectation": putIfMatch writes unconditionally
	static slot* const MATCH_ANY;
	static slot* const NO_MATCH_OLD;

	// TOMBPRIME -- primed tombstone, marks a slot whose copy has finished
	// TOMBSTONE -- marks a deleted key/value
	static slot* const TOMBPRIME;
	static slot* const TOMBSTONE;

	static const int REPROBE_LIMIT = 10; // Forces a table-resize

	// The current key/value table; swapped by CAS when a copy completes
	// (see CHM::copy_check_and_promote).
	atomic<kvs_data*> _kvs;
343
344         public:
345         cliffc_hashtable() {
346                 // Should initialize the CHM for the construction of the table
347                 // For other CHM in kvs_data, they should be initialzed in resize()
348                 // because the size is determined dynamically
349                 kvs_data *kvs = new kvs_data(Default_Init_Size);
350                 void *chm = (void*) new CHM(0);
351                 kvs->_data[0].store(chm, memory_order_relaxed);
352                 _kvs.store(kvs, memory_order_release);
353         }
354
355         cliffc_hashtable(int init_size) {
356                 // Should initialize the CHM for the construction of the table
357                 // For other CHM in kvs_data, they should be initialzed in resize()
358                 // because the size is determined dynamically
359                 kvs_data *kvs = new kvs_data(init_size);
360                 void *chm = (void*) new CHM(0);
361                 kvs->_data[0].store(chm, memory_order_relaxed);
362                 _kvs.store(kvs, memory_order_release);
363         }
364
365         /**
366                 @Begin
367                 @Interface: Get
368                 @Commit_point_set: Read_Val_Point1 | Read_Val_Point2 | Read_Val_Point3
369                 @ID: __sequential.getKeyTag(key)
370                 @Action:
371                         TypeV *_Old_Val = __sequential.map.get(key)
372                 @Post_check:
373                         __sequential.equals_val(_Old_Val, __RET__)
374                 @End
375         */
376         shared_ptr<TypeV> get(TypeK& key) {
377                 void *key_ptr = (void*) new TypeK(key);
378                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
379                 int fullhash = hash(key_slot);
380                 slot *V = get_impl(this, _kvs, key_slot, fullhash);
381                 if (V == NULL) return NULL;
382                 assert (!is_prime(V));
383                 return static_pointer_cast<TypeV>(V->_ptr);
384         }
385
386         /**
387                 @Begin
388                 @Interface: Put
389                 @Commit_point_set: Write_Val_Point
390                 @ID: __sequential.getKeyTag(key)
391                 @Action:
392                         # Remember this old value at checking point
393                         TypeV *_Old_Val = __sequential.map.get(key)
394                         __sequential.map.put(key, &val);
395                 @Post_check:
396                         __sequential.equals_val(__RET__, _Old_Val)
397                 @End
398         */
399         shared_ptr<TypeV> put(TypeK& key, TypeV& val) {
400                 return putIfMatch(key, val, NO_MATCH_OLD);
401         }
402
403         /**
404                 @Begin
405                 @Interface: PutIfAbsent
406                 @Commit_point_set:
407                         Write_Val_Point | PutIfAbsent_Fail_Point
408                 @Condition: __sequential.map.get(key) == NULL
409                 @HB_condition:
410                         COND_PutIfAbsentSucc :: __RET__ == NULL
411                 @ID: __sequential.getKeyTag(key)
412                 @Action:
413                         TypeV *_Old_Val = __sequential.map.get(key)
414                         if (__COND_SAT__)
415                                 __sequential.map.put(key, &value);
416                 @Post_check:
417                         __COND_SAT__ ? __RET__ == NULL : __sequential.equals_val(_Old_Val, __RET__) 
418                 @End
419         */
420         shared_ptr<TypeV> putIfAbsent(TypeK& key, TypeV& value) {
421                 return putIfMatch(key, val, TOMBSTONE);
422         }
423
424         /**
425                 @Begin
426                 @Interface: RemoveAny
427                 @Commit_point_set: Write_Val_Point
428                 @ID: __sequential.getKeyTag(key)
429                 @Action:
430                         TypeV *_Old_Val = __sequential.map.get(key)
431                         __sequential.map.put(key, NULL);
432                 @Post_check:
433                         __sequential.equals_val(__RET__, _Old_Val)
434                 @End
435         */
436         shared_ptr<TypeV> remove(TypeK& key) {
437                 return putIfMatch(key, TOMBSTONE, NO_MATCH_OLD);
438         }
439
440         /**
441                 @Begin
442                 @Interface: RemoveIfMatch
443                 @Commit_point_set:
444                         Write_Val_Point | RemoveIfMatch_Fail_Point
445                 @Condition:
446                         __sequential.equals_val(__sequential.map.get(key), &val)
447                 @HB_condition:
448                         COND_RemoveIfMatchSucc :: __RET__ == true
449                 @ID: __sequential.getKeyTag(key)
450                 @Action:
451                         if (__COND_SAY__)
452                                 __sequential.map.put(key, NULL);
453                 @Post_check:
454                         __COND_SAY__ ? __RET__ : !__RET__
455                 @End
456         */
457         bool remove(TypeK& key, TypeV& val) {
458                 slot *val_slot = val == NULL ? NULL : new slot(false, val);
459                 return putIfMatch(key, TOMBSTONE, val) == val;
460
461         }
462
463         /**
464                 @Begin
465                 @Interface: ReplaceAny
466                 @Commit_point_set:
467                         Write_Val_Point
468                 @ID: __sequential.getKeyTag(key)
469                 @Action:
470                         TypeV *_Old_Val = __sequential.map.get(key)
471                 @Post_check:
472                         __sequential.equals_val(__RET__, _Old_Val)
473                 @End
474         */
475         shared_ptr<TypeV> replace(TypeK& key, TypeV& val) {
476                 return putIfMatch(key, val, MATCH_ANY);
477         }
478
479         /**
480                 @Begin
481                 @Interface: ReplaceIfMatch
482                 @Commit_point_set:
483                         Write_Val_Point | ReplaceIfMatch_Fail_Point
484                 @Condition:
485                         __sequential.equals_val(__sequential.map.get(key), &oldval)
486                 @HB_condition:
487                         COND_ReplaceIfMatchSucc :: __RET__ == true
488                 @ID: __sequential.getKeyTag(key)
489                 @Action:
490                         if (__COND_SAY__)
491                                 __sequential.map.put(key, &newval);
492                 @Post_check:
493                         __COND_SAY__ ? __RET__ : !__RET__
494                 @End
495         */
496         bool replace(TypeK& key, TypeV& oldval, TypeV& newval) {
497                 return putIfMatch(key, newval, oldval) == oldval;
498         }
499
500         private:
501         static CHM* get_chm(kvs_data* kvs) {
502                 return (CHM*) kvs->_data[0].load(memory_order_relaxed);
503         }
504
505         static int* get_hashes(kvs_data *kvs) {
506                 return (int *) kvs->_data[1].load(memory_order_relaxed);
507         }
508         
509         // Preserve happens-before semantics on newly inserted keys
510         static inline slot* key(kvs_data *kvs, int idx) {
511                 assert (idx >= 0 && idx < kvs->_size);
512                 // Corresponding to the volatile read in get_impl() and putIfMatch in
513                 // Cliff Click's Java implementation
514                 return (slot*) kvs->_data[idx * 2 + 2].load(memory_order_acquire);
515         }
516
517         /**
518                 The atomic operation in val() function is a "potential" commit point,
519                 which means in some case it is a real commit point while it is not for
520                 some other cases. This so happens because the val() function is such a
521                 fundamental function that many internal operation will call. Our
522                 strategy is that we label any potential commit points and check if they
523                 really are the commit points later.
524         */
525         // Preserve happens-before semantics on newly inserted values
526         static inline slot* val(kvs_data *kvs, int idx) {
527                 assert (idx >= 0 && idx < kvs->_size);
528                 // Corresponding to the volatile read in get_impl() and putIfMatch in
529                 // Cliff Click's Java implementation
530                 slot *res = (slot*) kvs->_data[idx * 2 + 3].load(memory_order_acquire);
531                 /**
532                         @Begin
533                         # This is a complicated potential commit point since many many functions are
534                         # calling val().
535                         @Potential_commit_point_define: true
536                         @Label: Read_Val_Point
537                         @End
538                 */
539                 return res;
540
541
542         }
543
544         static int hash(slot *key_slot) {
545                 assert(key_slot != NULL && key_slot->_ptr != NULL);
546                 shared_ptr<TypeK> key = static_pointer_cast<TypeK>(key_slot->_ptr);
547                 int h = key->hashCode();
548                 // Spread bits according to Cliff Click's code
549                 h += (h << 15) ^ 0xffffcd7d;
550                 h ^= (h >> 10);
551                 h += (h << 3);
552                 h ^= (h >> 6);
553                 h += (h << 2) + (h << 14);
554                 return h ^ (h >> 16);
555         }
556         
557         // Heuristic to decide if reprobed too many times. 
558         // Be careful here: Running over the limit on a 'get' acts as a 'miss'; on a
559         // put it triggers a table resize. Several places MUST have exact agreement.
560         static int reprobe_limit(int len) {
561                 return REPROBE_LIMIT + (len >> 2);
562         }
563         
564         static inline bool is_prime(slot *val) {
565                 return (val != NULL) && val->_prime;
566         }
567
568         // Check for key equality. Try direct pointer comparison first (fast
569         // negative teset) and then the full 'equals' call
	// Check for key equality. Try direct pointer comparison first (fast
	// negative test) and then the full 'equals' call.
	// K        -- the key slot already stored in the table (non-NULL)
	// key_slot -- the probe key being looked up
	// hashes   -- the cached-hash array from get_hashes(); 'hash' is the
	//             table index being probed (callers pass 'idx'), so
	//             hashes[hash] == 0 means "hash not cached yet"
	// fullhash -- the full spread hash of the probe key
	// NOTE(review): equals() receives K->_ptr, a shared_ptr<void> --
	// assumes TypeK::equals accepts that; confirm against TypeK's
	// declared contract ("int equals(TypeK anotherKey)" per the header).
	static bool keyeq(slot *K, slot *key_slot, int *hashes, int hash,
		int fullhash) {
		// Caller should've checked this.
		assert (K != NULL);
		shared_ptr<TypeK> key_ptr = static_pointer_cast<TypeK>(key_slot->_ptr);
		return
			K == key_slot ||
				((hashes[hash] == 0 || hashes[hash] == fullhash) &&
				K != TOMBSTONE &&
				key_ptr->equals(K->_ptr));
	}
581
582         static bool valeq(slot *val_slot1, slot *val_slot2) {
583                 assert (val_slot1 != NULL);
584                 shared_ptr<TypeK> ptr1 = static_pointer_cast<TypeV>(val_slot1->_ptr);
585                 if (val_slot2 == NULL || ptr1 == NULL) return false;
586                 return ptr1->equals(val_slot2->_ptr);
587         }
588         
589         // Together with key() preserve the happens-before relationship on newly
590         // inserted keys
591         static inline bool CAS_key(kvs_data *kvs, int idx, void *expected, void *desired) {
592                 return kvs->_data[2 * idx + 2].compare_exchange_strong(expected,
593                         desired, memory_order_release, memory_order_release);
594         }
595
596         /**
597                 Same as the val() function, we only label the CAS operation as the
598                 potential commit point.
599         */
600         // Together with val() preserve the happens-before relationship on newly
601         // inserted values
602         static inline bool CAS_val(kvs_data *kvs, int idx, void *expected, void
603                 *desired) {
604                 bool res =  kvs->_data[2 * idx + 3].compare_exchange_strong(expected,
605                         desired, memory_order_release, memory_order_release);
606                 /**
607                         # If it is a successful put instead of a copy or any other internal
608                         # operantions, expected != NULL
609                         @Begin
610                         @Potential_commit_point_define: __ATOMIC_RET__ == true
611                         @Label: Write_Val_Point
612                         @End
613                 */
614                 return res;
615         }
616
	/**
		Core lookup. Probes table 'kvs' for 'key_slot' starting at
		fullhash & (len - 1) with linear reprobing. Returns the matching
		value slot, NULL on a miss, or recurses into the newer table when
		it runs into an in-progress copy (a primed value) or exhausts the
		reprobe budget.
	*/
	slot* get_impl(cliffc_hashtable *topmap, kvs_data *kvs, slot* key_slot, int
		fullhash) {
		int len = kvs->_size;
		CHM *chm = get_chm(kvs);
		int *hashes = get_hashes(kvs);

		int idx = fullhash & (len - 1);
		int reprobe_cnt = 0;
		while (true) {
			slot *K = key(kvs, idx);
			slot *V = val(kvs, idx);
			/**
				@Begin
				@Commit_point_define: V == NULL
				@Potential_commit_point_label: Read_Val_Point
				@Label: Get_Success_Point_1
				@End
			*/

			if (V == NULL) return NULL; // A miss
			
			// Note: keyeq()'s 'hash' argument is the probe index 'idx',
			// used to consult the cached-hash array.
			if (keyeq(K, key_slot, hashes, idx, fullhash)) {
				// Key hit! Check if table-resize in progress
				if (!is_prime(V)) {
					/**
						@Begin
						@Commit_point_define: true
						@Potential_commit_point_label: Read_Val_Point
						@Label: Get_Success_Point_2
						@End
					*/
					return (V == TOMBSTONE) ? NULL : V; // Return this value
				}
				// Otherwise, finish the copy & retry in the new table
				return get_impl(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, key_slot), key_slot, fullhash);
			}

			if (++reprobe_cnt >= REPROBE_LIMIT ||
				key_slot == TOMBSTONE) {
				// Retry in new table
				// Atomic read (acquire) can be here 
				kvs_data *newkvs = chm->_newkvs.load(memory_order_acquire);
				/**
					@Begin
					@Commit_point_define_check: newkvs == NULL
					@Label: Get_Success_Point_3
					@End
				*/
				return newkvs == NULL ? NULL : get_impl(topmap,
					topmap->help_copy(newkvs), key_slot, fullhash);
			}

			idx = (idx + 1) & (len - 1); // Reprobe by 1
		}
	}
673
674         // A wrapper of the essential function putIfMatch()
675         shared_ptr<TypeV> putIfMatch(TypeK& key, TypeV& value, slot *old_val) {
676                 // TODO: Should throw an exception rather return NULL
677                 if (old_val == NULL) {
678                         return NULL;
679                 }
680                 void *key_ptr = (void*) new TypeK(key);
681                 slot *key_slot = new slot(false, shared_ptr<void>(key_ptr));
682
683                 void *val_ptr = (void*) new TypeV(value);
684                 slot *value_slot = new slot(false, shared_ptr<void>(val_ptr));
685                 slot *res = putIfMatch(this, _kvs, key_slot, value_slot, old_val);
686                 // Only when copy_slot() call putIfMatch() will it return NULL
687                 assert (res != NULL); 
688                 assert (!is_prime(res));
689                 return res == TOMBSTONE ? NULL : static_pointer_cast<TypeV>(res->_ptr);
690         }
691
	/**
		Put, Remove, PutIfAbsent, etc. all call this function and get the old
		value back. If the returned value equals expVal (or expVal is
		NO_MATCH_OLD), then this function has installed val_slot into the
		table 'kvs'. Only copy_slot() passes a NULL expVal, and putIfMatch()
		only returns NULL when passed a NULL expVal.
	*/
	static slot* putIfMatch(cliffc_hashtable *topmap, kvs_data *kvs, slot
		*key_slot, slot *val_slot, slot *expVal) {
		assert (val_slot != NULL);
		assert (!is_prime(val_slot));
		assert (!is_prime(expVal));

		int fullhash = hash(key_slot);
		int len = kvs->_size;
		CHM *chm = get_chm(kvs);
		int *hashes = get_hashes(kvs);
		// Tables are power-of-two sized, so masking by (len - 1) maps the
		// hash onto a valid index.
		int idx = fullhash & (len - 1);

		// Claim a key slot
		int reprobe_cnt = 0;
		slot *K;	// key observed at the current probe slot
		slot *V;	// value observed at the current probe slot
		kvs_data *newkvs;
		
		while (true) { // Spin till we get a key slot
			K = key(kvs, idx);
			V = val(kvs, idx, NULL);
			if (K == NULL) { // Get a free slot
				// Removing a key that was never inserted: nothing to do.
				if (val_slot == TOMBSTONE) return val_slot;
				// Claim the null key-slot
				if (CAS_key(kvs, idx, NULL, key_slot)) {
					chm->_slots.fetch_add(1, memory_order_relaxed); // Inc key-slots-used count
					hashes[idx] = fullhash; // Memorize full hash
					break;
				}
				K = key(kvs, idx); // CAS failed, get updated value
				// Keys are never removed once written, so the racing winner
				// must have left a non-NULL key here.
				assert (K != NULL);
			}

			// Key slot not null, there exists a Key here
			if (keyeq(K, key_slot, hashes, idx, fullhash))
				break; // Got it
			
			// Notice that the logic here should be consistent with that of get.
			// The first predicate means too many reprobes means nothing in the
			// old table.
			if (++reprobe_cnt >= reprobe_limit(len) ||
				K == TOMBSTONE) { // Found a Tombstone key, no more keys
				newkvs = chm->resize(topmap, kvs);
				// Help along an existing copy
				if (expVal != NULL) topmap->help_copy(newkvs);
				// Retry the whole operation in the new table.
				return putIfMatch(topmap, newkvs, key_slot, val_slot, expVal);
			}

			idx = (idx + 1) & (len - 1); // Reprobe
		} // End of spinning till we get a Key slot

		if (val_slot == V) return V; // Fast cutout for no-change
	
		// Here it tries to resize cause it doesn't want other threads to stop
		// its progress (eagerly try to resize soon)
		newkvs = chm->_newkvs.load(memory_order_acquire);
		if (newkvs == NULL &&
			((V == NULL && chm->table_full(reprobe_cnt, len)) || is_prime(V)))
			newkvs = chm->resize(topmap, kvs); // Force the copy to start
		
		// Finish the copy and then put it in the new table
		if (newkvs != NULL)
			return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs, idx,
				expVal), key_slot, val_slot, expVal);
		
		// Decided to update the existing table
		while (true) {
			assert (!is_prime(V));

			// Miss the required old-value match: bail out without writing.
			// The conjunction spells out every way expVal can still match V
			// (wildcards NO_MATCH_OLD / MATCH_ANY, NULL<->TOMBSTONE
			// equivalence, and deep value equality via valeq()).
			if (expVal != NO_MATCH_OLD &&
				V != expVal &&
				(expVal != MATCH_ANY || V == TOMBSTONE || V == NULL) &&
				!(V == NULL && expVal == TOMBSTONE) &&
				(expVal == NULL || !valeq(expVal, V))) {
				/**
					@Begin
					@Commit_point_define: expVal == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: PutIfAbsent_Fail_Point
						# This is a check for the PutIfAbsent() when the value
						# is not absent
					@End
				*/
				/**
					@Begin
					@Commit_point_define: expVal != NULL && val_slot == TOMBSTONE
					@Potential_commit_point_label: Read_Val_Point
					@Label: RemoveIfMatch_Fail_Point
					@End
				*/
				/**
					@Begin
					@Commit_point_define: !valeq(expVal, V)
					@Potential_commit_point_label: Read_Val_Point
					@Label: ReplaceIfMatch_Fail_Point
					@End
				*/
				return V; // Do not update!
			}

			if (CAS_val(kvs, idx, V, val_slot)) {
				/**
					@Begin
					# The only point where a successful put happens
					@Commit_point_define: true
					@Potential_commit_point_label: Write_Val_Point
					@Label: Write_Success_Point
					@End
				*/
				if (expVal != NULL) { // Not called by a table-copy
					// CAS succeeded, should adjust size
					// Both normal put's and table-copy calls putIfMatch, but
					// table-copy does not increase the number of live K/V pairs
					if ((V == NULL || V == TOMBSTONE) &&
						val_slot != TOMBSTONE)
						chm->_size.fetch_add(1, memory_order_relaxed);
					if (!(V == NULL || V == TOMBSTONE) &&
						val_slot == TOMBSTONE)
						chm->_size.fetch_add(-1, memory_order_relaxed);
				}
				// For normal callers a NULL prior value reads as TOMBSTONE
				// ("no mapping"); table-copy callers get the raw value.
				return (V == NULL && expVal != NULL) ? TOMBSTONE : V;
			}
			// Else CAS failed
			V = val(kvs, idx, NULL);
			// A prime value means a resize copy grabbed this slot first:
			// finish copying it and retry in the new table.
			if (is_prime(V))
				return putIfMatch(topmap, chm->copy_slot_and_check(topmap, kvs,
					idx, expVal), key_slot, val_slot, expVal);
		}
	}
828
829         // Help along an existing table-resize. This is a fast cut-out wrapper.
830         kvs_data* help_copy(kvs_data *helper) {
831                 kvs_data *topkvs = _kvs.load(memory_order_acquire);
832                 CHM *topchm = get_chm(topkvs);
833                 // No cpy in progress
834                 if (topchm->_newkvs.load(memory_order_acquire) == NULL) return helper;
835                 topchm->help_copy_impl(this, topkvs, false);
836                 return helper;
837         }
838 };
839
840 #endif