benchmark silo added
[c11concurrency-benchmarks.git] / silo / txn_btree.cc
1 #include <unistd.h>
2 #include <limits>
3 #include <memory>
4 #include <atomic>
5 #include <mutex>
6
7 #include "txn.h"
8 #include "txn_proto2_impl.h"
9 #include "txn_btree.h"
10 #include "typed_txn_btree.h"
11 #include "thread.h"
12 #include "util.h"
13 #include "macros.h"
14 #include "tuple.h"
15 #include "record/encoder.h"
16 #include "record/inline_str.h"
17
18 #include "scopedperf.hh"
19
20 #if defined(NDB_MASSTREE)
21 #define HAVE_REVERSE_RANGE_SCANS
22 #endif
23
24 using namespace std;
25 using namespace util;
26 uint64_t initial_timestamp;
27
28 struct test_callback_ctr {
29   test_callback_ctr(size_t *ctr) : ctr(ctr) {}
30   inline bool
31   operator()(const concurrent_btree::string_type &k, const string &v) const
32   {
33     (*ctr)++;
34     return true;
35   }
36   size_t *const ctr;
37 };
38
39 // all combinations of txn flags to test
40 static uint64_t TxnFlags[] = { 0, transaction_base::TXN_FLAG_LOW_LEVEL_SCAN };
41
42 template <typename P>
43 static void
44 always_assert_cond_in_txn(
45     const P &t, bool cond,
46     const char *condstr, const char *func,
47     const char *filename, int lineno)
48 {
49   if (likely(cond))
50     return;
51   static mutex g_report_lock;
52   std::lock_guard<mutex> guard(g_report_lock);
53   cerr << func << " (" << filename << ":" << lineno << ") - Condition `"
54        << condstr << "' failed!" << endl;
55   t.dump_debug_info();
56   sleep(1); // XXX(stephentu): give time for debug dump to reach console
57             // why doesn't flushing solve this?
58   abort();
59 }
60
61 #define ALWAYS_ASSERT_COND_IN_TXN(t, cond) \
62   always_assert_cond_in_txn(t, cond, #cond, __PRETTY_FUNCTION__, __FILE__, __LINE__)
63
64 template <typename P>
65 static inline void
66 AssertSuccessfulCommit(P &t)
67 {
68   ALWAYS_ASSERT_COND_IN_TXN(t, t.commit(false));
69 }
70
71 template <typename P>
72 static inline void
73 AssertFailedCommit(P &t)
74 {
75   ALWAYS_ASSERT_COND_IN_TXN(t, !t.commit(false));
76 }
77
78 template <typename T>
79 inline void
80 AssertByteEquality(const T &t, const uint8_t * v, size_t sz)
81 {
82   ALWAYS_ASSERT(sizeof(T) == sz);
83   bool success = memcmp(&t, v, sz) == 0;
84   if (!success) {
85     cerr << "expecting: " << hexify(string((const char *) &t, sizeof(T))) << endl;
86     cerr << "got      : " << hexify(string((const char *) v, sz)) << endl;
87     ALWAYS_ASSERT(false);
88   }
89 }
90
91 template <typename T>
92 inline void
93 AssertByteEquality(const T &t, const string &v)
94 {
95   AssertByteEquality(t, (const uint8_t *) v.data(), v.size());
96 }
97
98 struct rec {
99   rec() : v() {}
100   rec(uint64_t v) : v(v) {}
101   uint64_t v;
102 };
103
104 static inline ostream &
105 operator<<(ostream &o, const rec &r)
106 {
107   o << "[rec=" << r.v << "]";
108   return o;
109 }
110
111 template <template <typename> class TxnType, typename Traits>
112 static void
113 test1()
114 {
115   for (size_t txn_flags_idx = 0;
116        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
117        txn_flags_idx++) {
118     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
119
120     VERBOSE(cerr << "Testing with flags=0x" << hexify(txn_flags) << endl);
121
122     txn_btree<TxnType> btr(sizeof(rec));
123     typename Traits::StringAllocator arena;
124
125     {
126       TxnType<Traits> t(txn_flags, arena);
127       string v;
128       ALWAYS_ASSERT_COND_IN_TXN(t, !btr.search(t, u64_varkey(0), v));
129       btr.insert_object(t, u64_varkey(0), rec(0));
130       ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(0), v));
131       ALWAYS_ASSERT_COND_IN_TXN(t, !v.empty());
132       AssertByteEquality(rec(0), v);
133       AssertSuccessfulCommit(t);
134       VERBOSE(cerr << "------" << endl);
135     }
136
137     {
138       TxnType<Traits>
139         t0(txn_flags, arena), t1(txn_flags, arena);
140       string v0, v1;
141
142       ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
143       ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
144       AssertByteEquality(rec(0), v0);
145
146       btr.insert_object(t0, u64_varkey(0), rec(1));
147       ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
148       ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
149
150       if (t1.is_snapshot())
151         // we don't read-uncommitted for consistent snapshot
152         AssertByteEquality(rec(0), v1);
153
154       AssertSuccessfulCommit(t0);
155       try {
156         t1.commit(true);
157         // if we have a consistent snapshot, then this txn should not abort
158         ALWAYS_ASSERT_COND_IN_TXN(t1, t1.is_snapshot());
159       } catch (transaction_abort_exception &e) {
160         // if we dont have a snapshot, then we expect an abort
161         ALWAYS_ASSERT_COND_IN_TXN(t1, !t1.is_snapshot());
162       }
163       VERBOSE(cerr << "------" << endl);
164     }
165
166     {
167       // racy insert
168       TxnType<Traits>
169         t0(txn_flags, arena), t1(txn_flags, arena);
170       string v0, v1;
171
172       ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
173       ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
174       AssertByteEquality(rec(1), v0);
175
176       btr.insert_object(t0, u64_varkey(0), rec(2));
177       ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
178       ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
179       // our API allows v1 to be a dirty read though (t1 will abort)
180
181       btr.insert_object(t1, u64_varkey(0), rec(3));
182
183       AssertSuccessfulCommit(t0);
184       AssertFailedCommit(t1);
185       VERBOSE(cerr << "------" << endl);
186     }
187
188     {
189       // racy scan
190       TxnType<Traits>
191         t0(txn_flags, arena), t1(txn_flags, arena);
192
193       const u64_varkey vend(5);
194       size_t ctr = 0;
195       test_callback_ctr cb(&ctr);
196       btr.search_range(t0, u64_varkey(1), &vend, cb);
197       ALWAYS_ASSERT_COND_IN_TXN(t0, ctr == 0);
198
199       btr.insert_object(t1, u64_varkey(2), rec(4));
200       AssertSuccessfulCommit(t1);
201
202       btr.insert_object(t0, u64_varkey(0), rec(0));
203       AssertFailedCommit(t0);
204       VERBOSE(cerr << "------" << endl);
205     }
206
207     {
208       TxnType<Traits> t(txn_flags, arena);
209       const u64_varkey vend(20);
210       size_t ctr = 0;
211       test_callback_ctr cb(&ctr);
212       btr.search_range(t, u64_varkey(10), &vend, cb);
213       ALWAYS_ASSERT_COND_IN_TXN(t, ctr == 0);
214       btr.insert_object(t, u64_varkey(15), rec(5));
215       AssertSuccessfulCommit(t);
216       VERBOSE(cerr << "------" << endl);
217     }
218
219     txn_epoch_sync<TxnType>::sync();
220     txn_epoch_sync<TxnType>::finish();
221   }
222 }
223
224 struct bufrec { char buf[256]; };
225
226 template <template <typename> class TxnType, typename Traits>
227 static void
228 test2()
229 {
230   for (size_t txn_flags_idx = 0;
231        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
232        txn_flags_idx++) {
233     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
234     txn_btree<TxnType> btr(sizeof(bufrec));
235     typename Traits::StringAllocator arena;
236     bufrec r;
237     NDB_MEMSET(r.buf, 'a', ARRAY_NELEMS(r.buf));
238     for (size_t i = 0; i < 100; i++) {
239       TxnType<Traits> t(txn_flags, arena);
240       btr.insert_object(t, u64_varkey(i), r);
241       AssertSuccessfulCommit(t);
242     }
243     for (size_t i = 0; i < 100; i++) {
244       TxnType<Traits> t(txn_flags, arena);
245       string v;
246       ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(i), v));
247       AssertByteEquality(r, v);
248       AssertSuccessfulCommit(t);
249     }
250     txn_epoch_sync<TxnType>::sync();
251     txn_epoch_sync<TxnType>::finish();
252   }
253 }
254
255 template <template <typename> class TxnType, typename Traits>
256 static void
257 test_absent_key_race()
258 {
259   for (size_t txn_flags_idx = 0;
260        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
261        txn_flags_idx++) {
262     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
263     txn_btree<TxnType> btr;
264     typename Traits::StringAllocator arena;
265
266     {
267       TxnType<Traits>
268         t0(txn_flags, arena), t1(txn_flags, arena);
269       string v0, v1;
270       ALWAYS_ASSERT_COND_IN_TXN(t0, !btr.search(t0, u64_varkey(0), v0));
271       ALWAYS_ASSERT_COND_IN_TXN(t1, !btr.search(t1, u64_varkey(0), v1));
272
273       // t0 does a write
274       btr.insert_object(t0, u64_varkey(0), rec(1));
275
276       // t1 does a write
277       btr.insert_object(t1, u64_varkey(0), rec(1));
278
279       // t0 should win, t1 should abort
280       AssertSuccessfulCommit(t0);
281       AssertFailedCommit(t1);
282     }
283
284     txn_epoch_sync<TxnType>::sync();
285     txn_epoch_sync<TxnType>::finish();
286   }
287 }
288
289 template <template <typename> class TxnType, typename Traits>
290 static void
291 test_inc_value_size()
292 {
293   for (size_t txn_flags_idx = 0;
294        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
295        txn_flags_idx++) {
296     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
297     txn_btree<TxnType> btr;
298     typename Traits::StringAllocator arena;
299     const size_t upper = numeric_limits<dbtuple::node_size_type>::max();
300     for (size_t i = 1; i < upper; i++) {
301       const string v(i, 'a');
302       TxnType<Traits> t(txn_flags, arena);
303       btr.insert(t, u64_varkey(0), (const uint8_t *) v.data(), v.size());
304       AssertSuccessfulCommit(t);
305     }
306     txn_epoch_sync<TxnType>::sync();
307     txn_epoch_sync<TxnType>::finish();
308   }
309 }
310
311 template <template <typename> class TxnType, typename Traits>
312 static void
313 test_multi_btree()
314 {
315   for (size_t txn_flags_idx = 0;
316        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
317        txn_flags_idx++) {
318     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
319     txn_btree<TxnType> btr0, btr1;
320     typename Traits::StringAllocator arena;
321     for (size_t i = 0; i < 100; i++) {
322       TxnType<Traits> t(txn_flags, arena);
323       btr0.insert_object(t, u64_varkey(i), rec(123));
324       btr1.insert_object(t, u64_varkey(i), rec(123));
325       AssertSuccessfulCommit(t);
326     }
327
328     for (size_t i = 0; i < 100; i++) {
329       TxnType<Traits> t(txn_flags, arena);
330       string v0, v1;
331       bool ret0 = btr0.search(t, u64_varkey(i), v0);
332       bool ret1 = btr1.search(t, u64_varkey(i), v1);
333       AssertSuccessfulCommit(t);
334       ALWAYS_ASSERT_COND_IN_TXN(t, ret0);
335       ALWAYS_ASSERT_COND_IN_TXN(t, !v0.empty());
336       ALWAYS_ASSERT_COND_IN_TXN(t, ret1);
337       ALWAYS_ASSERT_COND_IN_TXN(t, !v1.empty());
338       AssertByteEquality(rec(123), v0);
339       AssertByteEquality(rec(123), v1);
340     }
341
342     txn_epoch_sync<TxnType>::sync();
343     txn_epoch_sync<TxnType>::finish();
344   }
345 }
346
347 template <template <typename> class TxnType, typename Traits>
348 static void
349 test_read_only_snapshot()
350 {
351   for (size_t txn_flags_idx = 0;
352        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
353        txn_flags_idx++) {
354     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
355     txn_btree<TxnType> btr;
356     typename Traits::StringAllocator arena;
357
358     {
359       TxnType<Traits> t(txn_flags, arena);
360       btr.insert_object(t, u64_varkey(0), rec(0));
361       AssertSuccessfulCommit(t);
362     }
363
364     // XXX(stephentu): HACK! we need to wait for the GC to
365     // compute a new consistent snapshot version that includes this
366     // latest update
367     txn_epoch_sync<TxnType>::sync();
368
369     {
370       TxnType<Traits>
371         t0(txn_flags, arena),
372         t1(txn_flags | transaction_base::TXN_FLAG_READ_ONLY, arena);
373       string v0, v1;
374       ALWAYS_ASSERT_COND_IN_TXN(t0, btr.search(t0, u64_varkey(0), v0));
375       ALWAYS_ASSERT_COND_IN_TXN(t0, !v0.empty());
376       AssertByteEquality(rec(0), v0);
377
378       btr.insert_object(t0, u64_varkey(0), rec(1));
379
380       ALWAYS_ASSERT_COND_IN_TXN(t1, btr.search(t1, u64_varkey(0), v1));
381       ALWAYS_ASSERT_COND_IN_TXN(t1, !v1.empty());
382       AssertByteEquality(rec(0), v1);
383
384       AssertSuccessfulCommit(t0);
385       AssertSuccessfulCommit(t1);
386     }
387
388     txn_epoch_sync<TxnType>::sync();
389     txn_epoch_sync<TxnType>::finish();
390   }
391 }
392
393 namespace test_long_keys_ns {
394
395 static inline string
396 make_long_key(int32_t a, int32_t b, int32_t c, int32_t d) {
397   char buf[4 * sizeof(int32_t)];
398   int32_t *p = (int32_t *) &buf[0];
399   *p++ = a;
400   *p++ = b;
401   *p++ = c;
402   *p++ = d;
403   return string(&buf[0], ARRAY_NELEMS(buf));
404 }
405
406 template <template <typename> class Protocol>
407 class counting_scan_callback : public txn_btree<Protocol>::search_range_callback {
408 public:
409   counting_scan_callback(uint64_t expect) : ctr(0), expect(expect) {}
410
411   virtual bool
412   invoke(const typename txn_btree<Protocol>::keystring_type &k, const string &v)
413   {
414     AssertByteEquality(rec(expect), v);
415     ctr++;
416     return true;
417   }
418   size_t ctr;
419   uint64_t expect;
420 };
421
422 }
423
424 namespace test_insert_same_key_ns {
425   struct rec0 { rec0(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[8];    };
426   struct rec1 { rec1(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[128];  };
427   struct rec2 { rec2(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[1024]; };
428   struct rec3 { rec3(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[2048]; };
429   struct rec4 { rec4(int ch) { NDB_MEMSET(&buf[0], ch, sizeof(buf)); } char buf[4096]; };
430 }
431
432 template <template <typename> class TxnType, typename Traits>
433 static void
434 test_long_keys()
435 {
436   using namespace test_long_keys_ns;
437   const size_t N = 10;
438   for (size_t txn_flags_idx = 0;
439        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
440        txn_flags_idx++) {
441     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
442     txn_btree<TxnType> btr;
443     typename Traits::StringAllocator arena;
444
445     {
446       TxnType<Traits> t(txn_flags, arena);
447       for (size_t a = 0; a < N; a++)
448         for (size_t b = 0; b < N; b++)
449           for (size_t c = 0; c < N; c++)
450             for (size_t d = 0; d < N; d++) {
451               const string k = make_long_key(a, b, c, d);
452               btr.insert_object(t, varkey(k), rec(1));
453             }
454       AssertSuccessfulCommit(t);
455     }
456
457     {
458       TxnType<Traits> t(txn_flags, arena);
459       const string lowkey_s = make_long_key(1, 2, 3, 0);
460       const string highkey_s = make_long_key(1, 2, 3, N);
461       const varkey highkey(highkey_s);
462       counting_scan_callback<TxnType> c(1);
463       btr.search_range_call(t, varkey(lowkey_s), &highkey, c);
464       AssertSuccessfulCommit(t);
465       if (c.ctr != N)
466         cerr << "c.ctr: " << c.ctr << ", N: " << N << endl;
467       ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == N);
468     }
469
470 #ifdef HAVE_REVERSE_RANGE_SCANS
471     {
472       TxnType<Traits> t(txn_flags, arena);
473       const string lowkey_s = make_long_key(4, 5, 3, 0);
474       const string highkey_s = make_long_key(4, 5, 3, N);
475       const varkey lowkey(lowkey_s);
476       counting_scan_callback<TxnType> c(1);
477       btr.rsearch_range_call(t, varkey(highkey_s), &lowkey, c);
478       AssertSuccessfulCommit(t);
479       if (c.ctr != (N-1))
480         cerr << "c.ctr: " << c.ctr << ", N: " << N << endl;
481       ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == (N-1));
482     }
483 #endif
484
485     txn_epoch_sync<TxnType>::sync();
486     txn_epoch_sync<TxnType>::finish();
487   }
488
489   cerr << "test_long_keys passed" << endl;
490 }
491
492 template <template <typename> class TxnType, typename Traits>
493 static void
494 test_long_keys2()
495 {
496   using namespace test_long_keys_ns;
497   for (size_t txn_flags_idx = 0;
498        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
499        txn_flags_idx++) {
500     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
501
502     const uint8_t lowkey_cstr[] = {
503       0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x45, 0x49, 0x4E, 0x47,
504       0x41, 0x54, 0x49, 0x4F, 0x4E, 0x45, 0x49, 0x4E, 0x47, 0x00, 0x00, 0x00,
505       0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
506       0x00, 0x00, 0x00, 0x00,
507     };
508     const string lowkey_s((const char *) &lowkey_cstr[0], ARRAY_NELEMS(lowkey_cstr));
509     const uint8_t highkey_cstr[] = {
510       0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x45, 0x49, 0x4E, 0x47,
511       0x41, 0x54, 0x49, 0x4F, 0x4E, 0x45, 0x49, 0x4E, 0x47, 0x00, 0x00, 0x00,
512       0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
513       0xFF, 0xFF, 0xFF, 0xFF
514     };
515     const string highkey_s((const char *) &highkey_cstr[0], ARRAY_NELEMS(highkey_cstr));
516
517     txn_btree<TxnType> btr;
518     typename Traits::StringAllocator arena;
519     {
520       TxnType<Traits> t(txn_flags, arena);
521       btr.insert_object(t, varkey(lowkey_s), rec(12345));
522       AssertSuccessfulCommit(t);
523     }
524
525     {
526       TxnType<Traits> t(txn_flags, arena);
527       string v;
528       ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, varkey(lowkey_s), v));
529       AssertByteEquality(rec(12345), v);
530       AssertSuccessfulCommit(t);
531     }
532
533     {
534       TxnType<Traits> t(txn_flags, arena);
535       counting_scan_callback<TxnType> c(12345);
536       const varkey highkey(highkey_s);
537       btr.search_range_call(t, varkey(lowkey_s), &highkey, c);
538       AssertSuccessfulCommit(t);
539       ALWAYS_ASSERT_COND_IN_TXN(t, c.ctr == 1);
540     }
541
542     txn_epoch_sync<TxnType>::sync();
543     txn_epoch_sync<TxnType>::finish();
544   }
545 }
546
547 template <template <typename> class TxnType, typename Traits>
548 static void
549 test_insert_same_key()
550 {
551   using namespace test_insert_same_key_ns;
552   for (size_t txn_flags_idx = 0;
553        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
554        txn_flags_idx++) {
555     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
556
557     txn_btree<TxnType> btr;
558     typename Traits::StringAllocator arena;
559
560     {
561       TxnType<Traits> t(txn_flags, arena);
562       btr.insert_object(t, u64_varkey(0), rec0('a'));
563       btr.insert_object(t, u64_varkey(0), rec1('b'));
564       btr.insert_object(t, u64_varkey(0), rec2('c'));
565       AssertSuccessfulCommit(t);
566     }
567
568     {
569       TxnType<Traits> t(txn_flags, arena);
570       btr.insert_object(t, u64_varkey(0), rec3('d'));
571       btr.insert_object(t, u64_varkey(0), rec4('e'));
572       AssertSuccessfulCommit(t);
573     }
574
575     txn_epoch_sync<TxnType>::sync();
576     txn_epoch_sync<TxnType>::finish();
577   }
578 }
579
580 #define TESTREC_KEY_FIELDS(x, y) \
581   x(int32_t,k0) \
582   y(int32_t,k1)
583 #define TESTREC_VALUE_FIELDS(x, y) \
584   x(int32_t,v0) \
585   y(int16_t,v1) \
586   y(inline_str_fixed<10>,v2)
587 DO_STRUCT(testrec, TESTREC_KEY_FIELDS, TESTREC_VALUE_FIELDS)
588
589 namespace test_typed_btree_ns {
590
591 static const pair<testrec::key, testrec::value> scan_values[] = {
592   {testrec::key(10, 1), testrec::value(123456 + 1, 10, "A")},
593   {testrec::key(10, 2), testrec::value(123456 + 2, 10, "B")},
594   {testrec::key(10, 3), testrec::value(123456 + 3, 10, "C")},
595   {testrec::key(10, 4), testrec::value(123456 + 4, 10, "D")},
596   {testrec::key(10, 5), testrec::value(123456 + 5, 10, "E")},
597 };
598
599 template <template <typename> class Protocol>
600 class scan_callback : public typed_txn_btree<Protocol, schema<testrec>>::search_range_callback {
601 public:
602   constexpr scan_callback() : n(0) {}
603
604   virtual bool
605   invoke(const testrec::key &key,
606          const testrec::value &value)
607   {
608     ALWAYS_ASSERT(n < ARRAY_NELEMS(scan_values));
609     ALWAYS_ASSERT(scan_values[n].first == key);
610     ALWAYS_ASSERT(scan_values[n].second.v2 == value.v2);
611     n++;
612     return true;
613   }
614
615 private:
616   size_t n;
617 };
618
619 }
620
621 template <template <typename> class TxnType, typename Traits>
622 static void
623 test_typed_btree()
624 {
625   using namespace test_typed_btree_ns;
626
627   typedef typed_txn_btree<TxnType, schema<testrec>> ttxn_btree_type;
628   ttxn_btree_type btr;
629   typename Traits::StringAllocator arena;
630   typedef TxnType<Traits> txn_type;
631
632   const testrec::key k0(1, 1);
633   const testrec::value v0(2, 3, "hello");
634
635   {
636     txn_type t(0, arena);
637     testrec::value v;
638     ALWAYS_ASSERT_COND_IN_TXN(t, !btr.search(t, k0, v));
639     btr.insert(t, k0, v0);
640     AssertSuccessfulCommit(t);
641   }
642
643   {
644     txn_type t(0, arena);
645     testrec::value v;
646     ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, k0, v));
647     ALWAYS_ASSERT_COND_IN_TXN(t, v0 == v);
648     AssertSuccessfulCommit(t);
649   }
650
651   {
652     txn_type t(0, arena);
653     testrec::value v;
654     const bool ret = btr.search(t, k0, v, FIELDS(1, 2));
655     ALWAYS_ASSERT_COND_IN_TXN(t, ret);
656     ALWAYS_ASSERT_COND_IN_TXN(t, v.v1 == v0.v1);
657     ALWAYS_ASSERT_COND_IN_TXN(t, v.v2 == v0.v2);
658     AssertSuccessfulCommit(t);
659   }
660
661   {
662     txn_type t(0, arena);
663     testrec::value v;
664     const bool ret = btr.search(t, k0, v, FIELDS(0, 2));
665     ALWAYS_ASSERT_COND_IN_TXN(t, ret);
666     ALWAYS_ASSERT_COND_IN_TXN(t, v.v0 == v0.v0);
667     ALWAYS_ASSERT_COND_IN_TXN(t, v.v2 == v0.v2);
668     AssertSuccessfulCommit(t);
669   }
670
671   {
672     txn_type t(0, arena);
673     for (size_t i = 0; i < ARRAY_NELEMS(scan_values); i++)
674       btr.insert(t, scan_values[i].first, scan_values[i].second);
675     AssertSuccessfulCommit(t);
676   }
677
678   {
679     txn_type t(0, arena);
680     const testrec::key begin(10, 0);
681     scan_callback<TxnType> cb;
682     btr.search_range_call(t, begin, nullptr, cb, false, FIELDS(2));
683     AssertSuccessfulCommit(t);
684   }
685
686   txn_epoch_sync<TxnType>::sync();
687   txn_epoch_sync<TxnType>::finish();
688
689   cerr << "test_typed_btree() passed" << endl;
690 }
691
692 template <template <typename> class Protocol>
693 class txn_btree_worker : public ndb_thread {
694 public:
695   txn_btree_worker(txn_btree<Protocol> &btr, uint64_t txn_flags)
696     : btr(&btr), txn_flags(txn_flags) {}
697   inline uint64_t get_txn_flags() const { return txn_flags; }
698 protected:
699   txn_btree<Protocol> *const btr;
700   const uint64_t txn_flags;
701 };
702
703 namespace mp_stress_test_allocator_ns {
704
705   static const size_t nworkers = 28;
706   static const size_t nkeys = nworkers * 4;
707
708   static atomic<bool> running(true);
709
710   template <template <typename> class TxnType, typename Traits>
711   class worker : public txn_btree_worker<TxnType> {
712   public:
713     worker(unsigned id, txn_btree<TxnType> &btr, uint64_t txn_flags)
714       : txn_btree_worker<TxnType>(btr, txn_flags),
715         id(id), commits(0), aborts(0) {}
716     ~worker() {}
717     virtual void run()
718     {
719       rcu::s_instance.pin_current_thread(id);
720       fast_random r(reinterpret_cast<unsigned long>(this));
721       string v;
722       while (running.load()) {
723         typename Traits::StringAllocator arena;
724         TxnType<Traits> t(this->txn_flags, arena);
725         try {
726           // RMW on a small space of keys
727           const u64_varkey k(r.next() % nkeys);
728           ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, k, v));
729           ((rec *) v.data())->v++;
730           this->btr->put(t, k, v);
731           t.commit(true);
732           commits++;
733         } catch (transaction_abort_exception &e) {
734           aborts++;
735         }
736       }
737     }
738     unsigned get_id() const { return id; }
739     unsigned get_commits() const { return commits; }
740     unsigned get_aborts() const { return aborts; }
741   private:
742     unsigned id;
743     unsigned commits;
744     unsigned aborts;
745   };
746
747 };
748
749 template <template <typename> class TxnType, typename Traits>
750 static void
751 mp_stress_test_allocator()
752 {
753   using namespace mp_stress_test_allocator_ns;
754   txn_btree<TxnType> btr;
755   {
756     rcu::s_instance.pin_current_thread(0);
757     typename Traits::StringAllocator arena;
758     TxnType<Traits> t(0, arena);
759     for (size_t i = 0; i < nkeys; i++)
760       btr.insert_object(t, u64_varkey(i), rec(0));
761     AssertSuccessfulCommit(t);
762   }
763   vector< shared_ptr< worker<TxnType, Traits> > > v;
764   for (size_t i = 0; i < nworkers; i++)
765     v.emplace_back(new worker<TxnType, Traits>(i, btr, 0));
766   for (auto &p : v)
767     p->start();
768   sleep(60);
769   running.store(false);
770   for (auto &p : v) {
771     p->join();
772     cerr << "worker " << p->get_id()
773          << " commits " << p->get_commits()
774          << " aborts " << p->get_aborts()
775          << endl;
776   }
777   cerr << "mp_stress_test_allocator passed" << endl;
778 }
779
780 namespace mp_stress_test_insert_removes_ns {
781   struct big_rec {
782     static const size_t S = numeric_limits<dbtuple::node_size_type>::max();
783     char buf[S];
784   };
785   static const size_t nworkers = 4;
786   static atomic<bool> running(true);
787   template <template <typename> class TxnType, typename Traits>
788   class worker : public txn_btree_worker<TxnType> {
789   public:
790     worker(txn_btree<TxnType> &btr, uint64_t txn_flags)
791       : txn_btree_worker<TxnType>(btr, txn_flags) {}
792     ~worker() {}
793     virtual void run()
794     {
795       fast_random r(reinterpret_cast<unsigned long>(this));
796       while (running.load()) {
797         typename Traits::StringAllocator arena;
798         TxnType<Traits> t(this->txn_flags, arena);
799         try {
800           switch (r.next() % 3) {
801           case 0:
802             this->btr->insert_object(t, u64_varkey(0), rec(1));
803             break;
804           case 1:
805             this->btr->insert_object(t, u64_varkey(0), big_rec());
806             break;
807           case 2:
808             this->btr->remove(t, u64_varkey(0));
809             break;
810           }
811           t.commit(true);
812         } catch (transaction_abort_exception &e) {
813         }
814       }
815     }
816   };
817 }
818
819 template <template <typename> class TxnType, typename Traits>
820 static void
821 mp_stress_test_insert_removes()
822 {
823   using namespace mp_stress_test_insert_removes_ns;
824   txn_btree<TxnType> btr;
825   vector< shared_ptr< worker<TxnType, Traits> > > v;
826   for (size_t i = 0; i < nworkers; i++)
827     v.emplace_back(new worker<TxnType, Traits>(btr, 0));
828   for (auto &p : v)
829     p->start();
830   sleep(5); // let many epochs pass
831   running.store(false);
832   for (auto &p : v)
833     p->join();
834
835 }
836
837 namespace mp_test1_ns {
838   // read-modify-write test (counters)
839
840   const size_t niters = 1000;
841
842   template <template <typename> class TxnType, typename Traits>
843   class worker : public txn_btree_worker<TxnType> {
844   public:
845     worker(txn_btree<TxnType> &btr, uint64_t txn_flags)
846       : txn_btree_worker<TxnType>(btr, txn_flags) {}
847     ~worker() {}
848     virtual void run()
849     {
850       for (size_t i = 0; i < niters; i++) {
851       retry:
852         typename Traits::StringAllocator arena;
853         TxnType<Traits> t(this->txn_flags, arena);
854         try {
855           rec r;
856           string v;
857           if (!this->btr->search(t, u64_varkey(0), v)) {
858             r.v = 1;
859           } else {
860             r = *((const rec *) v.data());
861             r.v++;
862           }
863           this->btr->insert_object(t, u64_varkey(0), r);
864           t.commit(true);
865         } catch (transaction_abort_exception &e) {
866           goto retry;
867         }
868       }
869     }
870   };
871 }
872
873 template <template <typename> class TxnType, typename Traits>
874 static void
875 mp_test1()
876 {
877   using namespace mp_test1_ns;
878
879   for (size_t txn_flags_idx = 0;
880        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
881        txn_flags_idx++) {
882     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
883     txn_btree<TxnType> btr;
884     typename Traits::StringAllocator arena;
885
886     worker<TxnType, Traits> w0(btr, txn_flags);
887     worker<TxnType, Traits> w1(btr, txn_flags);
888     worker<TxnType, Traits> w2(btr, txn_flags);
889     worker<TxnType, Traits> w3(btr, txn_flags);
890
891     w0.start(); w1.start(); w2.start(); w3.start();
892     w0.join(); w1.join(); w2.join(); w3.join();
893
894     {
895       TxnType<Traits> t(txn_flags, arena);
896       string v;
897       ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(0), v));
898       ALWAYS_ASSERT_COND_IN_TXN(t, !v.empty());
899       const uint64_t rv = ((const rec *) v.data())->v;
900       if (rv != (niters * 4))
901         cerr << "v: " << rv << ", expected: " << (niters * 4) << endl;
902       ALWAYS_ASSERT_COND_IN_TXN(t, rv == (niters * 4));
903       AssertSuccessfulCommit(t);
904     }
905
906     txn_epoch_sync<TxnType>::sync();
907     txn_epoch_sync<TxnType>::finish();
908   }
909 }
910
911 namespace mp_test2_ns {
912
913   static const uint64_t ctr_key = 0;
914   static const uint64_t range_begin = 100;
915   static const uint64_t range_end = 150;
916
917   struct control_rec {
918     uint32_t count_;
919     uint32_t values_[range_end - range_begin];
920     control_rec()
921       : count_(0)
922     {
923       NDB_MEMSET(&values_[0], 0, sizeof(values_));
924     }
925
926 #if 0
927     void
928     sanity_check() const
929     {
930       INVARIANT(count_ <= ARRAY_NELEMS(values_));
931       if (!count_)
932         return;
933       uint32_t last = values_[0];
934       INVARIANT(last >= range_begin && last < range_end);
935       for (size_t i = 1; i < count_; last = values_[i], i++) {
936         INVARIANT(last < values_[i]);
937         INVARIANT(values_[i] >= range_begin && values_[i] < range_end);
938       }
939     }
940 #else
941     inline void sanity_check() const {}
942 #endif
943
944     // assumes list is in ascending order, no dups, and there is space
945     // remaining
946     //
947     // inserts in ascending order
948     void
949     insert(uint32_t v)
950     {
951       if (count_ >= ARRAY_NELEMS(values_)) {
952         cerr << "ERROR when trying to insert " << v
953              << ": " << vectorize() << endl;
954       }
955
956       INVARIANT(count_ < ARRAY_NELEMS(values_));
957       if (!count_ || values_[count_ - 1] < v) {
958         values_[count_++] = v;
959         sanity_check();
960         INVARIANT(contains(v));
961         return;
962       }
963       for (size_t i = 0; i < count_; i++) {
964         if (values_[i] > v) {
965           // move values_[i, count_) into slots values_[i+1, count_+1)
966           memmove(&values_[i+1], &values_[i], (count_ - i) * sizeof(uint32_t));
967           values_[i] = v;
968           count_++;
969           sanity_check();
970           INVARIANT(contains(v));
971           return;
972         }
973       }
974     }
975
976     inline bool
977     contains(uint32_t v) const
978     {
979       for (size_t i = 0; i < count_; i++)
980         if (values_[i] == v)
981           return true;
982       return false;
983     }
984
985     // removes v from list, returns true if actual removal happened.
986     // assumes v is in ascending order with no dups
987     bool
988     remove(uint32_t v)
989     {
990       for (size_t i = 0; i < count_; i++) {
991         if (values_[i] == v) {
992           // move values_[i+1, count_) into slots values_[i, count_-1)
993           memmove(&values_[i], &values_[i+1], (count_ - (i+1)) * sizeof(uint32_t));
994           count_ -= 1;
995           sanity_check();
996           INVARIANT(!contains(v));
997           // no dups assumption
998           return true;
999         } else if (values_[i] > v) {
1000           // ascending order assumption
1001           break;
1002         }
1003       }
1004       INVARIANT(!contains(v));
1005       return false;
1006     }
1007
1008     inline vector<uint32_t>
1009     vectorize() const
1010     {
1011       vector<uint32_t> v;
1012       for (size_t i = 0; i < count_; i++)
1013         v.push_back(values_[i]);
1014       return v;
1015     }
1016   };
1017
1018   static volatile bool running = true;
1019
1020   // expects the values to be monotonically increasing (as records)
1021   template <template <typename> class Protocol>
1022   class counting_scan_callback : public txn_btree<Protocol>::search_range_callback {
1023   public:
1024     virtual bool
1025     invoke(const typename txn_btree<Protocol>::keystring_type &k,
1026            const string &v)
1027     {
1028       ALWAYS_ASSERT(k.length() == 8);
1029       const uint64_t u64k =
1030         host_endian_trfm<uint64_t>()(*reinterpret_cast<const uint64_t *>(k.data()));
1031       if (v.size() != sizeof(rec)) {
1032         cerr << "v.size(): " << v.size() << endl;
1033         cerr << "sizeof rec: " << sizeof(rec) << endl;
1034         ALWAYS_ASSERT(false);
1035       }
1036       const rec *r = (const rec *) v.data();
1037       ALWAYS_ASSERT(u64k == r->v);
1038       VERBOSE(cerr << "counting_scan_callback: " << hexify(k) << " => " << r->v << endl);
1039       values_.push_back(r->v);
1040       return true;
1041     }
1042     vector<uint32_t> values_;
1043   };
1044
1045   template <template <typename> class TxnType, typename Traits>
1046   class mutate_worker : public txn_btree_worker<TxnType> {
1047   public:
1048     mutate_worker(txn_btree<TxnType> &btr, uint64_t flags)
1049       : txn_btree_worker<TxnType>(btr, flags), naborts(0) {}
1050     virtual void run()
1051     {
1052       while (running) {
1053         for (size_t i = range_begin; running && i < range_end; i++) {
1054         retry:
1055           typename Traits::StringAllocator arena;
1056           //bool did_remove = false;
1057           //uint64_t did_v = 0;
1058           {
1059             TxnType<Traits> t(this->txn_flags, arena);
1060             try {
1061               control_rec ctr_rec;
1062               string v, v_ctr;
1063               ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1064               ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1065               ALWAYS_ASSERT_COND_IN_TXN(t, ((const control_rec *) v_ctr.data())->count_ > 1);
1066               ctr_rec = *((const control_rec *) v_ctr.data());
1067               if (this->btr->search(t, u64_varkey(i), v)) {
1068                 AssertByteEquality(rec(i), v);
1069                 this->btr->remove(t, u64_varkey(i));
1070                 ALWAYS_ASSERT_COND_IN_TXN(t, ctr_rec.remove(i));
1071                 //did_remove = true;
1072               } else {
1073                 this->btr->insert_object(t, u64_varkey(i), rec(i));
1074                 ctr_rec.insert(i);
1075               }
1076               //did_v = ctr_rec.v;
1077               ctr_rec.sanity_check();
1078               this->btr->insert_object(t, u64_varkey(ctr_key), ctr_rec);
1079               t.commit(true);
1080             } catch (transaction_abort_exception &e) {
1081               naborts++;
1082               goto retry;
1083             }
1084           }
1085
1086           //{
1087           //  TxnType<Traits> t(this->txn_flags, arena);
1088           //  try {
1089           //    string v, v_ctr;
1090           //    ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1091           //    ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1092           //    const bool ret = this->btr->search(t, u64_varkey(i), v);
1093           //    t.commit(true);
1094           //    if (reinterpret_cast<const rec *>(v_ctr.data())->v != did_v) {
1095           //      cerr << "rec.v: " << reinterpret_cast<const rec *>(v_ctr.data())->v << ", did_v: " << did_v << endl;
1096           //      ALWAYS_ASSERT(false);
1097           //    }
1098           //    if (did_remove && ret) {
1099           //      cerr << "removed previous, but still found" << endl;
1100           //      ALWAYS_ASSERT(false);
1101           //    } else if (!did_remove && !ret) {
1102           //      cerr << "did not previous, but not found" << endl;
1103           //      ALWAYS_ASSERT(false);
1104           //    }
1105           //  } catch (transaction_abort_exception &e) {
1106           //    // possibly aborts due to GC mechanism- if so, just move on
1107           //  }
1108           //}
1109         }
1110       }
1111     }
1112     size_t naborts;
1113   };
1114
1115   template <template <typename> class TxnType, typename Traits>
1116   class reader_worker : public txn_btree_worker<TxnType> {
1117   public:
1118     reader_worker(txn_btree<TxnType> &btr, uint64_t flags, bool reverse)
1119       : txn_btree_worker<TxnType>(btr, flags),
1120         reverse_(reverse), validations(0), naborts(0) {}
1121     virtual void run()
1122     {
1123       while (running) {
1124         typename Traits::StringAllocator arena;
1125         TxnType<Traits> t(this->txn_flags, arena);
1126         try {
1127           string v_ctr;
1128           ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(ctr_key), v_ctr));
1129           ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1130           counting_scan_callback<TxnType> c;
1131           if (!reverse_) {
1132             const u64_varkey kend(range_end);
1133             this->btr->search_range_call(t, u64_varkey(range_begin), &kend, c);
1134           } else {
1135             const u64_varkey mkey(range_begin - 1);
1136             this->btr->rsearch_range_call(t, u64_varkey(range_end), &mkey, c);
1137             // reverse values
1138             c.values_ = vector<uint32_t>(c.values_.rbegin(), c.values_.rend());
1139           }
1140           t.commit(true);
1141
1142           const control_rec *crec = (const control_rec *) v_ctr.data();
1143           crec->sanity_check();
1144           auto cvec = crec->vectorize();
1145           if (c.values_ != cvec) {
1146             cerr << "observed (" << c.values_.size() << "): " << c.values_ << endl;
1147             cerr << "db value (" << cvec.size() << "): " << cvec << endl;
1148             ALWAYS_ASSERT_COND_IN_TXN(t, c.values_ == cvec);
1149           }
1150           validations++;
1151           VERBOSE(cerr << "successful validation" << endl);
1152         } catch (transaction_abort_exception &e) {
1153           naborts++;
1154           if (this->txn_flags & transaction_base::TXN_FLAG_READ_ONLY)
1155             // RO txns shouldn't abort
1156             ALWAYS_ASSERT_COND_IN_TXN(t, false);
1157         }
1158       }
1159     }
1160     bool reverse_;
1161     size_t validations;
1162     size_t naborts;
1163   };
1164 }
1165
1166 template <template <typename> class TxnType, typename Traits>
1167 static void
1168 mp_test2()
1169 {
1170   using namespace mp_test2_ns;
1171   for (size_t txn_flags_idx = 0;
1172        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1173        txn_flags_idx++) {
1174     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1175     txn_btree<TxnType> btr;
1176     typename Traits::StringAllocator arena;
1177     {
1178       TxnType<Traits> t(txn_flags, arena);
1179       control_rec ctrl;
1180       for (size_t i = range_begin; i < range_end; i++)
1181         if ((i % 2) == 0) {
1182           btr.insert_object(t, u64_varkey(i), rec(i));
1183           ctrl.values_[ctrl.count_++] = i;
1184         }
1185       ctrl.sanity_check();
1186       btr.insert_object(t, u64_varkey(ctr_key), ctrl);
1187       AssertSuccessfulCommit(t);
1188     }
1189
1190     // XXX(stephentu): HACK! we need to wait for the GC to
1191     // compute a new consistent snapshot version that includes this
1192     // latest update
1193     txn_epoch_sync<TxnType>::sync();
1194
1195     {
1196       // make sure the first validation passes
1197       TxnType<Traits> t(
1198           txn_flags | transaction_base::TXN_FLAG_READ_ONLY, arena);
1199       typename Traits::StringAllocator arena;
1200       string v_ctr;
1201       ALWAYS_ASSERT_COND_IN_TXN(t, btr.search(t, u64_varkey(ctr_key), v_ctr));
1202       ALWAYS_ASSERT_COND_IN_TXN(t, v_ctr.size() == sizeof(control_rec));
1203       counting_scan_callback<TxnType> c;
1204       const u64_varkey kend(range_end);
1205       btr.search_range_call(t, u64_varkey(range_begin), &kend, c);
1206       AssertSuccessfulCommit(t);
1207
1208       const control_rec *crec = (const control_rec *) v_ctr.data();
1209       crec->sanity_check();
1210       auto cvec = crec->vectorize();
1211       if (c.values_ != cvec) {
1212         cerr << "observed: " << c.values_ << endl;
1213         cerr << "db value: " << cvec << endl;
1214         ALWAYS_ASSERT_COND_IN_TXN(t, c.values_ == cvec);
1215       }
1216       VERBOSE(cerr << "initial read only scan passed" << endl);
1217     }
1218
1219     mutate_worker<TxnType, Traits> w0(btr, txn_flags);
1220     //reader_worker<TxnType, Traits> w1(btr, txn_flags);
1221     reader_worker<TxnType, Traits> w2(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1222     reader_worker<TxnType, Traits> w3(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1223 #ifdef HAVE_REVERSE_RANGE_SCANS
1224     reader_worker<TxnType, Traits> w4(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, true);
1225 #else
1226     reader_worker<TxnType, Traits> w4(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY, false);
1227 #endif
1228
1229     running = true;
1230     __sync_synchronize();
1231     w0.start();
1232     //w1.start();
1233     w2.start();
1234     w3.start();
1235     w4.start();
1236
1237     sleep(30);
1238     running = false;
1239     __sync_synchronize();
1240     w0.join();
1241     //w1.join();
1242     w2.join();
1243     w3.join();
1244     w4.join();
1245
1246     cerr << "mutate naborts: " << w0.naborts << endl;
1247     //cerr << "reader naborts: " << w1.naborts << endl;
1248     //cerr << "reader validations: " << w1.validations << endl;
1249     cerr << "read-only reader 1 naborts: " << w2.naborts << endl;
1250     cerr << "read-only reader 1 validations: " << w2.validations << endl;
1251     cerr << "read-only reader 2 naborts: " << w3.naborts << endl;
1252     cerr << "read-only reader 2 validations: " << w3.validations << endl;
1253     cerr << "read-only reader 3 naborts: " << w4.naborts << endl;
1254     cerr << "read-only reader 3 validations: " << w4.validations << endl;
1255
1256     txn_epoch_sync<TxnType>::sync();
1257     txn_epoch_sync<TxnType>::finish();
1258   }
1259 }
1260
1261 namespace mp_test3_ns {
1262
1263   static const size_t amount_per_person = 100;
1264   static const size_t naccounts = 100;
1265   static const size_t niters = 1000000;
1266
1267   template <template <typename> class TxnType, typename Traits>
1268   class transfer_worker : public txn_btree_worker<TxnType> {
1269   public:
1270     transfer_worker(txn_btree<TxnType> &btr, uint64_t flags, unsigned long seed)
1271       : txn_btree_worker<TxnType>(btr, flags), seed(seed) {}
1272     virtual void run()
1273     {
1274       fast_random r(seed);
1275       for (size_t i = 0; i < niters; i++) {
1276       retry:
1277         try {
1278           typename Traits::StringAllocator arena;
1279           TxnType<Traits> t(this->txn_flags, arena);
1280           uint64_t a = r.next() % naccounts;
1281           uint64_t b = r.next() % naccounts;
1282           while (unlikely(a == b))
1283             b = r.next() % naccounts;
1284           string arecv, brecv;
1285           ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(a), arecv));
1286           ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(b), brecv));
1287           const rec *arec = (const rec *) arecv.data();
1288           const rec *brec = (const rec *) brecv.data();
1289           if (arec->v == 0) {
1290             t.abort();
1291           } else {
1292             const uint64_t xfer = (arec->v > 1) ? (r.next() % (arec->v - 1) + 1) : 1;
1293             this->btr->insert_object(t, u64_varkey(a), rec(arec->v - xfer));
1294             this->btr->insert_object(t, u64_varkey(b), rec(brec->v + xfer));
1295             t.commit(true);
1296           }
1297         } catch (transaction_abort_exception &e) {
1298           goto retry;
1299         }
1300       }
1301     }
1302   private:
1303     const unsigned long seed;
1304   };
1305
1306   template <template <typename> class TxnType, typename Traits>
1307   class invariant_worker_scan : public txn_btree_worker<TxnType>,
1308                                 public txn_btree<TxnType>::search_range_callback {
1309   public:
1310     invariant_worker_scan(txn_btree<TxnType> &btr, uint64_t flags)
1311       : txn_btree_worker<TxnType>(btr, flags), running(true),
1312         validations(0), naborts(0), sum(0) {}
1313     virtual void run()
1314     {
1315       while (running) {
1316         try {
1317           typename Traits::StringAllocator arena;
1318           TxnType<Traits> t(this->txn_flags, arena);
1319           sum = 0;
1320           this->btr->search_range_call(t, u64_varkey(0), NULL, *this);
1321           t.commit(true);
1322           ALWAYS_ASSERT_COND_IN_TXN(t, sum == (naccounts * amount_per_person));
1323           validations++;
1324         } catch (transaction_abort_exception &e) {
1325           naborts++;
1326         }
1327       }
1328     }
1329     virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1330                         const string &v)
1331     {
1332       sum += ((rec *) v.data())->v;
1333       return true;
1334     }
1335     volatile bool running;
1336     size_t validations;
1337     size_t naborts;
1338     uint64_t sum;
1339   };
1340
1341   template <template <typename> class TxnType, typename Traits>
1342   class invariant_worker_1by1 : public txn_btree_worker<TxnType> {
1343   public:
1344     invariant_worker_1by1(txn_btree<TxnType> &btr, uint64_t flags)
1345       : txn_btree_worker<TxnType>(btr, flags), running(true),
1346         validations(0), naborts(0) {}
1347     virtual void run()
1348     {
1349       while (running) {
1350         try {
1351           typename Traits::StringAllocator arena;
1352           TxnType<Traits> t(this->txn_flags, arena);
1353           uint64_t sum = 0;
1354           for (uint64_t i = 0; i < naccounts; i++) {
1355             string v;
1356             ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(i), v));
1357             sum += ((const rec *) v.data())->v;
1358           }
1359           t.commit(true);
1360           if (sum != (naccounts * amount_per_person)) {
1361             cerr << "sum: " << sum << endl;
1362             cerr << "naccounts * amount_per_person: " << (naccounts * amount_per_person) << endl;
1363           }
1364           ALWAYS_ASSERT_COND_IN_TXN(t, sum == (naccounts * amount_per_person));
1365           validations++;
1366         } catch (transaction_abort_exception &e) {
1367           naborts++;
1368         }
1369       }
1370     }
1371     volatile bool running;
1372     size_t validations;
1373     size_t naborts;
1374   };
1375
1376 }
1377
1378 template <template <typename> class TxnType, typename Traits>
1379 static void
1380 mp_test3()
1381 {
1382   using namespace mp_test3_ns;
1383
1384   for (size_t txn_flags_idx = 0;
1385        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1386        txn_flags_idx++) {
1387     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1388
1389     txn_btree<TxnType> btr;
1390     typename Traits::StringAllocator arena;
1391     {
1392       TxnType<Traits> t(txn_flags, arena);
1393       for (uint64_t i = 0; i < naccounts; i++)
1394         btr.insert_object(t, u64_varkey(i), rec(amount_per_person));
1395       AssertSuccessfulCommit(t);
1396     }
1397
1398     txn_epoch_sync<TxnType>::sync();
1399
1400     transfer_worker<TxnType, Traits> w0(btr, txn_flags, 342),
1401                              w1(btr, txn_flags, 93852),
1402                              w2(btr, txn_flags, 23085),
1403                              w3(btr, txn_flags, 859438989);
1404     invariant_worker_scan<TxnType, Traits> w4(btr, txn_flags);
1405     invariant_worker_1by1<TxnType, Traits> w5(btr, txn_flags);
1406     invariant_worker_scan<TxnType, Traits> w6(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY);
1407     invariant_worker_1by1<TxnType, Traits> w7(btr, txn_flags | transaction_base::TXN_FLAG_READ_ONLY);
1408
1409     w0.start(); w1.start(); w2.start(); w3.start(); w4.start(); w5.start(); w6.start(); w7.start();
1410     w0.join(); w1.join(); w2.join(); w3.join();
1411     w4.running = false; w5.running = false; w6.running = false; w7.running = false;
1412     __sync_synchronize();
1413     w4.join(); w5.join(); w6.join(); w7.join();
1414
1415     cerr << "scan validations: " << w4.validations << ", scan aborts: " << w4.naborts << endl;
1416     cerr << "1by1 validations: " << w5.validations << ", 1by1 aborts: " << w5.naborts << endl;
1417     cerr << "scan-readonly validations: " << w6.validations << ", scan-readonly aborts: " << w6.naborts << endl;
1418     cerr << "1by1-readonly validations: " << w7.validations << ", 1by1-readonly aborts: " << w7.naborts << endl;
1419
1420     txn_epoch_sync<TxnType>::sync();
1421     txn_epoch_sync<TxnType>::finish();
1422   }
1423 }
1424
1425 namespace mp_test_simple_write_skew_ns {
1426   static const size_t NDoctors = 16;
1427
1428   volatile bool running = true;
1429
1430   template <template <typename> class TxnType, typename Traits>
1431   class get_worker : public txn_btree_worker<TxnType> {
1432   public:
1433     get_worker(unsigned int d, txn_btree<TxnType> &btr, uint64_t txn_flags)
1434       : txn_btree_worker<TxnType>(btr, txn_flags), n(0), d(d) {}
1435     virtual void run()
1436     {
1437       while (running) {
1438         try {
1439           typename Traits::StringAllocator arena;
1440           TxnType<Traits> t(this->txn_flags, arena);
1441           if ((n % 2) == 0) {
1442             // try to take this doctor off call
1443             unsigned int ctr = 0;
1444             for (unsigned int i = 0; i < NDoctors && ctr < 2; i++) {
1445               string v;
1446               ALWAYS_ASSERT_COND_IN_TXN(t, this->btr->search(t, u64_varkey(i), v));
1447               INVARIANT(v.size() == sizeof(rec));
1448               const rec *r = (const rec *) v.data();
1449               if (r->v)
1450                 ctr++;
1451             }
1452             if (ctr == 2)
1453               this->btr->insert_object(t, u64_varkey(d), rec(0));
1454             t.commit(true);
1455             ALWAYS_ASSERT_COND_IN_TXN(t, ctr >= 1);
1456           } else {
1457             // place this doctor on call
1458             this->btr->insert_object(t, u64_varkey(d), rec(1));
1459             t.commit(true);
1460           }
1461           n++;
1462         } catch (transaction_abort_exception &e) {
1463           // no-op
1464         }
1465       }
1466     }
1467     uint64_t n;
1468   private:
1469     unsigned int d;
1470   };
1471
1472   template <template <typename> class TxnType, typename Traits>
1473   class scan_worker : public txn_btree_worker<TxnType>,
1474                       public txn_btree<TxnType>::search_range_callback {
1475   public:
1476     scan_worker(unsigned int d, txn_btree<TxnType> &btr, uint64_t txn_flags)
1477       : txn_btree_worker<TxnType>(btr, txn_flags), n(0), d(d), ctr(0) {}
1478     virtual void run()
1479     {
1480       while (running) {
1481         try {
1482           typename Traits::StringAllocator arena;
1483           TxnType<Traits> t(this->txn_flags, arena);
1484           if ((n % 2) == 0) {
1485             ctr = 0;
1486             this->btr->search_range_call(t, u64_varkey(0), NULL, *this);
1487             if (ctr == 2)
1488               this->btr->insert_object(t, u64_varkey(d), rec(0));
1489             t.commit(true);
1490             ALWAYS_ASSERT_COND_IN_TXN(t, ctr >= 1);
1491           } else {
1492             this->btr->insert_object(t, u64_varkey(d), rec(1));
1493             t.commit(true);
1494           }
1495           n++;
1496         } catch (transaction_abort_exception &e) {
1497           // no-op
1498         }
1499       }
1500     }
1501     virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1502                         const string &v)
1503     {
1504       INVARIANT(v.size() == sizeof(rec));
1505       const rec *r = (const rec *) v.data();
1506       if (r->v)
1507         ctr++;
1508       return ctr < 2;
1509     }
1510     uint64_t n;
1511   private:
1512     unsigned int d;
1513     unsigned int ctr;
1514   };
1515 }
1516
1517 template <template <typename> class TxnType, typename Traits>
1518 static void
1519 mp_test_simple_write_skew()
1520 {
1521   using namespace mp_test_simple_write_skew_ns;
1522
1523   for (size_t txn_flags_idx = 0;
1524        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1525        txn_flags_idx++) {
1526     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1527
1528     txn_btree<TxnType> btr;
1529     typename Traits::StringAllocator arena;
1530     {
1531       TxnType<Traits> t(txn_flags, arena);
1532       static_assert(NDoctors >= 2, "XX");
1533       for (uint64_t i = 0; i < NDoctors; i++)
1534         btr.insert_object(t, u64_varkey(i), rec(i < 2 ? 1 : 0));
1535       AssertSuccessfulCommit(t);
1536     }
1537
1538     txn_epoch_sync<TxnType>::sync();
1539
1540     running = true;
1541     __sync_synchronize();
1542     vector<txn_btree_worker<TxnType> *> workers;
1543     for (size_t i = 0; i < NDoctors / 2; i++)
1544       workers.push_back(new get_worker<TxnType, Traits>(i, btr, txn_flags));
1545     for (size_t i = NDoctors / 2; i < NDoctors; i++)
1546       workers.push_back(new scan_worker<TxnType, Traits>(i, btr, txn_flags));
1547     for (size_t i = 0; i < NDoctors; i++)
1548       workers[i]->start();
1549     sleep(10);
1550     running = false;
1551     __sync_synchronize();
1552
1553
1554     size_t n_get_succ = 0, n_scan_succ = 0;
1555     for (size_t i = 0; i < NDoctors; i++) {
1556       workers[i]->join();
1557       if (get_worker<TxnType, Traits> *w = dynamic_cast<get_worker<TxnType, Traits> *>(workers[i]))
1558         n_get_succ += w->n;
1559       else if (scan_worker<TxnType, Traits> *w = dynamic_cast<scan_worker<TxnType, Traits> *>(workers[i]))
1560         n_scan_succ += w->n;
1561       else
1562         ALWAYS_ASSERT(false);
1563       delete workers[i];
1564     }
1565     workers.clear();
1566
1567     cerr << "get_worker  txns: " << n_get_succ << endl;
1568     cerr << "scan_worker txns: " << n_scan_succ << endl;
1569
1570     txn_epoch_sync<TxnType>::sync();
1571     txn_epoch_sync<TxnType>::finish();
1572   }
1573 }
1574
1575 namespace mp_test_batch_processing_ns {
1576
1577   volatile bool running = true;
1578
1579   static inline string
1580   MakeKey(uint32_t batch_id, uint32_t receipt_id)
1581   {
1582     const big_endian_trfm<int32_t> t;
1583     string buf(8, 0);
1584     uint32_t *p = (uint32_t *) &buf[0];
1585     *p++ = t(batch_id);
1586     *p++ = t(receipt_id);
1587     return buf;
1588   }
1589
1590   template <template <typename> class TxnType, typename Traits>
1591   class report_worker : public ndb_thread,
1592                         public txn_btree<TxnType>::search_range_callback {
1593   public:
1594     report_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1595       : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags), n(0), m(0), sum(0) {}
1596     virtual void run()
1597     {
1598       while (running) {
1599         try {
1600           typename Traits::StringAllocator arena;
1601           TxnType<Traits> t(this->txn_flags, arena);
1602           string v;
1603           ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1604           ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1605           const rec * const r = (const rec *) v.data();
1606           const uint32_t prev_bid = r->v - 1; // prev batch
1607           sum = 0;
1608           const string endkey = MakeKey(prev_bid, numeric_limits<uint32_t>::max());
1609           receipts->search_range_call(t, MakeKey(prev_bid, 0), &endkey, *this);
1610           t.commit(true);
1611           map<uint32_t, uint32_t>::iterator it = reports.find(prev_bid);
1612           if (it != reports.end()) {
1613             ALWAYS_ASSERT_COND_IN_TXN(t, sum == it->second);
1614             m++;
1615           } else {
1616             reports[prev_bid] = sum;
1617           }
1618           n++;
1619         } catch (transaction_abort_exception &e) {
1620           // no-op
1621         }
1622       }
1623     }
1624     virtual bool invoke(const typename txn_btree<TxnType>::keystring_type &k,
1625                         const string &v)
1626     {
1627       INVARIANT(v.size() == sizeof(rec));
1628       const rec * const r = (const rec *) v.data();
1629       sum += r->v;
1630       return true;
1631     }
1632   private:
1633     txn_btree<TxnType> *ctrl;
1634     txn_btree<TxnType> *receipts;
1635     uint64_t txn_flags;
1636
1637   public:
1638     uint64_t n;
1639     uint64_t m;
1640
1641   private:
1642     map<uint32_t, uint32_t> reports;
1643     unsigned int sum;
1644   };
1645
1646   template <template <typename> class TxnType, typename Traits>
1647   class new_receipt_worker : public ndb_thread {
1648   public:
1649     new_receipt_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1650       : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags),
1651         n(0), last_bid(0), last_rid(0) {}
1652     virtual void run()
1653     {
1654       while (running) {
1655         try {
1656           typename Traits::StringAllocator arena;
1657           TxnType<Traits> t(this->txn_flags, arena);
1658           string v;
1659           ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1660           ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1661           const rec * const r = (const rec *) v.data();
1662           const uint32_t cur_bid = r->v;
1663           const uint32_t cur_rid = (cur_bid != last_bid) ? 0 : last_rid + 1;
1664           const string rkey = MakeKey(cur_bid, cur_rid);
1665           receipts->insert_object(t, rkey, rec(1));
1666           t.commit(true);
1667           last_bid = cur_bid;
1668           last_rid = cur_rid;
1669           n++;
1670         } catch (transaction_abort_exception &e) {
1671           // no-op
1672         }
1673       }
1674     }
1675
1676   private:
1677     txn_btree<TxnType> *ctrl;
1678     txn_btree<TxnType> *receipts;
1679     uint64_t txn_flags;
1680
1681   public:
1682     uint64_t n;
1683
1684   private:
1685     uint32_t last_bid;
1686     uint32_t last_rid;
1687   };
1688
1689   template <template <typename> class TxnType, typename Traits>
1690   class incr_worker : public ndb_thread {
1691   public:
1692     incr_worker(txn_btree<TxnType> &ctrl, txn_btree<TxnType> &receipts, uint64_t txn_flags)
1693       : ctrl(&ctrl), receipts(&receipts), txn_flags(txn_flags), n(0) {}
1694     virtual void run()
1695     {
1696       struct timespec t;
1697       NDB_MEMSET(&t, 0, sizeof(t));
1698       t.tv_nsec = 1000; // 1 us
1699       while (running) {
1700         try {
1701           typename Traits::StringAllocator arena;
1702           TxnType<Traits> t(this->txn_flags, arena);
1703           string v;
1704           ALWAYS_ASSERT_COND_IN_TXN(t, ctrl->search(t, u64_varkey(0), v));
1705           ALWAYS_ASSERT_COND_IN_TXN(t, v.size() == sizeof(rec));
1706           const rec * const r = (const rec *) v.data();
1707           ctrl->insert_object(t, u64_varkey(0), rec(r->v + 1));
1708           t.commit(true);
1709           n++;
1710         } catch (transaction_abort_exception &e) {
1711           // no-op
1712         }
1713         nanosleep(&t, NULL);
1714       }
1715     }
1716   private:
1717     txn_btree<TxnType> *ctrl;
1718     txn_btree<TxnType> *receipts;
1719     uint64_t txn_flags;
1720
1721   public:
1722     uint64_t n;
1723   };
1724 }
1725
1726 template <template <typename> class TxnType, typename Traits>
1727 static void
1728 mp_test_batch_processing()
1729 {
1730   using namespace mp_test_batch_processing_ns;
1731
1732   for (size_t txn_flags_idx = 0;
1733        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1734        txn_flags_idx++) {
1735     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1736
1737     txn_btree<TxnType> ctrl;
1738     txn_btree<TxnType> receipts;
1739     typename Traits::StringAllocator arena;
1740     {
1741       TxnType<Traits> t(txn_flags, arena);
1742       ctrl.insert_object(t, u64_varkey(0), rec(1));
1743       AssertSuccessfulCommit(t);
1744     }
1745
1746     txn_epoch_sync<TxnType>::sync();
1747
1748     report_worker<TxnType, Traits> w0(ctrl, receipts, txn_flags);
1749     new_receipt_worker<TxnType, Traits> w1(ctrl, receipts, txn_flags);
1750     incr_worker<TxnType, Traits> w2(ctrl, receipts, txn_flags);
1751     running = true;
1752     __sync_synchronize();
1753     w0.start(); w1.start(); w2.start();
1754     sleep(10);
1755     running = false;
1756     __sync_synchronize();
1757     w0.join(); w1.join(); w2.join();
1758
1759
1760     cerr << "report_worker      txns       : " << w0.n << endl;
1761     cerr << "report_worker      validations: " << w0.m << endl;
1762     cerr << "new_receipt_worker txns       : " << w1.n << endl;
1763     cerr << "incr_worker        txns       : " << w2.n << endl;
1764
1765     txn_epoch_sync<TxnType>::sync();
1766     txn_epoch_sync<TxnType>::finish();
1767   }
1768 }
1769
1770 namespace read_only_perf_ns {
1771   const size_t nkeys = 140000000; // 140M
1772   //const size_t nkeys = 100000; // 100K
1773
1774   unsigned long seeds[] = {
1775     9576455804445224191ULL,
1776     3303315688255411629ULL,
1777     3116364238170296072ULL,
1778     641702699332002535ULL,
1779     17755947590284612420ULL,
1780     13349066465957081273ULL,
1781     16389054441777092823ULL,
1782     2687412585397891607ULL,
1783     16665670053534306255ULL,
1784     5166823197462453937ULL,
1785     1252059952779729626ULL,
1786     17962022827457676982ULL,
1787     940911318964853784ULL,
1788     479878990529143738ULL,
1789     250864516707124695ULL,
1790     8507722621803716653ULL,
1791   };
1792
1793   volatile bool running = false;
1794
1795   template <template <typename> class TxnType, typename Traits>
1796   class worker : public txn_btree_worker<TxnType> {
1797   public:
1798     worker(unsigned int seed, txn_btree<TxnType> &btr, uint64_t txn_flags)
1799       : txn_btree_worker<TxnType>(btr, txn_flags), n(0), seed(seed) {}
1800     virtual void run()
1801     {
1802       fast_random r(seed);
1803       while (running) {
1804         const uint64_t k = r.next() % nkeys;
1805       retry:
1806         try {
1807           typename Traits::StringAllocator arena;
1808           TxnType<Traits> t(this->txn_flags, arena);
1809           string v;
1810           bool found = this->btr->search(t, u64_varkey(k), v);
1811           t.commit(true);
1812           ALWAYS_ASSERT_COND_IN_TXN(t, found);
1813           AssertByteEquality(rec(k + 1), v);
1814         } catch (transaction_abort_exception &e) {
1815           goto retry;
1816         }
1817         n++;
1818       }
1819     }
1820     uint64_t n;
1821   private:
1822     unsigned int seed;
1823   };
1824 }
1825
1826 template <template <typename> class TxnType, typename Traits>
1827 static void
1828 read_only_perf()
1829 {
1830   using namespace read_only_perf_ns;
1831   for (size_t txn_flags_idx = 0;
1832        txn_flags_idx < ARRAY_NELEMS(TxnFlags);
1833        txn_flags_idx++) {
1834     const uint64_t txn_flags = TxnFlags[txn_flags_idx];
1835
1836     txn_btree<TxnType> btr;
1837
1838     {
1839       const size_t nkeyspertxn = 100000;
1840       for (size_t i = 0; i < nkeys / nkeyspertxn; i++) {
1841         TxnType<Traits> t;
1842         const size_t end = (i == (nkeys / nkeyspertxn - 1)) ? nkeys : ((i + 1) * nkeyspertxn);
1843         for (size_t j = i * nkeyspertxn; j < end; j++)
1844           btr.insert_object(t, u64_varkey(j), rec(j + 1));
1845         AssertSuccessfulCommit(t);
1846         cerr << "batch " << i << " completed" << endl;
1847       }
1848       cerr << "btree loaded, test starting" << endl;
1849     }
1850
1851     vector<worker<TxnType, Traits> *> workers;
1852     for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++)
1853       workers.push_back(new worker<TxnType, Traits>(seeds[i], btr, txn_flags));
1854
1855     running = true;
1856     timer t;
1857     COMPILER_MEMORY_FENCE;
1858     for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++)
1859       workers[i]->start();
1860     sleep(30);
1861     COMPILER_MEMORY_FENCE;
1862     running = false;
1863     COMPILER_MEMORY_FENCE;
1864     uint64_t total_n = 0;
1865     for (size_t i = 0; i < ARRAY_NELEMS(seeds); i++) {
1866       workers[i]->join();
1867       total_n += workers[i]->n;
1868       delete workers[i];
1869     }
1870
1871     double agg_throughput = double(total_n) / (double(t.lap()) / 1000000.0);
1872     double avg_per_core_throughput = agg_throughput / double(ARRAY_NELEMS(seeds));
1873
1874     cerr << "agg_read_throughput: " << agg_throughput << " gets/sec" << endl;
1875     cerr << "avg_per_core_read_throughput: " << avg_per_core_throughput << " gets/sec/core" << endl;
1876
1877     txn_epoch_sync<TxnType>::sync();
1878     txn_epoch_sync<TxnType>::finish();
1879   }
1880 }
1881
1882 void txn_btree_test()
1883 {
1884   cerr << "Test proto2" << endl;
1885   test_typed_btree<transaction_proto2, default_stable_transaction_traits>();
1886   test1<transaction_proto2, default_transaction_traits>();
1887   test2<transaction_proto2, default_transaction_traits>();
1888   test_absent_key_race<transaction_proto2, default_transaction_traits>();
1889   test_inc_value_size<transaction_proto2, default_transaction_traits>();
1890   test_multi_btree<transaction_proto2, default_transaction_traits>();
1891   test_read_only_snapshot<transaction_proto2, default_transaction_traits>();
1892   test_long_keys<transaction_proto2, default_transaction_traits>();
1893   test_long_keys2<transaction_proto2, default_transaction_traits>();
1894   test_insert_same_key<transaction_proto2, default_transaction_traits>();
1895
1896   //mp_stress_test_allocator<transaction_proto2, default_transaction_traits>();
1897   mp_stress_test_insert_removes<transaction_proto2, default_transaction_traits>();
1898   mp_test1<transaction_proto2, default_transaction_traits>();
1899   mp_test2<transaction_proto2, default_transaction_traits>();
1900   mp_test3<transaction_proto2, default_transaction_traits>();
1901   mp_test_simple_write_skew<transaction_proto2, default_transaction_traits>();
1902   mp_test_batch_processing<transaction_proto2, default_transaction_traits>();
1903
1904   //read_only_perf<transaction_proto1>();
1905   //read_only_perf<transaction_proto2>();
1906 }