benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / log.cc
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #include "log.hh"
17 #include "kvthread.hh"
18 #include "kvrow.hh"
19 #include "file.hh"
20 #include "query_masstree.hh"
21 #include "masstree_tcursor.hh"
22 #include "masstree_insert.hh"
23 #include "masstree_remove.hh"
24 #include "misc.hh"
25 #include "msgpack.hh"
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <fcntl.h>
29 #include <unistd.h>
30 #include <errno.h>
31 using lcdf::String;
32
33 kvepoch_t global_log_epoch;
34 kvepoch_t global_wake_epoch;
35 struct timeval log_epoch_interval;
36 static struct timeval log_epoch_time;
37 extern Masstree::default_table* tree;
38
39 kvepoch_t rec_ckp_min_epoch;
40 kvepoch_t rec_ckp_max_epoch;
41 logreplay::info_type *rec_log_infos;
42 kvepoch_t rec_replay_min_epoch;
43 kvepoch_t rec_replay_max_epoch;
44 kvepoch_t rec_replay_min_quiescent_last_epoch;
45
46 struct logrec_base {
47     uint32_t command_;
48     uint32_t size_;
49
50     static size_t size() {
51         return sizeof(logrec_base);
52     }
53     static size_t store(char *buf, uint32_t command) {
54         // XXX check alignment on some architectures
55         logrec_base *lr = reinterpret_cast<logrec_base *>(buf);
56         lr->command_ = command;
57         lr->size_ = sizeof(*lr);
58         return sizeof(*lr);
59     }
60     static bool check(const char *buf) {
61         const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
62         return lr->size_ >= sizeof(*lr);
63     }
64     static uint32_t command(const char *buf) {
65         const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
66         return lr->command_;
67     }
68 };
69
70 struct logrec_epoch {
71     uint32_t command_;
72     uint32_t size_;
73     kvepoch_t epoch_;
74
75     static size_t size() {
76         return sizeof(logrec_epoch);
77     }
78     static size_t store(char *buf, uint32_t command, kvepoch_t epoch) {
79         // XXX check alignment on some architectures
80         logrec_epoch *lr = reinterpret_cast<logrec_epoch *>(buf);
81         lr->command_ = command;
82         lr->size_ = sizeof(*lr);
83         lr->epoch_ = epoch;
84         return sizeof(*lr);
85     }
86     static bool check(const char *buf) {
87         const logrec_epoch *lr = reinterpret_cast<const logrec_epoch *>(buf);
88         return lr->size_ >= sizeof(*lr);
89     }
90 };
91
92 struct logrec_kv {
93     uint32_t command_;
94     uint32_t size_;
95     kvtimestamp_t ts_;
96     uint32_t keylen_;
97     char buf_[0];
98
99     static size_t size(uint32_t keylen, uint32_t vallen) {
100         return sizeof(logrec_kv) + keylen + vallen;
101     }
102     static size_t store(char *buf, uint32_t command,
103                         Str key, Str val,
104                         kvtimestamp_t ts) {
105         // XXX check alignment on some architectures
106         logrec_kv *lr = reinterpret_cast<logrec_kv *>(buf);
107         lr->command_ = command;
108         lr->size_ = sizeof(*lr) + key.len + val.len;
109         lr->ts_ = ts;
110         lr->keylen_ = key.len;
111         memcpy(lr->buf_, key.s, key.len);
112         memcpy(lr->buf_ + key.len, val.s, val.len);
113         return sizeof(*lr) + key.len + val.len;
114     }
115     static bool check(const char *buf) {
116         const logrec_kv *lr = reinterpret_cast<const logrec_kv *>(buf);
117         return lr->size_ >= sizeof(*lr)
118             && lr->size_ >= sizeof(*lr) + lr->keylen_;
119     }
120 };
121
122 struct logrec_kvdelta {
123     uint32_t command_;
124     uint32_t size_;
125     kvtimestamp_t ts_;
126     kvtimestamp_t prev_ts_;
127     uint32_t keylen_;
128     char buf_[0];
129
130     static size_t size(uint32_t keylen, uint32_t vallen) {
131         return sizeof(logrec_kvdelta) + keylen + vallen;
132     }
133     static size_t store(char *buf, uint32_t command,
134                         Str key, Str val,
135                         kvtimestamp_t prev_ts, kvtimestamp_t ts) {
136         // XXX check alignment on some architectures
137         logrec_kvdelta *lr = reinterpret_cast<logrec_kvdelta *>(buf);
138         lr->command_ = command;
139         lr->size_ = sizeof(*lr) + key.len + val.len;
140         lr->ts_ = ts;
141         lr->prev_ts_ = prev_ts;
142         lr->keylen_ = key.len;
143         memcpy(lr->buf_, key.s, key.len);
144         memcpy(lr->buf_ + key.len, val.s, val.len);
145         return sizeof(*lr) + key.len + val.len;
146     }
147     static bool check(const char *buf) {
148         const logrec_kvdelta *lr = reinterpret_cast<const logrec_kvdelta *>(buf);
149         return lr->size_ >= sizeof(*lr)
150             && lr->size_ >= sizeof(*lr) + lr->keylen_;
151     }
152 };
153
154
155 logset* logset::make(int size) {
156     static_assert(sizeof(loginfo) == 2 * CACHE_LINE_SIZE, "unexpected sizeof(loginfo)");
157     assert(size > 0 && size <= 64);
158     char* x = new char[sizeof(loginfo) * size + sizeof(loginfo::logset_info) + CACHE_LINE_SIZE];
159     char* ls_pos = x + sizeof(loginfo::logset_info);
160     uintptr_t left = reinterpret_cast<uintptr_t>(ls_pos) % CACHE_LINE_SIZE;
161     if (left)
162         ls_pos += CACHE_LINE_SIZE - left;
163     logset* ls = reinterpret_cast<logset*>(ls_pos);
164     ls->li_[-1].lsi_.size_ = size;
165     ls->li_[-1].lsi_.allocation_offset_ = (int) (x - ls_pos);
166     for (int i = 0; i != size; ++i)
167         new((void*) &ls->li_[i]) loginfo(ls, i);
168     return ls;
169 }
170
171 void logset::free(logset* ls) {
172     for (int i = 0; i != ls->size(); ++i)
173         ls->li_[i].~loginfo();
174     delete[] (reinterpret_cast<char*>(ls) + ls->li_[-1].lsi_.allocation_offset_);
175 }
176
177
178 loginfo::loginfo(logset* ls, int logindex) {
179     f_.lock_ = 0;
180     f_.waiting_ = 0;
181     f_.filename_ = String().internal_rep();
182     f_.filename_.ref();
183
184     len_ = 20 * 1024 * 1024;
185     pos_ = 0;
186     buf_ = (char *) malloc(len_);
187     always_assert(buf_);
188     log_epoch_ = 0;
189     quiescent_epoch_ = 0;
190     wake_epoch_ = 0;
191     flushed_epoch_ = 0;
192
193     ti_ = 0;
194     f_.logset_ = ls;
195     logindex_ = logindex;
196
197     (void) padding1_;
198 }
199
200 loginfo::~loginfo() {
201     f_.filename_.deref();
202     free(buf_);
203 }
204
205 void loginfo::initialize(const String& logfile) {
206     assert(!ti_);
207
208     f_.filename_.deref();
209     f_.filename_ = logfile.internal_rep();
210     f_.filename_.ref();
211
212     ti_ = threadinfo::make(threadinfo::TI_LOG, logindex_);
213     int r = ti_->run(logger_trampoline, this);
214     always_assert(r == 0);
215 }
216
217 // one logger thread per logs[].
218 static void check_epoch() {
219     struct timeval tv;
220     gettimeofday(&tv, 0);
221     if (timercmp(&tv, &log_epoch_time, >)) {
222         log_epoch_time = tv;
223         timeradd(&log_epoch_time, &log_epoch_interval, &log_epoch_time);
224         global_log_epoch = global_log_epoch.next_nonzero(); // 0 isn't valid
225     }
226 }
227
228 void* loginfo::run() {
229     {
230         logreplay replayer(f_.filename_);
231         replayer.replay(ti_->index(), ti_);
232     }
233
234     int fd = open(String(f_.filename_).c_str(),
235                   O_WRONLY | O_APPEND | O_CREAT, 0666);
236     always_assert(fd >= 0);
237     char *x_buf = (char *) malloc(len_);
238     always_assert(x_buf);
239
240     while (1) {
241         uint32_t nb = 0;
242         acquire();
243         kvepoch_t ge = global_log_epoch, we = global_wake_epoch;
244         if (wake_epoch_ != we) {
245             wake_epoch_ = we;
246             quiescent_epoch_ = 0;
247         }
248         // If the writing threads appear quiescent, and aren't about to write
249         // to the log (f_.waiting_ != 0), then write a quiescence
250         // notification.
251         if (!recovering && pos_ == 0 && !quiescent_epoch_
252             && ge != log_epoch_ && ge != we && !f_.waiting_) {
253             quiescent_epoch_ = log_epoch_ = ge;
254             char *p = buf_;
255             p += logrec_epoch::store(p, logcmd_epoch, log_epoch_);
256             if (log_epoch_ == wake_epoch_)
257                 p += logrec_base::store(p, logcmd_wake);
258             p += logrec_base::store(p, logcmd_quiesce);
259             pos_ = p - buf_;
260         }
261         if (!recovering && pos_ > 0) {
262             uint32_t x_pos = pos_;
263             std::swap(buf_, x_buf);
264             pos_ = 0;
265             kvepoch_t x_epoch = log_epoch_;
266             release();
267             ssize_t r = write(fd, x_buf, x_pos);
268             always_assert(r == ssize_t(x_pos));
269             fsync(fd);
270             flushed_epoch_ = x_epoch;
271             // printf("log %d %d\n", ti_->index(), x_pos);
272             nb = x_pos;
273         } else
274             release();
275         if (nb < len_ / 4)
276             napms(200);
277         if (ti_->index() == 0)
278             check_epoch();
279     }
280
281     return 0;
282 }
283
284 void* loginfo::logger_trampoline(threadinfo* ti) {
285     loginfo* li = static_cast<loginfo*>(ti->thread_data());
286     return li->run();
287 }
288
289
290
291 // log entry format: see log.hh
292 void loginfo::record(int command, const query_times& qtimes,
293                      Str key, Str value) {
294     assert(!recovering);
295     size_t n = logrec_kvdelta::size(key.len, value.len)
296         + logrec_epoch::size() + logrec_base::size();
297     waitlist wait = { &wait };
298     int stalls = 0;
299     while (1) {
300         if (len_ - pos_ >= n
301             && (wait.next == &wait || f_.waiting_ == &wait)) {
302             kvepoch_t we = global_wake_epoch;
303
304             // Potentially record a new epoch.
305             if (qtimes.epoch != log_epoch_) {
306                 log_epoch_ = qtimes.epoch;
307                 pos_ += logrec_epoch::store(buf_ + pos_, logcmd_epoch, qtimes.epoch);
308             }
309
310             if (quiescent_epoch_) {
311                 // We're recording a new log record on a log that's been
312                 // quiescent for a while. If the quiescence marker has been
313                 // flushed, then all epochs less than the query epoch are
314                 // effectively on disk.
315                 if (flushed_epoch_ == quiescent_epoch_)
316                     flushed_epoch_ = qtimes.epoch;
317                 quiescent_epoch_ = 0;
318                 while (we < qtimes.epoch)
319                     we = cmpxchg(&global_wake_epoch, we, qtimes.epoch);
320             }
321
322             // Log epochs should be recorded in monotonically increasing
323             // order, but the wake epoch may be ahead of the query epoch (if
324             // the query took a while). So potentially record an EARLIER
325             // wake_epoch. This will get fixed shortly by the next log
326             // record.
327             if (we != wake_epoch_ && qtimes.epoch < we)
328                 we = qtimes.epoch;
329             if (we != wake_epoch_) {
330                 wake_epoch_ = we;
331                 pos_ += logrec_base::store(buf_ + pos_, logcmd_wake);
332             }
333
334             if (command == logcmd_put && qtimes.prev_ts
335                 && !(qtimes.prev_ts & 1))
336                 pos_ += logrec_kvdelta::store(buf_ + pos_,
337                                               logcmd_modify, key, value,
338                                               qtimes.prev_ts, qtimes.ts);
339             else
340                 pos_ += logrec_kv::store(buf_ + pos_,
341                                          command, key, value, qtimes.ts);
342
343             if (f_.waiting_ == &wait)
344                 f_.waiting_ = wait.next;
345             release();
346             return;
347         }
348
349         // Otherwise must spin
350         if (wait.next == &wait) {
351             waitlist** p = &f_.waiting_;
352             while (*p)
353                 p = &(*p)->next;
354             *p = &wait;
355             wait.next = 0;
356         }
357         release();
358         if (stalls == 0)
359             printf("stall\n");
360         else if (stalls % 25 == 0)
361             printf("stall %d\n", stalls);
362         ++stalls;
363         napms(50);
364         acquire();
365     }
366 }
367
368 void loginfo::record(int command, const query_times& qtimes, Str key,
369                      const lcdf::Json* req, const lcdf::Json* end_req) {
370     lcdf::StringAccum sa(128);
371     msgpack::unparser<lcdf::StringAccum> cu(sa);
372     cu.write_array_header(end_req - req);
373     for (; req != end_req; ++req)
374         cu << *req;
375     record(command, qtimes, key, Str(sa.data(), sa.length()));
376 }
377
378
379 // replay
380
381 logreplay::logreplay(const String &filename)
382     : filename_(filename), errno_(0), buf_()
383 {
384     int fd = open(filename_.c_str(), O_RDONLY);
385     if (fd == -1) {
386     fail:
387         errno_ = errno;
388         buf_ = 0;
389         if (fd != -1)
390             (void) close(fd);
391         return;
392     }
393
394     struct stat sb;
395     int r = fstat(fd, &sb);
396     if (r == -1)
397         goto fail;
398
399     size_ = sb.st_size;
400     if (size_ != 0) {
401         // XXX what if filename_ is too big to mmap in its entirety?
402         // XXX should support mmaping/writing in pieces
403         buf_ = (char *) ::mmap(0, size_, PROT_READ, MAP_FILE | MAP_PRIVATE,
404                                fd, 0);
405         if (buf_ == MAP_FAILED)
406             goto fail;
407     }
408
409     (void) close(fd);
410 }
411
412 logreplay::~logreplay()
413 {
414     unmap();
415 }
416
417 int
418 logreplay::unmap()
419 {
420     int r = 0;
421     if (buf_) {
422         r = munmap(buf_, size_);
423         buf_ = 0;
424     }
425     return r;
426 }
427
428
429 struct logrecord {
430     uint32_t command;
431     Str key;
432     Str val;
433     kvtimestamp_t ts;
434     kvtimestamp_t prev_ts;
435     kvepoch_t epoch;
436
437     const char *extract(const char *buf, const char *end);
438
439     template <typename T>
440     void run(T& table, std::vector<lcdf::Json>& jrepo, threadinfo& ti);
441
442   private:
443     inline void apply(row_type*& value, bool found,
444                       std::vector<lcdf::Json>& jrepo, threadinfo& ti);
445 };
446
447 const char *
448 logrecord::extract(const char *buf, const char *end)
449 {
450     const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
451     if (unlikely(size_t(end - buf) < sizeof(*lr)
452                  || lr->size_ < sizeof(*lr)
453                  || size_t(end - buf) < lr->size_
454                  || lr->command_ == logcmd_none)) {
455     fail:
456         command = logcmd_none;
457         return end;
458     }
459
460     command = lr->command_;
461     if (command == logcmd_put || command == logcmd_replace
462         || command == logcmd_remove) {
463         const logrec_kv *lk = reinterpret_cast<const logrec_kv *>(buf);
464         if (unlikely(lk->size_ < sizeof(*lk)
465                      || lk->keylen_ > MASSTREE_MAXKEYLEN
466                      || sizeof(*lk) + lk->keylen_ > lk->size_))
467             goto fail;
468         ts = lk->ts_;
469         key.assign(lk->buf_, lk->keylen_);
470         val.assign(lk->buf_ + lk->keylen_, lk->size_ - sizeof(*lk) - lk->keylen_);
471     } else if (command == logcmd_modify) {
472         const logrec_kvdelta *lk = reinterpret_cast<const logrec_kvdelta *>(buf);
473         if (unlikely(lk->keylen_ > MASSTREE_MAXKEYLEN
474                      || sizeof(*lk) + lk->keylen_ > lk->size_))
475             goto fail;
476         ts = lk->ts_;
477         prev_ts = lk->prev_ts_;
478         key.assign(lk->buf_, lk->keylen_);
479         val.assign(lk->buf_ + lk->keylen_, lk->size_ - sizeof(*lk) - lk->keylen_);
480     } else if (command == logcmd_epoch) {
481         const logrec_epoch *lre = reinterpret_cast<const logrec_epoch *>(buf);
482         if (unlikely(lre->size_ < logrec_epoch::size()))
483             goto fail;
484         epoch = lre->epoch_;
485     }
486
487     return buf + lr->size_;
488 }
489
490 template <typename T>
491 void logrecord::run(T& table, std::vector<lcdf::Json>& jrepo, threadinfo& ti) {
492     row_marker m;
493     if (command == logcmd_remove) {
494         ts |= 1;
495         m.marker_type_ = row_marker::mt_remove;
496         val = Str((const char*) &m, sizeof(m));
497     }
498
499     typename T::cursor_type lp(table, key);
500     bool found = lp.find_insert(ti);
501     if (!found)
502         ti.advance_timestamp(lp.node_timestamp());
503     apply(lp.value(), found, jrepo, ti);
504     lp.finish(1, ti);
505 }
506
507 static lcdf::Json* parse_changeset(Str changeset,
508                                    std::vector<lcdf::Json>& jrepo) {
509     msgpack::parser mp(changeset.udata());
510     unsigned index = 0;
511     Str value;
512     size_t pos = 0;
513     while (mp.position() != changeset.end()) {
514         if (pos == jrepo.size())
515             jrepo.resize(pos + 2);
516         mp >> index >> value;
517         jrepo[pos] = index;
518         jrepo[pos + 1] = String::make_stable(value);
519         pos += 2;
520     }
521     return jrepo.data() + pos;
522 }
523
524 inline void logrecord::apply(row_type*& value, bool found,
525                              std::vector<lcdf::Json>& jrepo, threadinfo& ti) {
526     row_type** cur_value = &value;
527     if (!found)
528         *cur_value = 0;
529
530     // find point to insert change (may be after some delta markers)
531     while (*cur_value && row_is_delta_marker(*cur_value)
532            && (*cur_value)->timestamp() > ts)
533         cur_value = &row_get_delta_marker(*cur_value)->prev_;
534
535     // check out of date
536     if (*cur_value && (*cur_value)->timestamp() >= ts)
537         return;
538
539     // if not modifying, delete everything earlier
540     if (command != logcmd_modify)
541         while (row_type* old_value = *cur_value) {
542             if (row_is_delta_marker(old_value)) {
543                 ti.mark(tc_replay_remove_delta);
544                 *cur_value = row_get_delta_marker(old_value)->prev_;
545             } else
546                 *cur_value = 0;
547             old_value->deallocate(ti);
548         }
549
550     // actually apply change
551     if (command == logcmd_replace)
552         *cur_value = row_type::create1(val, ts, ti);
553     else if (command != logcmd_modify
554              || (*cur_value && (*cur_value)->timestamp() == prev_ts)) {
555         lcdf::Json* end_req = parse_changeset(val, jrepo);
556         if (command != logcmd_modify)
557             *cur_value = row_type::create(jrepo.data(), end_req, ts, ti);
558         else {
559             row_type* old_value = *cur_value;
560             *cur_value = old_value->update(jrepo.data(), end_req, ts, ti);
561             if (*cur_value != old_value)
562                 old_value->deallocate(ti);
563         }
564     } else {
565         // XXX assume that memory exists before saved request -- it does
566         // in conventional log replay, but that's an ugly interface
567         val.s -= sizeof(row_delta_marker<row_type>);
568         val.len += sizeof(row_delta_marker<row_type>);
569         row_type* new_value = row_type::create1(val, ts | 1, ti);
570         row_delta_marker<row_type>* dm = row_get_delta_marker(new_value, true);
571         dm->marker_type_ = row_marker::mt_delta;
572         dm->prev_ts_ = prev_ts;
573         dm->prev_ = *cur_value;
574         *cur_value = new_value;
575         ti.mark(tc_replay_create_delta);
576     }
577
578     // clean up
579     while (value && row_is_delta_marker(value)) {
580         row_type **prev = 0, **trav = &value;
581         while (*trav && row_is_delta_marker(*trav)) {
582             prev = trav;
583             trav = &row_get_delta_marker(*trav)->prev_;
584         }
585         if (prev && *trav
586             && row_get_delta_marker(*prev)->prev_ts_ == (*trav)->timestamp()) {
587             row_type *old_prev = *prev;
588             Str req = old_prev->col(0);
589             req.s += sizeof(row_delta_marker<row_type>);
590             req.len -= sizeof(row_delta_marker<row_type>);
591             const lcdf::Json* end_req = parse_changeset(req, jrepo);
592             *prev = (*trav)->update(jrepo.data(), end_req, old_prev->timestamp() - 1, ti);
593             if (*prev != *trav)
594                 (*trav)->deallocate(ti);
595             old_prev->deallocate(ti);
596             ti.mark(tc_replay_remove_delta);
597         } else
598             break;
599     }
600 }
601
602
603 logreplay::info_type
604 logreplay::info() const
605 {
606     info_type x;
607     x.first_epoch = x.last_epoch = x.wake_epoch = x.min_post_quiescent_wake_epoch = 0;
608     x.quiescent = true;
609
610     const char *buf = buf_, *end = buf_ + size_;
611     off_t nr = 0;
612     bool log_corrupt = false;
613     while (buf + sizeof(logrec_base) <= end) {
614         const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
615         if (unlikely(lr->size_ < sizeof(logrec_base))) {
616             log_corrupt = true;
617             break;
618         } else if (unlikely(buf + lr->size_ > end))
619             break;
620         x.quiescent = lr->command_ == logcmd_quiesce;
621         if (lr->command_ == logcmd_epoch) {
622             const logrec_epoch *lre =
623                 reinterpret_cast<const logrec_epoch *>(buf);
624             if (unlikely(lre->size_ < sizeof(*lre))) {
625                 log_corrupt = true;
626                 break;
627             }
628             if (!x.first_epoch)
629                 x.first_epoch = lre->epoch_;
630             x.last_epoch = lre->epoch_;
631             if (x.wake_epoch && x.wake_epoch > x.last_epoch) // wrap-around
632                 x.wake_epoch = 0;
633         } else if (lr->command_ == logcmd_wake)
634             x.wake_epoch = x.last_epoch;
635 #if !NDEBUG
636         else if (lr->command_ != logcmd_put
637                  && lr->command_ != logcmd_replace
638                  && lr->command_ != logcmd_modify
639                  && lr->command_ != logcmd_remove
640                  && lr->command_ != logcmd_quiesce) {
641             log_corrupt = true;
642             break;
643         }
644 #endif
645         buf += lr->size_;
646         ++nr;
647     }
648
649     fprintf(stderr, "replay %s: %" PRIdOFF_T " records, first %" PRIu64 ", last %" PRIu64 ", wake %" PRIu64 "%s%s @%zu\n",
650             filename_.c_str(), nr, x.first_epoch.value(),
651             x.last_epoch.value(), x.wake_epoch.value(),
652             x.quiescent ? ", quiescent" : "",
653             log_corrupt ? ", CORRUPT" : "", buf - buf_);
654     return x;
655 }
656
657 kvepoch_t
658 logreplay::min_post_quiescent_wake_epoch(kvepoch_t quiescent_epoch) const
659 {
660     kvepoch_t e = 0;
661     const char *buf = buf_, *end = buf_ + size_;
662     bool log_corrupt = false;
663     while (buf + sizeof(logrec_base) <= end) {
664         const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
665         if (unlikely(lr->size_ < sizeof(logrec_base))) {
666             log_corrupt = true;
667             break;
668         } else if (unlikely(buf + lr->size_ > end))
669             break;
670         if (lr->command_ == logcmd_epoch) {
671             const logrec_epoch *lre =
672                 reinterpret_cast<const logrec_epoch *>(buf);
673             if (unlikely(lre->size_ < sizeof(*lre))) {
674                 log_corrupt = true;
675                 break;
676             }
677             e = lre->epoch_;
678         } else if (lr->command_ == logcmd_wake
679                    && e
680                    && e >= quiescent_epoch)
681             return e;
682         buf += lr->size_;
683     }
684     (void) log_corrupt;
685     return 0;
686 }
687
688 uint64_t
689 logreplay::replayandclean1(kvepoch_t min_epoch, kvepoch_t max_epoch,
690                            threadinfo *ti)
691 {
692     uint64_t nr = 0;
693     const char *pos = buf_, *end = buf_ + size_;
694     const char *repbegin = 0, *repend = 0;
695     logrecord lr;
696     std::vector<lcdf::Json> jrepo;
697
698     // XXX
699     while (pos < end) {
700         const char *nextpos = lr.extract(pos, end);
701         if (lr.command == logcmd_none) {
702             fprintf(stderr, "replay %s: %" PRIu64 " entries replayed, CORRUPT @%zu\n",
703                     filename_.c_str(), nr, pos - buf_);
704             break;
705         }
706         if (lr.command == logcmd_epoch) {
707             if ((min_epoch && lr.epoch < min_epoch)
708                 || (!min_epoch && !repbegin))
709                 repbegin = pos;
710             if (lr.epoch >= max_epoch) {
711                 always_assert(repbegin);
712                 repend = nextpos;
713                 break;
714             }
715         }
716         if (!lr.epoch || (min_epoch && lr.epoch < min_epoch)) {
717             pos = nextpos;
718             if (repbegin)
719                 repend = nextpos;
720             continue;
721         }
722         // replay only part of log after checkpoint
723         // could replay everything, the if() here tests
724         // correctness of checkpoint scheme.
725         assert(repbegin);
726         repend = nextpos;
727         if (lr.key.len) { // skip empty entry
728             if (lr.command == logcmd_put
729                 || lr.command == logcmd_replace
730                 || lr.command == logcmd_modify
731                 || lr.command == logcmd_remove)
732                 lr.run(tree->table(), jrepo, *ti);
733             ++nr;
734             if (nr % 100000 == 0)
735                 fprintf(stderr,
736                         "replay %s: %" PRIu64 " entries replayed\n",
737                         filename_.c_str(), nr);
738         }
739         pos = nextpos;
740     }
741
742     // rewrite portion of log
743     if (!repbegin)
744         repbegin = repend = buf_;
745     else if (!repend) {
746         fprintf(stderr, "replay %s: surprise repend\n", filename_.c_str());
747         repend = pos;
748     }
749
750     char tmplog[256];
751     int r = snprintf(tmplog, sizeof(tmplog), "%s.tmp", filename_.c_str());
752     always_assert(r >= 0 && size_t(r) < sizeof(tmplog));
753
754     printf("replay %s: truncate from %" PRIdOFF_T " to %" PRIdSIZE_T " [%" PRIdSIZE_T ",%" PRIdSIZE_T ")\n",
755            filename_.c_str(), size_, repend - repbegin,
756            repbegin - buf_, repend - buf_);
757
758     bool need_copy = repbegin != buf_;
759     int fd;
760     if (!need_copy)
761         fd = replay_truncate(repend - repbegin);
762     else
763         fd = replay_copy(tmplog, repbegin, repend);
764
765     r = fsync(fd);
766     always_assert(r == 0);
767     r = close(fd);
768     always_assert(r == 0);
769
770     // replace old log with rewritten log
771     if (unmap() != 0)
772         abort();
773
774     if (need_copy) {
775         r = rename(tmplog, filename_.c_str());
776         if (r != 0) {
777             fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
778             abort();
779         }
780     }
781
782     return nr;
783 }
784
785 int
786 logreplay::replay_truncate(size_t len)
787 {
788     int fd = open(filename_.c_str(), O_RDWR);
789     if (fd < 0) {
790         fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
791         abort();
792     }
793
794     struct stat sb;
795     int r = fstat(fd, &sb);
796     if (r != 0) {
797         fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
798         abort();
799     } else if (sb.st_size < off_t(len)) {
800         fprintf(stderr, "replay %s: bad length %" PRIdOFF_T "\n", filename_.c_str(), sb.st_size);
801         abort();
802     }
803
804     r = ftruncate(fd, len);
805     if (r != 0) {
806         fprintf(stderr, "replay %s: truncate: %s\n", filename_.c_str(), strerror(errno));
807         abort();
808     }
809
810     off_t off = lseek(fd, len, SEEK_SET);
811     if (off == (off_t) -1) {
812         fprintf(stderr, "replay %s: seek: %s\n", filename_.c_str(), strerror(errno));
813         abort();
814     }
815
816     return fd;
817 }
818
819 int
820 logreplay::replay_copy(const char *tmpname, const char *first, const char *last)
821 {
822     int fd = creat(tmpname, 0666);
823     if (fd < 0) {
824         fprintf(stderr, "replay %s: create: %s\n", tmpname, strerror(errno));
825         abort();
826     }
827
828     ssize_t w = safe_write(fd, first, last - first);
829     always_assert(w >= 0 && w == last - first);
830
831     return fd;
832 }
833
834 void
835 logreplay::replay(int which, threadinfo *ti)
836 {
837     waituntilphase(REC_LOG_TS);
838     // find the maximum timestamp of entries in the log
839     if (buf_) {
840         info_type x = info();
841         pthread_mutex_lock(&rec_mu);
842         rec_log_infos[which] = x;
843         pthread_mutex_unlock(&rec_mu);
844     }
845     inactive();
846
847     waituntilphase(REC_LOG_ANALYZE_WAKE);
848     if (buf_) {
849         if (rec_replay_min_quiescent_last_epoch
850             && rec_replay_min_quiescent_last_epoch <= rec_log_infos[which].wake_epoch)
851             rec_log_infos[which].min_post_quiescent_wake_epoch =
852                 min_post_quiescent_wake_epoch(rec_replay_min_quiescent_last_epoch);
853     }
854     inactive();
855
856     waituntilphase(REC_LOG_REPLAY);
857     if (buf_) {
858         ti->rcu_start();
859         uint64_t nr = replayandclean1(rec_replay_min_epoch, rec_replay_max_epoch, ti);
860         ti->rcu_stop();
861         printf("recovered %" PRIu64 " records from %s\n", nr, filename_.c_str());
862     }
863     inactive();
864 }