update readme
[c11concurrency-benchmarks.git] / silo / persist_test.cc
1 /**
2  * A stand-alone binary which doesn't depend on the system,
3  * used to test the current persistence strategy
4  */
5
6 #include <cassert>
7 #include <iostream>
8 #include <cstdint>
9 #include <random>
10 #include <vector>
11 #include <set>
12 #include <atomic>
13 #include <thread>
14 #include <sstream>
15
16 #include <unistd.h>
17 #include <sys/uio.h>
18 #include <sys/types.h>
19 #include <fcntl.h>
20 #include <getopt.h>
21 #include <time.h>
22
23 #include <lz4.h>
24
25 #include "macros.h"
26 #include "circbuf.h"
27 #include "amd64.h"
28 #include "record/serializer.h"
29 #include "util.h"
30
31 using namespace std;
32 using namespace util;
33
34 struct tidhelpers {
35   // copied from txn_proto2_impl.h
36
37   static const uint64_t NBitsNumber = 24;
38
39   static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreShift distinct threads
40   static const size_t NMaxCores = NMAXCORES;
41
42   static const uint64_t CoreMask = (NMaxCores - 1);
43
44   static const uint64_t NumIdShift = CoreBits;
45   static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
46
47   static const uint64_t EpochShift = CoreBits + NBitsNumber;
48   static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
49
50   static inline
51   uint64_t CoreId(uint64_t v)
52   {
53     return v & CoreMask;
54   }
55
56   static inline
57   uint64_t NumId(uint64_t v)
58   {
59     return (v & NumIdMask) >> NumIdShift;
60   }
61
62   static inline
63   uint64_t EpochId(uint64_t v)
64   {
65     return (v & EpochMask) >> EpochShift;
66   }
67
68   static inline
69   uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
70   {
71     // some sanity checking
72     static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1), "xx");
73     static_assert((CoreMask & NumIdMask) == 0, "xx");
74     static_assert((NumIdMask & EpochMask) == 0, "xx");
75     return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
76   }
77
78   static uint64_t
79   vecidmax(uint64_t coremax, const vector<uint64_t> &v)
80   {
81     uint64_t ret = NumId(coremax);
82     for (size_t i = 0; i < v.size(); i++)
83       ret = max(ret, NumId(v[i]));
84     return ret;
85   }
86
87   static string
88   Str(uint64_t v)
89   {
90     ostringstream b;
91     b << "[core=" << CoreId(v) << " | n="
92       << NumId(v) << " | epoch="
93       << EpochId(v) << "]";
94     return b.str();
95   }
96
97 };
98
99 //static void
100 //fillstring(std::string &s, size_t t)
101 //{
102 //  s.clear();
103 //  for (size_t i = 0; i < t; i++)
104 //    s[i] = (char) i;
105 //}
106
107 template <typename PRNG>
108 static inline void
109 fillkey(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
110 {
111   s.resize(sz);
112   serializer<uint64_t, false> ser;
113   ser.write((uint8_t *) s.data(), idx);
114 }
115
116 template <typename PRNG>
117 static inline void
118 fillvalue(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
119 {
120   uniform_int_distribution<uint32_t> dist(0, 10000);
121   s.resize(sz);
122   serializer<uint32_t, false> s_uint32_t;
123   for (size_t i = 0; i < sz; i += sizeof(uint32_t)) {
124     if (i + sizeof(uint32_t) <= sz) {
125       const uint32_t x = dist(prng);
126       s_uint32_t.write((uint8_t *) &s[i], x);
127     }
128   }
129 }
130
131 /** simulate global database state */
132
133 static const size_t g_nrecords = 1000000;
134 static const size_t g_ntxns_worker = 1000000;
135 static const size_t g_nmax_loggers = 16;
136
137 static vector<uint64_t> g_database;
138 static atomic<uint64_t> g_ntxns_committed(0);
139 static atomic<uint64_t> g_ntxns_written(0);
140 static atomic<uint64_t> g_bytes_written[g_nmax_loggers];
141
142 static size_t g_nworkers = 1;
143 static int g_verbose = 0;
144 static int g_fsync_background = 0;
145 static size_t g_readset = 30;
146 static size_t g_writeset = 16;
147 static size_t g_keysize = 8; // in bytes
148 static size_t g_valuesize = 32; // in bytes
149
150 /** simulation framework */
151
152 // all simulations are epoch based
153 class database_simulation {
154 public:
155   static const unsigned long g_epoch_time_ns = 30000000; /* 30ms in ns */
156
157   database_simulation()
158     : keep_going_(true),
159       epoch_thread_(),
160       epoch_number_(1), // start at 1 so 0 can be fully persistent initially
161       system_sync_epoch_(0)
162   {
163     // XXX: depends on g_nworkers to be set by now
164     for (size_t i = 0; i < g_nworkers; i++)
165       per_thread_epochs_[i]->store(1, memory_order_release);
166     for (size_t i = 0; i < g_nmax_loggers; i++)
167       for (size_t j = 0; j < g_nworkers; j++)
168         per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);
169   }
170
171   virtual ~database_simulation() {}
172
173   virtual void
174   init()
175   {
176     epoch_thread_ = move(thread(&database_simulation::epoch_thread, this));
177   }
178
179   virtual void worker(unsigned id) = 0;
180
181   virtual void logger(const vector<int> &fd,
182                       const vector<vector<unsigned>> &assignments) = 0;
183
184   virtual void
185   terminate()
186   {
187     keep_going_->store(false, memory_order_release);
188     epoch_thread_.join();
189   }
190
191   static bool
192   AssignmentsValid(const vector<vector<unsigned>> &assignments,
193                    unsigned nfds,
194                    unsigned nworkers)
195   {
196     // each worker must be assigned exactly once in the assignment
197     // there must be <= nfds assignments
198
199     if (assignments.size() > nfds)
200       return false;
201
202     set<unsigned> seen;
203     for (auto &assignment : assignments)
204       for (auto w : assignment) {
205         if (seen.count(w) || w >= nworkers)
206           return false;
207         seen.insert(w);
208       }
209
210     return seen.size() == nworkers;
211   }
212
213 protected:
214   void
215   epoch_thread()
216   {
217     while (keep_going_->load(memory_order_acquire)) {
218       struct timespec t;
219       t.tv_sec  = g_epoch_time_ns / ONE_SECOND_NS;
220       t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
221       nanosleep(&t, nullptr);
222
223       // make sure all threads are at the current epoch
224       const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
225
226     retry:
227       bool allthere = true;
228       for (size_t i = 0;
229            i < g_nworkers && keep_going_->load(memory_order_acquire);
230            i++) {
231         if (per_thread_epochs_[i]->load(memory_order_acquire) < curepoch) {
232           allthere = false;
233           break;
234         }
235       }
236       if (!keep_going_->load(memory_order_acquire))
237         return;
238       if (!allthere) {
239         nop_pause();
240         goto retry;
241       }
242
243       //cerr << "bumping epoch" << endl;
244       epoch_number_->store(curepoch + 1, memory_order_release); // bump it
245     }
246   }
247
248   aligned_padded_elem<atomic<bool>> keep_going_;
249
250   thread epoch_thread_;
251
252   aligned_padded_elem<atomic<uint64_t>> epoch_number_;
253
254   aligned_padded_elem<atomic<uint64_t>> per_thread_epochs_[NMAXCORES];
255
256   // v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
257   // through (including) all transactions <= epoch v on core j. since core =>
258   // logger mapping is static, taking:
259   //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
260   // yields the entire system's persistent epoch
261   struct {
262     atomic<uint64_t> epochs_[NMAXCORES];
263     CACHE_PADOUT;
264   } per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
265
266   // conservative estimate (<=) for:
267   //   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
268   aligned_padded_elem<atomic<uint64_t>> system_sync_epoch_;
269 };
270
271 struct logbuf_header {
272   uint64_t nentries_; // > 0 for all valid log buffers
273   uint64_t last_tid_; // TID of the last commit
274 } PACKED;
275
276 struct pbuffer {
277   bool io_scheduled_; // has the logger scheduled IO yet?
278   size_t curoff_; // current offset into buf_, either for writing
279                   // or during the dep computation phase
280   size_t remaining_; // number of deps remaining to compute
281   std::string buf_; // the actual buffer, of size g_buffer_size
282
283   inline uint8_t *
284   pointer()
285   {
286     return (uint8_t *) buf_.data() + curoff_;
287   }
288
289   inline logbuf_header *
290   header()
291   {
292     return (logbuf_header *) buf_.data();
293   }
294
295   inline const logbuf_header *
296   header() const
297   {
298     return (const logbuf_header *) buf_.data();
299   }
300 };
301
302 class onecopy_logbased_simulation : public database_simulation {
303 public:
304   static const size_t g_perthread_buffers = 64; // 64 outstanding buffers
305   static const size_t g_buffer_size = (1<<20); // in bytes
306   static const size_t g_horizon_size = (1<<16); // in bytes, for compression only
307
308   static circbuf<pbuffer, g_perthread_buffers> g_all_buffers[NMAXCORES];
309   static circbuf<pbuffer, g_perthread_buffers> g_persist_buffers[NMAXCORES];
310
311 protected:
312
313   virtual const uint8_t *
314   read_log_entry(const uint8_t *p, uint64_t &tid,
315                  std::function<void(uint64_t)> readfunctor) = 0;
316
317   virtual uint64_t
318   compute_log_record_space() const = 0;
319
320   virtual void
321   write_log_record(uint8_t *p,
322                    uint64_t tidcommit,
323                    const vector<uint64_t> &readset,
324                    const vector<pair<string, string>> &writeset) = 0;
325
326   virtual void
327   logger_on_io_completion() {}
328
329   virtual bool
330   do_compression() const = 0;
331
332   pbuffer *
333   getbuffer(unsigned id)
334   {
335     // block until we get a buf
336     pbuffer *ret = g_all_buffers[id].deq();
337     ret->io_scheduled_ = false;
338     ret->buf_.assign(g_buffer_size, 0);
339     ret->curoff_ = sizeof(logbuf_header);
340     ret->remaining_ = 0;
341     return ret;
342   }
343
344 public:
345   void
346   init() OVERRIDE
347   {
348     database_simulation::init();
349     for (size_t i = 0; i < g_nworkers; i++) {
350       for (size_t j = 0; j < g_perthread_buffers; j++) {
351         struct pbuffer *p = new pbuffer;
352         g_all_buffers[i].enq(p);
353       }
354     }
355   }
356
357 private:
358   inline size_t
359   inplace_update_persistent_info(
360       vector<pair<uint64_t, uint64_t>> &outstanding_commits,
361       uint64_t cursyncepoch)
362   {
363     size_t ncommits_synced = 0;
364     // can erase all entries with x.first <= cursyncepoch
365     size_t idx = 0;
366     for (; idx < outstanding_commits.size(); idx++) {
367       if (outstanding_commits[idx].first <= cursyncepoch)
368         ncommits_synced += outstanding_commits[idx].second;
369       else
370         break;
371     }
372
373     // erase entries [0, idx)
374     // XXX: slow
375     outstanding_commits.erase(outstanding_commits.begin(),
376         outstanding_commits.begin() + idx);
377
378     return ncommits_synced;
379   }
380
381   inline pbuffer *
382   ensure_buffer_with_space(unsigned id, pbuffer *cur, size_t space_needed)
383   {
384     if (!cur) {
385       cur = getbuffer(id);
386     } else if (g_buffer_size - cur->curoff_ < space_needed) {
387       g_persist_buffers[id].enq(cur);
388       cur = getbuffer(id);
389     }
390     INVARIANT(cur);
391     INVARIANT(g_buffer_size - cur->curoff_ >= space_needed);
392     return cur;
393   }
394
395   /**
396    * write the horizon from [p, p+sz) into cur, assuming that cur has enough
397    * space. space needed is at least:
398    *   sizeof(uint32_t) + LZ4_compressBound(sz)
399    *
400    * also updates the buffer's headers and offset to reflect the write
401    *
402    * returns the compressed size of the horizon
403    */
404   inline uint64_t
405   write_horizon(void *lz4ctx,
406                 const uint8_t *p, uint64_t sz,
407                 uint64_t nentries, uint64_t lasttid,
408                 pbuffer *cur)
409   {
410 #ifdef CHECK_INVARIANTS
411     const uint64_t needed = sizeof(uint32_t) + LZ4_compressBound(sz);
412     INVARIANT(g_buffer_size - cur->curoff_ >= needed);
413 #endif
414
415     const int ret = LZ4_compress_heap(
416         lz4ctx,
417         (const char *) p,
418         (char *) cur->pointer() + sizeof(uint32_t),
419         sz);
420
421     INVARIANT(ret >= 0);
422     serializer<uint32_t, false> s_uint32_t;
423     s_uint32_t.write(cur->pointer(), ret);
424     cur->curoff_ += sizeof(uint32_t) + ret;
425     cur->header()->nentries_ += nentries;
426     cur->header()->last_tid_ = lasttid;
427
428     return ret;
429   }
430
431 protected:
432   void
433   worker(unsigned id) OVERRIDE
434   {
435     const bool compress = do_compression();
436     uint8_t horizon[g_horizon_size]; // LZ4 looks at 65kb windows
437
438     // where are we in the window, how many elems in this window?
439     size_t horizon_p = 0, horizon_nentries = 0;
440     uint64_t horizon_last_tid = 0; // last committed TID in the horizon
441
442     double cratios = 0.0;
443     unsigned long ncompressions = 0;
444
445     void *lz4ctx = nullptr; // holds a heap-allocated LZ4 hash table
446     if (compress)
447       lz4ctx = LZ4_create();
448
449     mt19937 prng(id);
450
451     // read/write sets are uniform for now
452     uniform_int_distribution<unsigned> dist(0, g_nrecords - 1);
453
454     vector<uint64_t> readset(g_readset);
455     vector<pair<string, string>> writeset(g_writeset);
456     for (auto &pr : writeset) {
457       pr.first.reserve(g_keysize);
458       pr.second.reserve(g_valuesize);
459     }
460
461     struct pbuffer *curbuf = nullptr;
462     uint64_t lasttid = 0,
463              ncommits_currentepoch = 0,
464              ncommits_synced = 0;
465     vector<pair<uint64_t, uint64_t>> outstanding_commits;
466     for (size_t i = 0; i < g_ntxns_worker; i++) {
467
468       // update epoch info
469       const uint64_t lastepoch = per_thread_epochs_[id]->load(memory_order_acquire);
470       const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
471
472       if (lastepoch != curepoch) {
473         // try to sync outstanding commits
474         INVARIANT(curepoch == (lastepoch + 1));
475         const size_t cursyncepoch = system_sync_epoch_->load(memory_order_acquire);
476         ncommits_synced +=
477           inplace_update_persistent_info(outstanding_commits, cursyncepoch);
478
479         // add information about the last epoch
480         outstanding_commits.emplace_back(lastepoch, ncommits_currentepoch);
481         ncommits_currentepoch = 0;
482
483         per_thread_epochs_[id]->store(curepoch, memory_order_release);
484       }
485
486       for (size_t j = 0; j < g_readset; j++)
487         readset[j] = g_database[dist(prng)];
488
489       const uint64_t idmax = tidhelpers::vecidmax(lasttid, readset);
490       // XXX: ignore future epochs for now
491       const uint64_t tidcommit = tidhelpers::MakeTid(id, idmax + 1, curepoch);
492       lasttid = tidcommit;
493
494       for (size_t j = 0; j < g_writeset; j++) {
495         auto idx = dist(prng);
496         g_database[idx] = lasttid;
497         fillkey(writeset[j].first, idx, g_keysize, prng);
498         fillvalue(writeset[j].second, idx, g_valuesize, prng);
499       }
500
501       const uint64_t space_needed = compute_log_record_space();
502       if (compress) {
503         if (horizon_p + space_needed > g_horizon_size) {
504           // need to compress and write horizon
505           curbuf = ensure_buffer_with_space(id, curbuf,
506             sizeof(uint32_t) + LZ4_compressBound(horizon_p));
507
508           const uint64_t compsz =
509             write_horizon(lz4ctx, &horizon[0], horizon_p,
510                           horizon_nentries, horizon_last_tid,
511                           curbuf);
512
513           const double cratio = double(horizon_p) / double(compsz);
514           cratios += cratio;
515           ncompressions++;
516
517           // can reset horizon
518           horizon_p = horizon_nentries = horizon_last_tid = 0;
519         }
520
521         write_log_record(&horizon[0] + horizon_p, tidcommit, readset, writeset);
522         horizon_p += space_needed;
523         horizon_nentries++;
524         horizon_last_tid = tidcommit;
525         ncommits_currentepoch++;
526       } else {
527         curbuf = ensure_buffer_with_space(id, curbuf, space_needed);
528         uint8_t *p = curbuf->pointer();
529         write_log_record(p, tidcommit, readset, writeset);
530         //cerr << "write tidcommit=" << tidhelpers::Str(tidcommit) << endl;
531         curbuf->curoff_ += space_needed;
532         curbuf->header()->nentries_++;
533         curbuf->header()->last_tid_ = tidcommit;
534         ncommits_currentepoch++;
535       }
536     }
537
538     if (compress) {
539       if (horizon_nentries) {
540         curbuf = ensure_buffer_with_space(id, curbuf,
541             sizeof(uint32_t) + LZ4_compressBound(horizon_p));
542
543         const uint64_t compsz =
544           write_horizon(lz4ctx, &horizon[0], horizon_p,
545                         horizon_nentries, horizon_last_tid,
546                         curbuf);
547
548         const double cratio = double(horizon_p) / double(compsz);
549         cratios += cratio;
550         ncompressions++;
551
552         horizon_p = horizon_nentries = horizon_last_tid = 0;
553       }
554       LZ4_free(lz4ctx);
555     }
556
557     if (curbuf) {
558       // XXX: hacky - an agreed upon future epoch for all threads to converge
559       // on upon finishing
560       const uint64_t FutureEpoch = 100000;
561       const uint64_t waitfor = tidhelpers::EpochId(
562           curbuf->header()->last_tid_);
563       INVARIANT(per_thread_epochs_[id]->load(memory_order_acquire) == waitfor);
564       ALWAYS_ASSERT(waitfor < FutureEpoch);
565       curbuf->header()->last_tid_ =
566         tidhelpers::MakeTid(id, 0, FutureEpoch);
567       g_persist_buffers[id].enq(curbuf);
568       outstanding_commits.emplace_back(waitfor, ncommits_currentepoch);
569       //cerr << "worker " << id << " waitfor epoch " << waitfor << endl;
570       // get these commits persisted
571       while (system_sync_epoch_->load(memory_order_acquire) < waitfor)
572         nop_pause();
573       ncommits_synced +=
574         inplace_update_persistent_info(outstanding_commits, waitfor);
575       ALWAYS_ASSERT(outstanding_commits.empty());
576     }
577
578     if (g_verbose && compress)
579       cerr << "Average compression ratio: " << cratios / double(ncompressions) << endl;
580
581     g_ntxns_committed.fetch_add(ncommits_synced, memory_order_release);
582   }
583
584 private:
585   void
586   fsyncer(unsigned id, int fd, one_way_post<int> &channel)
587   {
588     for (;;) {
589       int ret;
590       channel.peek(ret);
591       if (ret == -1)
592         return;
593       ret = fdatasync(fd);
594       if (ret == -1) {
595         perror("fdatasync");
596         exit(1);
597       }
598       channel.consume(ret);
599     }
600   }
601
602   void
603   writer(unsigned id, int fd, const vector<unsigned> &assignment)
604   {
605     vector<iovec> iovs(g_nworkers * g_perthread_buffers);
606     vector<pbuffer *> pxs;
607     struct timespec last_io_completed;
608     one_way_post<int> *channel =
609       g_fsync_background ? new one_way_post<int> : nullptr;
610     uint64_t total_nbytes_written = 0,
611              total_txns_written = 0;
612
613     bool sense = false; // cur is at sense, prev is at !sense
614     uint64_t nbytes_written[2], txns_written[2], epoch_prefixes[2][g_nworkers];
615     memset(&nbytes_written[0], 0, sizeof(nbytes_written));
616     memset(&txns_written[0], 0, sizeof(txns_written));
617     memset(&epoch_prefixes[0], 0, sizeof(epoch_prefixes[0]));
618     memset(&epoch_prefixes[1], 0, sizeof(epoch_prefixes[1]));
619
620     clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
621     thread fsync_thread;
622     if (g_fsync_background) {
623       fsync_thread = move(thread(
624             &onecopy_logbased_simulation::fsyncer, this, id, fd, ref(*channel)));
625       fsync_thread.detach();
626     }
627
628     while (keep_going_->load(memory_order_acquire)) {
629
630       // don't allow this loop to proceed less than an epoch's worth of time,
631       // so we can batch IO
632       struct timespec now, diff;
633       clock_gettime(CLOCK_MONOTONIC, &now);
634       timespec_utils::subtract(&now, &last_io_completed, &diff);
635       if (diff.tv_sec == 0 && diff.tv_nsec < long(g_epoch_time_ns)) {
636         // need to sleep it out
637         struct timespec ts;
638         ts.tv_sec = 0;
639         ts.tv_nsec = g_epoch_time_ns - diff.tv_nsec;
640         nanosleep(&ts, nullptr);
641       }
642       clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
643
644       size_t nwritten = 0;
645       nbytes_written[sense] = txns_written[sense] = 0;
646       for (auto idx : assignment) {
647         INVARIANT(idx >= 0 && idx < g_nworkers);
648         g_persist_buffers[idx].peekall(pxs);
649         for (auto px : pxs) {
650           INVARIANT(px);
651           INVARIANT(!px->io_scheduled_);
652           iovs[nwritten].iov_base = (void *) px->buf_.data();
653           iovs[nwritten].iov_len = px->curoff_;
654           nbytes_written[sense] += px->curoff_;
655           px->io_scheduled_ = true;
656           px->curoff_ = sizeof(logbuf_header);
657           px->remaining_ = px->header()->nentries_;
658           txns_written[sense] += px->header()->nentries_;
659           nwritten++;
660           INVARIANT(tidhelpers::CoreId(px->header()->last_tid_) == idx);
661           INVARIANT(epoch_prefixes[sense][idx] <=
662                     tidhelpers::EpochId(px->header()->last_tid_));
663           INVARIANT(tidhelpers::EpochId(px->header()->last_tid_) > 0);
664           epoch_prefixes[sense][idx] =
665             tidhelpers::EpochId(px->header()->last_tid_) - 1;
666         }
667       }
668
669       if (!nwritten) {
670         // XXX: should probably sleep here
671         nop_pause();
672         if (!g_fsync_background || !channel->can_post()) {
673           //cerr << "writer skipping because no work to do" << endl;
674           continue;
675         }
676       }
677
678       //cerr << "writer " << id << " nwritten " << nwritten << endl;
679
680       const ssize_t ret =
681         nwritten ? writev(fd, &iovs[0], nwritten) : 0;
682       if (ret == -1) {
683         perror("writev");
684         exit(1);
685       }
686
687       bool dosense;
688       if (g_fsync_background) {
689         // wait for fsync from the previous write
690         if (nwritten)
691           channel->post(0, true);
692         else
693           INVARIANT(channel->can_post());
694         dosense = !sense;
695       } else {
696         int ret = fdatasync(fd);
697         if (ret == -1) {
698           perror("fdatasync");
699           exit(1);
700         }
701         dosense = sense;
702       }
703
704       // update metadata from previous write
705       for (size_t i = 0; i < g_nworkers; i++) {
706         const uint64_t x0 =
707           per_thread_sync_epochs_[id].epochs_[i].load(memory_order_acquire);
708         const uint64_t x1 = epoch_prefixes[dosense][i];
709         if (x1 > x0)
710           per_thread_sync_epochs_[id].epochs_[i].store(
711               x1, memory_order_release);
712       }
713       total_nbytes_written += nbytes_written[dosense];
714       total_txns_written += txns_written[dosense];
715
716       // bump the sense
717       sense = !sense;
718
719       // return all buffers that have been io_scheduled_ - we can do this as
720       // soon as write returns
721       for (auto idx : assignment) {
722         pbuffer *px;
723         while ((px = g_persist_buffers[idx].peek()) &&
724                px->io_scheduled_) {
725           g_persist_buffers[idx].deq();
726           g_all_buffers[idx].enq(px);
727         }
728       }
729     }
730
731     g_bytes_written[id].store(total_nbytes_written, memory_order_release);
732     g_ntxns_written.fetch_add(total_txns_written, memory_order_release);
733   }
734
735   inline void
736   advance_system_sync_epoch(const vector<vector<unsigned>> &assignments)
737   {
738     uint64_t min_so_far = numeric_limits<uint64_t>::max();
739     for (size_t i = 0; i < assignments.size(); i++)
740       for (auto j : assignments[i])
741         min_so_far =
742           min(per_thread_sync_epochs_[i].epochs_[j].load(memory_order_acquire), min_so_far);
743
744 #ifdef CHECK_INVARIANTS
745     const uint64_t syssync = system_sync_epoch_->load(memory_order_acquire);
746     INVARIANT(syssync <= min_so_far);
747 #endif
748     system_sync_epoch_->store(min_so_far, memory_order_release);
749   }
750
751 public:
752   void
753   logger(const vector<int> &fds,
754          const vector<vector<unsigned>> &assignments_given) OVERRIDE
755   {
756     // compute thread => logger assignment
757     vector<thread> writers;
758     vector<vector<unsigned>> assignments(assignments_given);
759
760     if (assignments.empty()) {
761       // compute assuming homogenous disks
762       if (g_nworkers <= fds.size()) {
763         // each thread gets its own logging worker
764         for (size_t i = 0; i < g_nworkers; i++)
765           assignments.push_back({(unsigned) i});
766       } else {
767         // XXX: currently we assume each logger is equally as fast- we should
768         // adjust ratios accordingly for non-homogenous loggers
769         const size_t threads_per_logger = g_nworkers / fds.size();
770         for (size_t i = 0; i < fds.size(); i++) {
771           assignments.emplace_back(
772             MakeRange<unsigned>(
773                 i * threads_per_logger,
774                 ((i + 1) == fds.size()) ?
775                   g_nworkers :
776                   (i + 1) * threads_per_logger));
777         }
778       }
779     }
780
781     INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));
782
783     timer tt;
784     for (size_t i = 0; i < assignments.size(); i++)
785       writers.emplace_back(
786         &onecopy_logbased_simulation::writer,
787         this, i, fds[i], ref(assignments[i]));
788     if (g_verbose)
789       cerr << "assignments: " << assignments << endl;
790     while (keep_going_->load(memory_order_acquire)) {
791       // periodically compute which epoch is the persistence epoch,
792       // and update system_sync_epoch_
793
794       struct timespec t;
795       t.tv_sec  = g_epoch_time_ns / ONE_SECOND_NS;
796       t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
797       nanosleep(&t, nullptr);
798
799       advance_system_sync_epoch(assignments);
800     }
801
802     for (auto &t : writers)
803       t.join();
804
805     if (g_verbose) {
806       cerr << "current epoch: " << epoch_number_->load(memory_order_acquire) << endl;
807       cerr << "sync epoch   : " << system_sync_epoch_->load(memory_order_acquire) << endl;
808       const double xsec = tt.lap_ms() / 1000.0;
809       for (size_t i = 0; i < writers.size(); i++)
810         cerr << "writer " << i << " " <<
811           (double(g_bytes_written[i].load(memory_order_acquire)) /
812            double(1UL << 20) /
813            xsec) << " MB/sec" << endl;
814     }
815   }
816
817 protected:
818   vector<pbuffer *> pxs_; // just some scratch space
819 };
820
821 circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
822   onecopy_logbased_simulation::g_all_buffers[NMAXCORES];
823 circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
824   onecopy_logbased_simulation::g_persist_buffers[NMAXCORES];
825
826 class explicit_deptracking_simulation : public onecopy_logbased_simulation {
827 public:
828
829   /** global state about our persistence calculations */
830
831   // contains the latest TID inclusive, per core, which is (transitively)
832   // persistent. note that the prefix of the DB which is totally persistent is
833   // simply the max of this table.
834   static uint64_t g_persistence_vc[NMAXCORES];
835
836 protected:
837
838   bool do_compression() const OVERRIDE { return false; }
839
840   const uint8_t *
841   read_log_entry(const uint8_t *p, uint64_t &tid,
842                  std::function<void(uint64_t)> readfunctor) OVERRIDE
843   {
844     serializer<uint8_t, false> s_uint8_t;
845     serializer<uint64_t, false> s_uint64_t;
846
847     uint8_t readset_sz, writeset_sz, key_sz, value_sz;
848     uint64_t v;
849
850     p = s_uint64_t.read(p, &tid);
851     p = s_uint8_t.read(p, &readset_sz);
852     INVARIANT(size_t(readset_sz) == g_readset);
853     for (size_t i = 0; i < size_t(readset_sz); i++) {
854       p = s_uint64_t.read(p, &v);
855       readfunctor(v);
856     }
857
858     p = s_uint8_t.read(p, &writeset_sz);
859     INVARIANT(size_t(writeset_sz) == g_writeset);
860     for (size_t i = 0; i < size_t(writeset_sz); i++) {
861       p = s_uint8_t.read(p, &key_sz);
862       INVARIANT(size_t(key_sz) == g_keysize);
863       p += size_t(key_sz);
864       p = s_uint8_t.read(p, &value_sz);
865       INVARIANT(size_t(value_sz) == g_valuesize);
866       p += size_t(value_sz);
867     }
868
869     return p;
870   }
871
872   uint64_t
873   compute_log_record_space() const OVERRIDE
874   {
875     // compute how much space we need for this entry
876     uint64_t space_needed = 0;
877
878     // 8 bytes to indicate TID
879     space_needed += sizeof(uint64_t);
880
881     // one byte to indicate # of read deps
882     space_needed += 1;
883
884     // each dep occupies 8 bytes
885     space_needed += g_readset * sizeof(uint64_t);
886
887     // one byte to indicate # of records written
888     space_needed += 1;
889
890     // each record occupies (1 + key_length + 1 + value_length) bytes
891     space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
892
893     return space_needed;
894   }
895
896   void
897   write_log_record(uint8_t *p,
898                    uint64_t tidcommit,
899                    const vector<uint64_t> &readset,
900                    const vector<pair<string, string>> &writeset) OVERRIDE
901   {
902     serializer<uint8_t, false> s_uint8_t;
903     serializer<uint64_t, false> s_uint64_t;
904
905     p = s_uint64_t.write(p, tidcommit);
906     p = s_uint8_t.write(p, readset.size());
907     for (auto t : readset)
908       p = s_uint64_t.write(p, t);
909     p = s_uint8_t.write(p, writeset.size());
910     for (auto &pr : writeset) {
911       p = s_uint8_t.write(p, pr.first.size());
912       memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
913       p = s_uint8_t.write(p, pr.second.size());
914       memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
915     }
916   }
917
918   void
919   logger_on_io_completion() OVERRIDE
920   {
921     ALWAYS_ASSERT(false); // currently broken
922     bool changed = true;
923     while (changed) {
924       changed = false;
925       for (size_t i = 0; i < NMAXCORES; i++) {
926         g_persist_buffers[i].peekall(pxs_);
927         for (auto px : pxs_) {
928           INVARIANT(px);
929           if (!px->io_scheduled_)
930             break;
931
932           INVARIANT(px->remaining_ > 0);
933           INVARIANT(px->curoff_ < g_buffer_size);
934
935           const uint8_t *p = px->pointer();
936           uint64_t committid;
937           bool allsat = true;
938
939           //cerr << "processing buffer " << px << " with curoff_=" << px->curoff_ << endl
940           //     << "  p=" << intptr_t(p) << endl;
941
942           while (px->remaining_ && allsat) {
943             allsat = true;
944             const uint8_t *nextp =
945               read_log_entry(p, committid, [&allsat](uint64_t readdep) {
946                 if (!allsat)
947                   return;
948                 const uint64_t cid = tidhelpers::CoreId(readdep);
949                 if (readdep > g_persistence_vc[cid])
950                   allsat = false;
951               });
952             if (allsat) {
953               //cerr << "committid=" << tidhelpers::Str(committid)
954               //     << ", g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i])
955               //     << endl;
956               INVARIANT(tidhelpers::CoreId(committid) == i);
957               INVARIANT(g_persistence_vc[i] < committid);
958               g_persistence_vc[i] = committid;
959               changed = true;
960               p = nextp;
961               px->remaining_--;
962               px->curoff_ = intptr_t(p) - intptr_t(px->buf_.data());
963               g_ntxns_committed++;
964             } else {
965               // done, no further entries will be satisfied
966             }
967           }
968
969           if (allsat) {
970             INVARIANT(px->remaining_ == 0);
971             // finished entire buffer
972             struct pbuffer *pxcheck = g_persist_buffers[i].deq();
973             if (pxcheck != px)
974               INVARIANT(false);
975             g_all_buffers[i].enq(px);
976             //cerr << "buffer flused at g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i]) << endl;
977           } else {
978             INVARIANT(px->remaining_ > 0);
979             break; // cannot process core's list any further
980           }
981         }
982       }
983     }
984   }
985
986 };
987
988 uint64_t explicit_deptracking_simulation::g_persistence_vc[NMAXCORES] = {0};
989
990 class epochbased_simulation : public onecopy_logbased_simulation {
991 public:
992   epochbased_simulation(bool compress)
993     : compress_(compress)
994   {
995   }
996
997 protected:
998   bool do_compression() const OVERRIDE { return compress_; }
999
1000 protected:
1001   const uint8_t *
1002   read_log_entry(const uint8_t *p, uint64_t &tid,
1003                  std::function<void(uint64_t)> readfunctor) OVERRIDE
1004   {
1005     serializer<uint8_t, false> s_uint8_t;
1006     serializer<uint64_t, false> s_uint64_t;
1007
1008     uint8_t writeset_sz, key_sz, value_sz;
1009
1010     p = s_uint64_t.read(p, &tid);
1011     p = s_uint8_t.read(p, &writeset_sz);
1012     INVARIANT(size_t(writeset_sz) == g_writeset);
1013     for (size_t i = 0; i < size_t(writeset_sz); i++) {
1014       p = s_uint8_t.read(p, &key_sz);
1015       INVARIANT(size_t(key_sz) == g_keysize);
1016       p += size_t(key_sz);
1017       p = s_uint8_t.read(p, &value_sz);
1018       INVARIANT(size_t(value_sz) == g_valuesize);
1019       p += size_t(value_sz);
1020     }
1021
1022     return p;
1023   }
1024
1025   uint64_t
1026   compute_log_record_space() const OVERRIDE
1027   {
1028     // compute how much space we need for this entry
1029     uint64_t space_needed = 0;
1030
1031     // 8 bytes to indicate TID
1032     space_needed += sizeof(uint64_t);
1033
1034     // one byte to indicate # of records written
1035     space_needed += 1;
1036
1037     // each record occupies (1 + key_length + 1 + value_length) bytes
1038     space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
1039
1040     return space_needed;
1041   }
1042
1043   void
1044   write_log_record(uint8_t *p,
1045                    uint64_t tidcommit,
1046                    const vector<uint64_t> &readset,
1047                    const vector<pair<string, string>> &writeset) OVERRIDE
1048   {
1049     serializer<uint8_t, false> s_uint8_t;
1050     serializer<uint64_t, false> s_uint64_t;
1051
1052     p = s_uint64_t.write(p, tidcommit);
1053     p = s_uint8_t.write(p, writeset.size());
1054     for (auto &pr : writeset) {
1055       p = s_uint8_t.write(p, pr.first.size());
1056       memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
1057       p = s_uint8_t.write(p, pr.second.size());
1058       memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
1059     }
1060   }
1061
1062 private:
1063   bool compress_;
1064 };
1065
1066 int
1067 main(int argc, char **argv)
1068 {
1069   string strategy = "epoch";
1070   vector<string> logfiles;
1071   vector<vector<unsigned>> assignments;
1072
1073   while (1) {
1074     static struct option long_options[] =
1075     {
1076       {"verbose"     , no_argument       , &g_verbose , 1}   ,
1077       {"fsync-back"  , no_argument       , &g_fsync_background, 1},
1078       {"num-threads" , required_argument , 0          , 't'} ,
1079       {"strategy"    , required_argument , 0          , 's'} ,
1080       {"readset"     , required_argument , 0          , 'r'} ,
1081       {"writeset"    , required_argument , 0          , 'w'} ,
1082       {"keysize"     , required_argument , 0          , 'k'} ,
1083       {"valuesize"   , required_argument , 0          , 'v'} ,
1084       {"logfile"     , required_argument , 0          , 'l'} ,
1085       {"assignment"  , required_argument , 0          , 'a'} ,
1086       {0, 0, 0, 0}
1087     };
1088     int option_index = 0;
1089     int c = getopt_long(argc, argv, "t:s:r:w:k:v:l:a:", long_options, &option_index);
1090     if (c == -1)
1091       break;
1092
1093     switch (c) {
1094     case 0:
1095       if (long_options[option_index].flag != 0)
1096         break;
1097       abort();
1098       break;
1099
1100     case 't':
1101       g_nworkers = strtoul(optarg, nullptr, 10);
1102       break;
1103
1104     case 's':
1105       strategy = optarg;
1106       break;
1107
1108     case 'r':
1109       g_readset = strtoul(optarg, nullptr, 10);
1110       break;
1111
1112     case 'w':
1113       g_writeset = strtoul(optarg, nullptr, 10);
1114       break;
1115
1116     case 'k':
1117       g_keysize = strtoul(optarg, nullptr, 10);
1118       break;
1119
1120     case 'v':
1121       g_valuesize = strtoul(optarg, nullptr, 10);
1122       break;
1123
1124     case 'l':
1125       logfiles.emplace_back(optarg);
1126       break;
1127
1128     case 'a':
1129       assignments.emplace_back(
1130           ParseCSVString<unsigned, RangeAwareParser<unsigned>>(optarg));
1131       break;
1132
1133     case '?':
1134       /* getopt_long already printed an error message. */
1135       exit(1);
1136
1137     default:
1138       abort();
1139     }
1140   }
1141   ALWAYS_ASSERT(g_nworkers >= 1);
1142   ALWAYS_ASSERT(g_readset >= 0);
1143   ALWAYS_ASSERT(g_writeset > 0);
1144   ALWAYS_ASSERT(g_keysize > 0);
1145   ALWAYS_ASSERT(g_valuesize >= 0);
1146   ALWAYS_ASSERT(!logfiles.empty());
1147   ALWAYS_ASSERT(logfiles.size() <= g_nmax_loggers);
1148   ALWAYS_ASSERT(
1149       assignments.empty() ||
1150       database_simulation::AssignmentsValid(
1151         assignments, logfiles.size(), g_nworkers));
1152
1153   if (g_verbose)
1154     cerr << "{nworkers=" << g_nworkers
1155          << ", readset=" << g_readset
1156          << ", writeset=" << g_writeset
1157          << ", keysize=" << g_keysize
1158          << ", valuesize=" << g_valuesize
1159          << ", logfiles=" << logfiles
1160          << ", strategy=" << strategy
1161          << ", fsync_background=" << g_fsync_background
1162          << ", assignments=" << assignments
1163          << "}" << endl;
1164
1165   if (strategy != "deptracking" &&
1166       strategy != "epoch" &&
1167       strategy != "epoch-compress")
1168     ALWAYS_ASSERT(false);
1169
1170   g_database.resize(g_nrecords); // all start at TID=0
1171
1172   vector<int> fds;
1173   for (auto &fname : logfiles) {
1174     int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
1175     if (fd == -1) {
1176       perror("open");
1177       return 1;
1178     }
1179     fds.push_back(fd);
1180   }
1181
1182   unique_ptr<database_simulation> sim;
1183   if (strategy == "deptracking")
1184     sim.reset(new explicit_deptracking_simulation);
1185   else if (strategy == "epoch")
1186     sim.reset(new epochbased_simulation(false));
1187   else if (strategy == "epoch-compress")
1188     sim.reset(new epochbased_simulation(true));
1189   else
1190     ALWAYS_ASSERT(false);
1191   sim->init();
1192
1193   thread logger_thread(
1194       &database_simulation::logger, sim.get(), fds, ref(assignments));
1195
1196   vector<thread> workers;
1197   util::timer tt, tt1;
1198   for (size_t i = 0; i < g_nworkers; i++)
1199     workers.emplace_back(&database_simulation::worker, sim.get(), i);
1200   for (auto &p: workers)
1201     p.join();
1202   sim->terminate();
1203   logger_thread.join();
1204
1205   const double ntxns_committed = g_ntxns_committed.load();
1206   const double xsec = tt.lap_ms() / 1000.0;
1207   const double rate = double(ntxns_committed) / xsec;
1208   if (g_verbose) {
1209     cerr << "txns commited rate: " << rate << " txns/sec" << endl;
1210     cerr << "  (" << size_t(ntxns_committed) << " in " << xsec << " sec)" << endl;
1211
1212     const double ntxns_written = g_ntxns_written.load();
1213     const double rate1 = double(ntxns_written) / xsec;
1214     cerr << "txns written rate: " << rate1 << " txns/sec" << endl;
1215     cerr << "  (" << size_t(ntxns_written) << " in " << xsec << " sec)" << endl;
1216   } else {
1217     cout << rate << endl;
1218   }
1219
1220   return 0;
1221 }