update readme
[c11concurrency-benchmarks.git] / silo / tuple.h
1 #ifndef _NDB_TUPLE_H_
2 #define _NDB_TUPLE_H_
3
4 #include <atomic>
5 #include <vector>
6 #include <string>
7 #include <utility>
8 #include <limits>
9 #include <unordered_map>
10 #include <ostream>
11 #include <thread>
12
13 #include "amd64.h"
14 #include "core.h"
15 #include "counter.h"
16 #include "macros.h"
17 #include "varkey.h"
18 #include "util.h"
19 #include "rcu.h"
20 #include "thread.h"
21 #include "spinlock.h"
22 #include "small_unordered_map.h"
23 #include "prefetch.h"
24 #include "ownership_checker.h"
25
26 // debugging tool
27 //#define TUPLE_LOCK_OWNERSHIP_CHECKING
28
29 template <template <typename> class Protocol, typename Traits>
30   class transaction; // forward decl
31
32 // XXX: hack
33 extern std::string (*g_proto_version_str)(uint64_t v);
34
35 /**
36  * A dbtuple is the type of value which we stick
37  * into underlying (non-transactional) data structures- it
38  * also contains the memory of the value
39  */
40 struct dbtuple {
41   friend std::ostream &
42   operator<<(std::ostream &o, const dbtuple &tuple);
43
44 public:
45   // trying to save space by putting constraints
46   // on node maximums
47   typedef uint32_t version_t;
48   typedef uint16_t node_size_type;
49   typedef uint64_t tid_t;
50   typedef uint8_t * record_type;
51   typedef const uint8_t * const_record_type;
52   typedef size_t size_type;
53   typedef std::string string_type;
54
55   static const tid_t MIN_TID = 0;
56   static const tid_t MAX_TID = (tid_t) -1;
57
58   // lock ownership helpers- works by recording all tuple
59   // locks obtained in each transaction, and then when the txn
60   // finishes, calling AssertAllTupleLocksReleased(), which makes
61   // sure the current thread is no longer the owner of any locks
62   // acquired during the txn
63 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
64   static inline void
65   TupleLockRegionBegin()
66   {
67     ownership_checker<dbtuple, dbtuple>::NodeLockRegionBegin();
68   }
69
70   // is used to signal the end of a tuple lock region
71   static inline void
72   AssertAllTupleLocksReleased()
73   {
74     ownership_checker<dbtuple, dbtuple>::AssertAllNodeLocksReleased();
75   }
76 private:
77   static inline void
78   AddTupleToLockRegion(const dbtuple *n)
79   {
80     ownership_checker<dbtuple, dbtuple>::AddNodeToLockRegion(n);
81   }
82 #endif
83
84 private:
85   static const version_t HDR_LOCKED_MASK = 0x1;
86
87   static const version_t HDR_DELETING_SHIFT = 1;
88   static const version_t HDR_DELETING_MASK = 0x1 << HDR_DELETING_SHIFT;
89
90   static const version_t HDR_WRITE_INTENT_SHIFT = 2;
91   static const version_t HDR_WRITE_INTENT_MASK = 0x1 << HDR_WRITE_INTENT_SHIFT;
92
93   static const version_t HDR_MODIFYING_SHIFT = 3;
94   static const version_t HDR_MODIFYING_MASK = 0x1 << HDR_MODIFYING_SHIFT;
95
96   static const version_t HDR_LATEST_SHIFT = 4;
97   static const version_t HDR_LATEST_MASK = 0x1 << HDR_LATEST_SHIFT;
98
99   static const version_t HDR_VERSION_SHIFT = 5;
100   static const version_t HDR_VERSION_MASK = ((version_t)-1) << HDR_VERSION_SHIFT;
101
102 public:
103
104 #ifdef TUPLE_MAGIC
105   class magic_failed_exception: public std::exception {};
106   static const uint8_t TUPLE_MAGIC = 0x29U;
107   uint8_t magic;
108   inline ALWAYS_INLINE void CheckMagic() const
109   {
110     if (unlikely(magic != TUPLE_MAGIC)) {
111       print(1);
112       // so we can catch it later and print out useful debugging output
113       throw magic_failed_exception();
114     }
115   }
116 #else
117   inline ALWAYS_INLINE void CheckMagic() const {}
118 #endif
119
120   // NB(stephentu): ABA problem happens after some multiple of
121   // 2^(NBits(version_t)-6) concurrent modifications- somewhat low probability
122   // event, so we let it happen
123   //
124   // <-- low bits
125   // [ locked | deleting | write_intent | modifying | latest | version ]
126   // [  0..1  |   1..2   |    2..3      |   3..4    |  4..5  |  5..32  ]
127   volatile version_t hdr;
128
129 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
130   std::thread::id lock_owner;
131 #endif
132
133   // uninterpreted TID
134   tid_t version;
135
136   // small sizes on purpose
137   node_size_type size; // actual size of record
138                        // (only meaningful is the deleting bit is not set)
139   node_size_type alloc_size; // max size record allowed. is the space
140                              // available for the record buf
141
142   dbtuple *next; // be very careful about traversing this pointer,
143                  // GC is capable of reaping it at certain (well defined)
144                  // points, and will not bother to set it to null
145
146 #ifdef TUPLE_CHECK_KEY
147   // for debugging
148   std::string key;
149   void *tree;
150 #endif
151
152 #ifdef CHECK_INVARIANTS
153   // for debugging
154   std::atomic<uint64_t> opaque;
155 #endif
156
157   // must be last field
158   uint8_t value_start[0];
159
160   void print(std::ostream &o, unsigned len) const;
161
162 private:
163   // private ctor/dtor b/c we do some special memory stuff
164   // ctors start node off as latest node
165
166   static inline ALWAYS_INLINE node_size_type
167   CheckBounds(size_type s)
168   {
169     INVARIANT(s <= std::numeric_limits<node_size_type>::max());
170     return s;
171   }
172
173   dbtuple(const dbtuple &) = delete;
174   dbtuple(dbtuple &&) = delete;
175   dbtuple &operator=(const dbtuple &) = delete;
176
177   // creates a (new) record with a tentative value at MAX_TID
178   dbtuple(size_type size, size_type alloc_size, bool acquire_lock)
179     :
180 #ifdef TUPLE_MAGIC
181       magic(TUPLE_MAGIC),
182 #endif
183       hdr(HDR_LATEST_MASK |
184           (acquire_lock ? (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK) : 0) |
185           (!size ? HDR_DELETING_MASK : 0))
186 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
187       , lock_owner()
188 #endif
189       , version(MAX_TID)
190       , size(CheckBounds(size))
191       , alloc_size(CheckBounds(alloc_size))
192       , next(nullptr)
193 #ifdef TUPLE_CHECK_KEY
194       , key()
195       , tree(nullptr)
196 #endif
197 #ifdef CHECK_INVARIANTS
198       , opaque(0)
199 #endif
200   {
201     INVARIANT(((char *)this) + sizeof(*this) == (char *) &value_start[0]);
202     INVARIANT(is_latest());
203     INVARIANT(size || is_deleting());
204     ++g_evt_dbtuple_creates;
205     g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
206 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
207     if (acquire_lock) {
208       lock_owner = std::this_thread::get_id();
209       AddTupleToLockRegion(this);
210       INVARIANT(is_lock_owner());
211     }
212 #endif
213     COMPILER_MEMORY_FENCE; // for acquire_lock
214   }
215
216   // creates a record at version derived from base
217   // (inheriting its value).
218   dbtuple(tid_t version,
219           struct dbtuple *base,
220           size_type alloc_size,
221           bool set_latest)
222     :
223 #ifdef TUPLE_MAGIC
224       magic(TUPLE_MAGIC),
225 #endif
226       hdr(set_latest ? HDR_LATEST_MASK : 0)
227 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
228       , lock_owner()
229 #endif
230       , version(version)
231       , size(base->size)
232       , alloc_size(CheckBounds(alloc_size))
233       , next(base->next)
234 #ifdef TUPLE_CHECK_KEY
235       , key()
236       , tree(nullptr)
237 #endif
238 #ifdef CHECK_INVARIANTS
239       , opaque(0)
240 #endif
241   {
242     INVARIANT(size <= alloc_size);
243     INVARIANT(set_latest == is_latest());
244     if (base->is_deleting())
245       mark_deleting();
246     NDB_MEMCPY(&value_start[0], base->get_value_start(), size);
247     ++g_evt_dbtuple_creates;
248     g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
249   }
250
251   // creates a spill record, copying in the *old* value if necessary, but
252   // setting the size to the *new* value
253   dbtuple(tid_t version,
254           const_record_type r,
255           size_type old_size,
256           size_type new_size,
257           size_type alloc_size,
258           struct dbtuple *next,
259           bool set_latest,
260           bool needs_old_value)
261     :
262 #ifdef TUPLE_MAGIC
263       magic(TUPLE_MAGIC),
264 #endif
265       hdr((set_latest ? HDR_LATEST_MASK : 0) | (!new_size ? HDR_DELETING_MASK : 0))
266 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
267       , lock_owner()
268 #endif
269       , version(version)
270       , size(CheckBounds(new_size))
271       , alloc_size(CheckBounds(alloc_size))
272       , next(next)
273 #ifdef TUPLE_CHECK_KEY
274       , key()
275       , tree(nullptr)
276 #endif
277 #ifdef CHECK_INVARIANTS
278       , opaque(0)
279 #endif
280   {
281     INVARIANT(!needs_old_value || old_size <= alloc_size);
282     INVARIANT(new_size <= alloc_size);
283     INVARIANT(set_latest == is_latest());
284     INVARIANT(new_size || is_deleting());
285     if (needs_old_value)
286       NDB_MEMCPY(&value_start[0], r, old_size);
287     ++g_evt_dbtuple_creates;
288     g_evt_dbtuple_bytes_allocated += alloc_size + sizeof(dbtuple);
289   }
290
291   friend class rcu;
292   ~dbtuple();
293
294   static event_avg_counter g_evt_avg_dbtuple_stable_version_spins;
295   static event_avg_counter g_evt_avg_dbtuple_lock_acquire_spins;
296   static event_avg_counter g_evt_avg_dbtuple_read_retries;
297
298 public:
299
300   enum ReadStatus {
301     READ_FAILED,
302     READ_EMPTY,
303     READ_RECORD,
304   };
305
306   inline void
307   prefetch() const
308   {
309 #ifdef TUPLE_PREFETCH
310     prefetch_bytes(this, sizeof(*this) + alloc_size);
311 #endif
312   }
313
314   // gcs *this* instance, ignoring the chain
315   void gc_this();
316
317   inline bool
318   is_locked() const
319   {
320     return IsLocked(hdr);
321   }
322
323   static inline bool
324   IsLocked(version_t v)
325   {
326     return v & HDR_LOCKED_MASK;
327   }
328
329 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
330   inline bool
331   is_lock_owner() const
332   {
333     return std::this_thread::get_id() == lock_owner;
334   }
335 #else
336   inline bool
337   is_lock_owner() const
338   {
339     return true;
340   }
341 #endif
342
343   inline version_t
344   lock(bool write_intent)
345   {
346     // XXX: implement SPINLOCK_BACKOFF
347     CheckMagic();
348 #ifdef ENABLE_EVENT_COUNTERS
349     unsigned nspins = 0;
350 #endif
351     version_t v = hdr;
352     const version_t lockmask = write_intent ?
353       (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK) :
354       (HDR_LOCKED_MASK);
355     while (IsLocked(v) ||
356            !__sync_bool_compare_and_swap(&hdr, v, v | lockmask)) {
357       nop_pause();
358       v = hdr;
359 #ifdef ENABLE_EVENT_COUNTERS
360       ++nspins;
361 #endif
362     }
363 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
364     lock_owner = std::this_thread::get_id();
365     AddTupleToLockRegion(this);
366     INVARIANT(is_lock_owner());
367 #endif
368     COMPILER_MEMORY_FENCE;
369     INVARIANT(IsLocked(hdr));
370     INVARIANT(!write_intent || IsWriteIntent(hdr));
371     INVARIANT(!IsModifying(hdr));
372 #ifdef ENABLE_EVENT_COUNTERS
373     g_evt_avg_dbtuple_lock_acquire_spins.offer(nspins);
374 #endif
375     return hdr;
376   }
377
378   inline void
379   unlock()
380   {
381     CheckMagic();
382     version_t v = hdr;
383     bool newv = false;
384     INVARIANT(IsLocked(v));
385     INVARIANT(is_lock_owner());
386     if (IsModifying(v) || IsWriteIntent(v)) {
387       newv = true;
388       const version_t n = Version(v);
389       v &= ~HDR_VERSION_MASK;
390       v |= (((n + 1) << HDR_VERSION_SHIFT) & HDR_VERSION_MASK);
391     }
392     // clear locked + modifying bits
393     v &= ~(HDR_LOCKED_MASK | HDR_MODIFYING_MASK | HDR_WRITE_INTENT_MASK);
394     if (newv) {
395       INVARIANT(!reader_check_version(v));
396       INVARIANT(!writer_check_version(v));
397     }
398     INVARIANT(!IsLocked(v));
399     INVARIANT(!IsModifying(v));
400     INVARIANT(!IsWriteIntent(v));
401 #ifdef TUPLE_LOCK_OWNERSHIP_CHECKING
402     lock_owner = std::thread::id();
403     INVARIANT(!is_lock_owner());
404 #endif
405     COMPILER_MEMORY_FENCE;
406     hdr = v;
407   }
408
409   inline bool
410   is_deleting() const
411   {
412     return IsDeleting(hdr);
413   }
414
415   static inline bool
416   IsDeleting(version_t v)
417   {
418     return v & HDR_DELETING_MASK;
419   }
420
421   inline void
422   mark_deleting()
423   {
424     CheckMagic();
425     // the lock on the latest version guards non-latest versions
426     INVARIANT(!is_latest() || is_locked());
427     INVARIANT(!is_latest() || is_lock_owner());
428     INVARIANT(!is_deleting());
429     hdr |= HDR_DELETING_MASK;
430   }
431
432   inline void
433   clear_deleting()
434   {
435     CheckMagic();
436     INVARIANT(is_locked());
437     INVARIANT(is_lock_owner());
438     INVARIANT(is_deleting());
439     hdr &= ~HDR_DELETING_MASK;
440   }
441
442   inline bool
443   is_modifying() const
444   {
445     return IsModifying(hdr);
446   }
447
448   inline void
449   mark_modifying()
450   {
451     CheckMagic();
452     version_t v = hdr;
453     INVARIANT(IsLocked(v));
454     INVARIANT(is_lock_owner());
455     //INVARIANT(!IsModifying(v)); // mark_modifying() must be re-entrant
456     v |= HDR_MODIFYING_MASK;
457     COMPILER_MEMORY_FENCE; // XXX: is this fence necessary?
458     hdr = v;
459     COMPILER_MEMORY_FENCE;
460   }
461
462   static inline bool
463   IsModifying(version_t v)
464   {
465     return v & HDR_MODIFYING_MASK;
466   }
467
468   inline bool
469   is_write_intent() const
470   {
471     return IsWriteIntent(hdr);
472   }
473
474   static inline bool
475   IsWriteIntent(version_t v)
476   {
477     return v & HDR_WRITE_INTENT_MASK;
478   }
479
480   inline bool
481   is_latest() const
482   {
483     return IsLatest(hdr);
484   }
485
486   static inline bool
487   IsLatest(version_t v)
488   {
489     return v & HDR_LATEST_MASK;
490   }
491
492   inline void
493   clear_latest()
494   {
495     CheckMagic();
496     INVARIANT(is_locked());
497     INVARIANT(is_lock_owner());
498     INVARIANT(is_latest());
499     hdr &= ~HDR_LATEST_MASK;
500   }
501
502   static inline version_t
503   Version(version_t v)
504   {
505     return (v & HDR_VERSION_MASK) >> HDR_VERSION_SHIFT;
506   }
507
508   inline version_t
509   reader_stable_version(bool allow_write_intent) const
510   {
511     version_t v = hdr;
512 #ifdef ENABLE_EVENT_COUNTERS
513     unsigned long nspins = 0;
514 #endif
515     while (IsModifying(v) ||
516            (!allow_write_intent && IsWriteIntent(v))) {
517       nop_pause();
518       v = hdr;
519 #ifdef ENABLE_EVENT_COUNTERS
520       ++nspins;
521 #endif
522     }
523     COMPILER_MEMORY_FENCE;
524 #ifdef ENABLE_EVENT_COUNTERS
525     g_evt_avg_dbtuple_stable_version_spins.offer(nspins);
526 #endif
527     return v;
528   }
529
530   /**
531    * returns true if succeeded, false otherwise
532    */
533   inline bool
534   try_writer_stable_version(version_t &v, unsigned int spins) const
535   {
536     v = hdr;
537     while (IsWriteIntent(v) && spins--) {
538       INVARIANT(IsLocked(v));
539       nop_pause();
540       v = hdr;
541     }
542     const bool ret = !IsWriteIntent(v);
543     COMPILER_MEMORY_FENCE;
544     INVARIANT(ret || IsLocked(v));
545     INVARIANT(!ret || !IsModifying(v));
546     return ret;
547   }
548
549   inline version_t
550   unstable_version() const
551   {
552     return hdr;
553   }
554
555   inline bool
556   reader_check_version(version_t version) const
557   {
558     COMPILER_MEMORY_FENCE;
559     // are the versions the same, modulo the
560     // {locked, write_intent, latest} bits?
561     const version_t MODULO_BITS =
562       (HDR_LOCKED_MASK | HDR_WRITE_INTENT_MASK | HDR_LATEST_MASK);
563     return (hdr & ~MODULO_BITS) == (version & ~MODULO_BITS);
564   }
565
566   inline bool
567   writer_check_version(version_t version) const
568   {
569     COMPILER_MEMORY_FENCE;
570     return hdr == version;
571   }
572
573   inline ALWAYS_INLINE struct dbtuple *
574   get_next()
575   {
576     return next;
577   }
578
579   inline const struct dbtuple *
580   get_next() const
581   {
582     return next;
583   }
584
585   inline ALWAYS_INLINE void
586   set_next(struct dbtuple *next)
587   {
588     CheckMagic();
589     this->next = next;
590   }
591
592   inline void
593   clear_next()
594   {
595     CheckMagic();
596     this->next = nullptr;
597   }
598
599   inline ALWAYS_INLINE uint8_t *
600   get_value_start()
601   {
602     CheckMagic();
603     return &value_start[0];
604   }
605
606   inline ALWAYS_INLINE const uint8_t *
607   get_value_start() const
608   {
609     return &value_start[0];
610   }
611
612   // worst name ever...
613   inline bool
614   is_not_behind(tid_t t) const
615   {
616     return version <= t;
617   }
618
619 private:
620
621 #ifdef ENABLE_EVENT_COUNTERS
622   struct scoped_recorder {
623     scoped_recorder(unsigned long &n) : n(&n) {}
624     ~scoped_recorder()
625     {
626       g_evt_avg_dbtuple_read_retries.offer(*n);
627     }
628   private:
629     unsigned long *n;
630   };
631 #endif
632
633   // written to be non-recursive
634   template <typename Reader, typename StringAllocator>
635   static ReadStatus
636   record_at_chain(
637       const dbtuple *starting, tid_t t, tid_t &start_t,
638       Reader &reader, StringAllocator &sa, bool allow_write_intent)
639   {
640 #ifdef ENABLE_EVENT_COUNTERS
641     unsigned long nretries = 0;
642     scoped_recorder rec(nretries);
643 #endif
644     const dbtuple *current = starting;
645   loop:
646     INVARIANT(current->version != MAX_TID);
647     const version_t v = current->reader_stable_version(allow_write_intent);
648     const struct dbtuple *p;
649     const bool found = current->is_not_behind(t);
650     if (found) {
651       start_t = current->version;
652       const size_t read_sz = IsDeleting(v) ? 0 : current->size;
653       if (unlikely(read_sz && !reader(current->get_value_start(), read_sz, sa)))
654         goto retry;
655       if (unlikely(!current->reader_check_version(v)))
656         goto retry;
657       return read_sz ? READ_RECORD : READ_EMPTY;
658     } else {
659       p = current->get_next();
660     }
661     if (unlikely(!current->reader_check_version(v)))
662       goto retry;
663     if (p) {
664       current = p;
665       goto loop;
666     }
667     // see note in record_at()
668     start_t = MIN_TID;
669     return READ_EMPTY;
670   retry:
671 #ifdef ENABLE_EVENT_COUNTERS
672     ++nretries;
673 #endif
674     goto loop;
675   }
676
677   // we force one level of inlining, but don't force record_at_chain()
678   // to be inlined
679   template <typename Reader, typename StringAllocator>
680   inline ALWAYS_INLINE ReadStatus
681   record_at(
682       tid_t t, tid_t &start_t,
683       Reader &reader, StringAllocator &sa, bool allow_write_intent) const
684   {
685 #ifdef ENABLE_EVENT_COUNTERS
686     unsigned long nretries = 0;
687     scoped_recorder rec(nretries);
688 #endif
689     if (unlikely(version == MAX_TID)) {
690       // XXX(stephentu): HACK! we use MAX_TID to indicate a tentative
691       // "insert"- the actual latest value is empty.
692       //
693       // since our system is screwed anyways if we ever reach MAX_TID, this
694       // is OK for now, but a real solution should exist at some point
695       start_t = MIN_TID;
696       return READ_EMPTY;
697     }
698   loop:
699     const version_t v = reader_stable_version(allow_write_intent);
700     const struct dbtuple *p;
701     const bool found = is_not_behind(t);
702     if (found) {
703       //if (unlikely(!IsLatest(v)))
704       //  return READ_FAILED;
705       start_t = version;
706       const size_t read_sz = IsDeleting(v) ? 0 : size;
707       if (unlikely(read_sz && !reader(get_value_start(), read_sz, sa)))
708         goto retry;
709       if (unlikely(!reader_check_version(v)))
710         goto retry;
711       return read_sz ? READ_RECORD : READ_EMPTY;
712     } else {
713       p = get_next();
714     }
715     if (unlikely(!reader_check_version(v)))
716       goto retry;
717     if (p)
718       return record_at_chain(p, t, start_t, reader, sa, allow_write_intent);
719     // NB(stephentu): if we reach the end of a chain then we assume that
720     // the record exists as a deleted record.
721     //
722     // This is safe because we have been very careful to not garbage collect
723     // elements along the chain until it is guaranteed that the record
724     // is superceded by later record in any consistent read. Therefore,
725     // if we reach the end of the chain, then it *must* be the case that
726     // the record does not actually exist.
727     //
728     // Note that MIN_TID is the *wrong* tid to use here given wrap-around- we
729     // really should be setting this value to the tid which represents the
730     // oldest TID possible in the system. But we currently don't implement
731     // wrap around
732     start_t = MIN_TID;
733     return READ_EMPTY;
734   retry:
735 #ifdef ENABLE_EVENT_COUNTERS
736     ++nretries;
737 #endif
738     goto loop;
739   }
740
741   static event_counter g_evt_dbtuple_creates;
742   static event_counter g_evt_dbtuple_logical_deletes;
743   static event_counter g_evt_dbtuple_physical_deletes;
744   static event_counter g_evt_dbtuple_bytes_allocated;
745   static event_counter g_evt_dbtuple_bytes_freed;
746   static event_counter g_evt_dbtuple_spills;
747   static event_counter g_evt_dbtuple_inplace_buf_insufficient;
748   static event_counter g_evt_dbtuple_inplace_buf_insufficient_on_spill;
749   static event_avg_counter g_evt_avg_record_spill_len;
750
751 public:
752
753   /**
754    * Read the record at tid t. Returns true if such a record exists, false
755    * otherwise (ie the record was GC-ed, or other reasons). On a successful
756    * read, the value @ start_t will be stored in r
757    *
758    * NB(stephentu): calling stable_read() while holding the lock
759    * is an error- this will cause deadlock
760    */
761   template <typename Reader, typename StringAllocator>
762   inline ALWAYS_INLINE ReadStatus
763   stable_read(
764       tid_t t, tid_t &start_t,
765       Reader &reader, StringAllocator &sa,
766       bool allow_write_intent) const
767   {
768     return record_at(t, start_t, reader, sa, allow_write_intent);
769   }
770
771   inline bool
772   is_latest_version(tid_t t) const
773   {
774     return is_latest() && is_not_behind(t);
775   }
776
777   bool
778   stable_is_latest_version(tid_t t) const
779   {
780     version_t v = 0;
781     if (!try_writer_stable_version(v, 16))
782       return false;
783     // now v is a stable version
784     INVARIANT(!IsWriteIntent(v));
785     INVARIANT(!IsModifying(v));
786     const bool ret = IsLatest(v) && is_not_behind(t);
787     // only check_version() if the answer would be true- otherwise,
788     // no point in doing a version check
789     if (ret && writer_check_version(v))
790       return true;
791     else
792       // no point in retrying, since we know it will fail (since we had a
793       // version change)
794       return false;
795   }
796
797   inline bool
798   latest_value_is_nil() const
799   {
800     return is_latest() && size == 0;
801   }
802
803   inline bool
804   stable_latest_value_is_nil() const
805   {
806     version_t v = 0;
807     if (!try_writer_stable_version(v, 16))
808       return false;
809     INVARIANT(!IsWriteIntent(v));
810     INVARIANT(!IsModifying(v));
811     const bool ret = IsLatest(v) && size == 0;
812     if (ret && writer_check_version(v))
813       return true;
814     else
815       return false;
816   }
817
818   struct write_record_ret {
819     write_record_ret() : head_(), rest_(), forced_spill_() {}
820     write_record_ret(dbtuple *head, dbtuple* rest, bool forced_spill)
821       : head_(head), rest_(rest), forced_spill_(forced_spill)
822     {
823       INVARIANT(head);
824       INVARIANT(head != rest);
825       INVARIANT(!forced_spill || rest);
826     }
827     dbtuple *head_;
828     dbtuple *rest_;
829     bool forced_spill_;
830   };
831
832   // XXX: kind of hacky, but we do this to avoid virtual
833   // functions / passing multiple function pointers around
834   enum TupleWriterMode {
835     TUPLE_WRITER_NEEDS_OLD_VALUE, // all three args ignored
836     TUPLE_WRITER_COMPUTE_NEEDED,
837     TUPLE_WRITER_COMPUTE_DELTA_NEEDED, // last two args ignored
838     TUPLE_WRITER_DO_WRITE,
839     TUPLE_WRITER_DO_DELTA_WRITE,
840   };
841   typedef size_t (*tuple_writer_t)(TupleWriterMode, const void *, uint8_t *, size_t);
842
843   /**
844    * Always writes the record in the latest (newest) version slot,
845    * not asserting whether or not inserting r @ t would violate the
846    * sorted order invariant
847    *
848    * ret.first  = latest tuple after the write (guaranteed to not be nullptr)
849    * ret.second = old version of tuple, iff no overwrite (can be nullptr)
850    *
851    * Note: if this != ret.first, then we need a tree replacement
852    */
853   template <typename Transaction>
854   write_record_ret
855   write_record_at(const Transaction *txn, tid_t t,
856                   const void *v, tuple_writer_t writer)
857   {
858 #ifndef DISABLE_OVERWRITE_IN_PLACE
859     CheckMagic();
860     INVARIANT(is_locked());
861     INVARIANT(is_lock_owner());
862     INVARIANT(is_latest());
863     INVARIANT(is_write_intent());
864
865     const size_t new_sz =
866       v ? writer(TUPLE_WRITER_COMPUTE_NEEDED, v, get_value_start(), size) : 0;
867     INVARIANT(!v || new_sz);
868     INVARIANT(is_deleting() || size);
869     const size_t old_sz = is_deleting() ? 0 : size;
870
871     if (!new_sz)
872       ++g_evt_dbtuple_logical_deletes;
873
874     // try to overwrite this record
875     if (likely(txn->can_overwrite_record_tid(version, t) && old_sz)) {
876       INVARIANT(!is_deleting());
877       // see if we have enough space
878       if (likely(new_sz <= alloc_size)) {
879         // directly update in place
880         mark_modifying();
881         if (v)
882           writer(TUPLE_WRITER_DO_WRITE, v, get_value_start(), old_sz);
883         version = t;
884         size = new_sz;
885         if (!new_sz)
886           mark_deleting();
887         return write_record_ret(this, nullptr, false);
888       }
889
890       //std::cerr
891       //  << "existing: " << g_proto_version_str(version) << std::endl
892       //  << "new     : " << g_proto_version_str(t)       << std::endl
893       //  << "alloc_size : " << alloc_size                << std::endl
894       //  << "new_sz     : " << new_sz                    << std::endl;
895
896       // keep this tuple in the chain (it's wasteful, but not incorrect)
897       // so that cleanup is easier
898       //
899       // XXX(stephentu): alloc_spill() should acquire the lock on
900       // the returned tuple in the ctor, as an optimization
901
902       const bool needs_old_value =
903         writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
904       INVARIANT(new_sz);
905       INVARIANT(v);
906       dbtuple * const rep =
907         alloc_spill(t, get_value_start(), old_sz, new_sz,
908                     this, true, needs_old_value);
909       writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), old_sz);
910       INVARIANT(rep->is_latest());
911       INVARIANT(rep->size == new_sz);
912       clear_latest();
913       ++g_evt_dbtuple_inplace_buf_insufficient;
914
915       // [did not spill because of epochs, need to replace this with rep]
916       return write_record_ret(rep, this, false);
917     }
918
919     //std::cerr
920     //  << "existing: " << g_proto_version_str(version) << std::endl
921     //  << "new     : " << g_proto_version_str(t)       << std::endl
922     //  << "alloc_size : " << alloc_size                << std::endl
923     //  << "new_sz     : " << new_sz                    << std::endl;
924
925     // need to spill
926     ++g_evt_dbtuple_spills;
927     g_evt_avg_record_spill_len.offer(size);
928
929     if (new_sz <= alloc_size && old_sz) {
930       INVARIANT(!is_deleting());
931       dbtuple * const spill = alloc(version, this, false);
932       INVARIANT(!spill->is_latest());
933       mark_modifying();
934       set_next(spill);
935       if (v)
936         writer(TUPLE_WRITER_DO_WRITE, v, get_value_start(), size);
937       version = t;
938       size = new_sz;
939       if (!new_sz)
940         mark_deleting();
941       return write_record_ret(this, spill, true);
942     }
943
944     const bool needs_old_value =
945       writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
946     dbtuple * const rep =
947       alloc_spill(t, get_value_start(), old_sz, new_sz,
948                   this, true, needs_old_value);
949     if (v)
950       writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), size);
951     INVARIANT(rep->is_latest());
952     INVARIANT(rep->size == new_sz);
953     INVARIANT(new_sz || rep->is_deleting()); // set by alloc_spill()
954     clear_latest();
955     ++g_evt_dbtuple_inplace_buf_insufficient_on_spill;
956     return write_record_ret(rep, this, true);
957 #else
958     CheckMagic();
959     INVARIANT(is_locked());
960     INVARIANT(is_lock_owner());
961     INVARIANT(is_latest());
962     INVARIANT(is_write_intent());
963
964     const size_t new_sz =
965       v ? writer(TUPLE_WRITER_COMPUTE_NEEDED, v, get_value_start(), size) : 0;
966     INVARIANT(!v || new_sz);
967     INVARIANT(is_deleting() || size);
968     const size_t old_sz = is_deleting() ? 0 : size;
969
970     if (!new_sz)
971       ++g_evt_dbtuple_logical_deletes;
972
973     const bool needs_old_value =
974       writer(TUPLE_WRITER_NEEDS_OLD_VALUE, nullptr, nullptr, 0);
975     dbtuple * const rep =
976       alloc_spill(t, get_value_start(), old_sz, new_sz,
977                   this, true, needs_old_value);
978     if (v)
979       writer(TUPLE_WRITER_DO_WRITE, v, rep->get_value_start(), size);
980     INVARIANT(rep->is_latest());
981     INVARIANT(rep->size == new_sz);
982     INVARIANT(new_sz || rep->is_deleting()); // set by alloc_spill()
983     clear_latest();
984     ++g_evt_dbtuple_inplace_buf_insufficient_on_spill;
985     return write_record_ret(rep, this, true);
986 #endif
987   }
988
989   // NB: we round up allocation sizes because jemalloc will do this
990   // internally anyways, so we might as well grab more usable space (really
991   // just internal vs external fragmentation)
992
993   static inline dbtuple *
994   alloc_first(size_type sz, bool acquire_lock)
995   {
996     INVARIANT(sz <= std::numeric_limits<node_size_type>::max());
997     const size_t max_alloc_sz =
998       std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
999     const size_t alloc_sz =
1000       std::min(
1001           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + sz),
1002           max_alloc_sz);
1003     char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
1004     INVARIANT(p);
1005     INVARIANT((alloc_sz - sizeof(dbtuple)) >= sz);
1006     return new (p) dbtuple(
1007         sz, alloc_sz - sizeof(dbtuple), acquire_lock);
1008   }
1009
1010   static inline dbtuple *
1011   alloc(tid_t version, struct dbtuple *base, bool set_latest)
1012   {
1013     const size_t max_alloc_sz =
1014       std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
1015     const size_t alloc_sz =
1016       std::min(
1017           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + base->size),
1018           max_alloc_sz);
1019     char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
1020     INVARIANT(p);
1021     return new (p) dbtuple(
1022         version, base, alloc_sz - sizeof(dbtuple), set_latest);
1023   }
1024
1025   static inline dbtuple *
1026   alloc_spill(tid_t version, const_record_type value, size_type oldsz,
1027               size_type newsz, struct dbtuple *next, bool set_latest,
1028               bool copy_old_value)
1029   {
1030     INVARIANT(oldsz <= std::numeric_limits<node_size_type>::max());
1031     INVARIANT(newsz <= std::numeric_limits<node_size_type>::max());
1032
1033     const size_t needed_sz =
1034       copy_old_value ? std::max(newsz, oldsz) : newsz;
1035     const size_t max_alloc_sz =
1036       std::numeric_limits<node_size_type>::max() + sizeof(dbtuple);
1037     const size_t alloc_sz =
1038       std::min(
1039           util::round_up<size_t, allocator::LgAllocAlignment>(sizeof(dbtuple) + needed_sz),
1040           max_alloc_sz);
1041     char *p = reinterpret_cast<char *>(rcu::s_instance.alloc(alloc_sz));
1042     INVARIANT(p);
1043     return new (p) dbtuple(
1044         version, value, oldsz, newsz,
1045         alloc_sz - sizeof(dbtuple), next, set_latest, copy_old_value);
1046   }
1047
1048
1049 private:
1050   static inline void
1051   destruct_and_free(dbtuple *n)
1052   {
1053     const size_t alloc_sz = n->alloc_size + sizeof(*n);
1054     n->~dbtuple();
1055     rcu::s_instance.dealloc(n, alloc_sz);
1056   }
1057
1058 public:
1059   static void
1060   deleter(void *p)
1061   {
1062     dbtuple * const n = (dbtuple *) p;
1063     INVARIANT(!n->is_latest());
1064     INVARIANT(!n->is_locked());
1065     INVARIANT(!n->is_modifying());
1066     destruct_and_free(n);
1067   }
1068
1069   static inline void
1070   release(dbtuple *n)
1071   {
1072     if (unlikely(!n))
1073       return;
1074     INVARIANT(n->is_locked());
1075     INVARIANT(!n->is_latest());
1076     rcu::s_instance.free_with_fn(n, deleter);
1077   }
1078
1079   static inline void
1080   release_no_rcu(dbtuple *n)
1081   {
1082     if (unlikely(!n))
1083       return;
1084     INVARIANT(!n->is_latest());
1085     destruct_and_free(n);
1086   }
1087
1088   static std::string
1089   VersionInfoStr(version_t v);
1090
1091 }
1092 #if !defined(TUPLE_CHECK_KEY) && \
1093     !defined(CHECK_INVARIANTS) && \
1094     !defined(TUPLE_LOCK_OWNERSHIP_CHECKING)
1095 PACKED
1096 #endif
1097 ;
1098
1099 #endif /* _NDB_TUPLE_H_ */