2 * Eddie Kohler, Yandong Mao, Robert Morris
3 * Copyright (c) 2012-2014 President and Fellows of Harvard College
4 * Copyright (c) 2012-2014 Massachusetts Institute of Technology
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, subject to the conditions
9 * listed in the Masstree LICENSE file. These conditions include: you must
10 * preserve this copyright notice, and you cannot mention the copyright
11 * holders in advertising related to the Software without their permission.
12 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13 * notice is a summary of the Masstree LICENSE file; the license in that file
17 #include "kvthread.hh"
20 #include "query_masstree.hh"
21 #include "masstree_tcursor.hh"
22 #include "masstree_insert.hh"
23 #include "masstree_remove.hh"
26 #include <sys/types.h>
// Global logging epochs, shared by all logger threads.
// global_log_epoch is advanced periodically by check_epoch(); 0 is never
// a valid epoch value (see next_nonzero() use below).
33 kvepoch_t global_log_epoch;
34 kvepoch_t global_wake_epoch;
// How often to advance the log epoch, and the next wall-clock deadline
// at which check_epoch() will advance it.
35 struct timeval log_epoch_interval;
36 static struct timeval log_epoch_time;
// The table into which log replay inserts recovered records
// (defined elsewhere).
37 extern Masstree::default_table* tree;
// Recovery-phase state: checkpoint epoch bounds, per-log info collected
// during REC_LOG_TS, and the epoch window chosen for replay.
// NOTE(review): these appear to be written by recovery coordination code
// outside this excerpt — confirm against the full source.
39 kvepoch_t rec_ckp_min_epoch;
40 kvepoch_t rec_ckp_max_epoch;
41 logreplay::info_type *rec_log_infos;
42 kvepoch_t rec_replay_min_epoch;
43 kvepoch_t rec_replay_max_epoch;
44 kvepoch_t rec_replay_min_quiescent_last_epoch;
// --- logrec_base helpers (header-only log records, e.g. wake/quiesce) ---
// Serialized size of a bare record: just the fixed header.
50 static size_t size() {
51 return sizeof(logrec_base);
// Serialize a header-only record into buf. Caller guarantees buf has at
// least size() bytes. (The return statement falls outside this excerpt;
// by analogy with the other store() overloads it presumably returns the
// number of bytes written — confirm against full source.)
53 static size_t store(char *buf, uint32_t command) {
54 // XXX check alignment on some architectures
55 logrec_base *lr = reinterpret_cast<logrec_base *>(buf);
56 lr->command_ = command;
57 lr->size_ = sizeof(*lr);
// Validate a serialized record: the stored size must at least cover the
// header itself.
60 static bool check(const char *buf) {
61 const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
62 return lr->size_ >= sizeof(*lr);
// Read the command field of a serialized record without validation.
64 static uint32_t command(const char *buf) {
65 const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
// --- logrec_epoch helpers (epoch-change records) ---
// Serialized size: fixed header plus the epoch payload.
75 static size_t size() {
76 return sizeof(logrec_epoch);
// Serialize an epoch record. Only command_/size_ assignments are visible
// here; the epoch_ field assignment and return occur on lines outside
// this excerpt — confirm against full source.
78 static size_t store(char *buf, uint32_t command, kvepoch_t epoch) {
79 // XXX check alignment on some architectures
80 logrec_epoch *lr = reinterpret_cast<logrec_epoch *>(buf);
81 lr->command_ = command;
82 lr->size_ = sizeof(*lr);
// Validate a serialized epoch record: stored size must cover the struct.
86 static bool check(const char *buf) {
87 const logrec_epoch *lr = reinterpret_cast<const logrec_epoch *>(buf);
88 return lr->size_ >= sizeof(*lr);
// --- logrec_kv helpers (put/replace/remove records: key + value inline) ---
// Serialized size: header plus variable-length key and value bytes.
99 static size_t size(uint32_t keylen, uint32_t vallen) {
100 return sizeof(logrec_kv) + keylen + vallen;
// Serialize a key/value record: header, then key bytes, then value bytes,
// back-to-back in buf_. Returns total bytes written.
102 static size_t store(char *buf, uint32_t command,
105 // XXX check alignment on some architectures
106 logrec_kv *lr = reinterpret_cast<logrec_kv *>(buf);
107 lr->command_ = command;
108 lr->size_ = sizeof(*lr) + key.len + val.len;
110 lr->keylen_ = key.len;
111 memcpy(lr->buf_, key.s, key.len);
112 memcpy(lr->buf_ + key.len, val.s, val.len);
113 return sizeof(*lr) + key.len + val.len;
// Validate: stored size must cover the header and the declared key.
// (Two comparisons guard against keylen_ overflowing the addition.)
115 static bool check(const char *buf) {
116 const logrec_kv *lr = reinterpret_cast<const logrec_kv *>(buf);
117 return lr->size_ >= sizeof(*lr)
118 && lr->size_ >= sizeof(*lr) + lr->keylen_;
// Log record for logcmd_modify: like logrec_kv but also carries the
// timestamp of the previous row version, so replay can chain deltas.
122 struct logrec_kvdelta {
// Timestamp of the row version this delta applies on top of.
126 kvtimestamp_t prev_ts_;
// Serialized size: header plus key and value bytes.
130 static size_t size(uint32_t keylen, uint32_t vallen) {
131 return sizeof(logrec_kvdelta) + keylen + vallen;
// Serialize a delta record; key then value are copied inline after the
// header. Returns total bytes written.
133 static size_t store(char *buf, uint32_t command,
135 kvtimestamp_t prev_ts, kvtimestamp_t ts) {
136 // XXX check alignment on some architectures
137 logrec_kvdelta *lr = reinterpret_cast<logrec_kvdelta *>(buf);
138 lr->command_ = command;
139 lr->size_ = sizeof(*lr) + key.len + val.len;
141 lr->prev_ts_ = prev_ts;
142 lr->keylen_ = key.len;
143 memcpy(lr->buf_, key.s, key.len);
144 memcpy(lr->buf_ + key.len, val.s, val.len);
145 return sizeof(*lr) + key.len + val.len;
// Validate: stored size must cover the header and the declared key.
147 static bool check(const char *buf) {
148 const logrec_kvdelta *lr = reinterpret_cast<const logrec_kvdelta *>(buf);
149 return lr->size_ >= sizeof(*lr)
150 && lr->size_ >= sizeof(*lr) + lr->keylen_;
// Allocate a logset holding `size` cache-line-aligned loginfo slots.
// Layout: [raw alloc][logset_info header][aligned loginfo array...].
// The logset pointer is the aligned array start; bookkeeping (count and
// offset back to the raw allocation) lives in the slot at index -1.
155 logset* logset::make(int size) {
156 static_assert(sizeof(loginfo) == 2 * CACHE_LINE_SIZE, "unexpected sizeof(loginfo)");
157 assert(size > 0 && size <= 64);
// Over-allocate by one cache line so the array can always be aligned.
158 char* x = new char[sizeof(loginfo) * size + sizeof(loginfo::logset_info) + CACHE_LINE_SIZE];
159 char* ls_pos = x + sizeof(loginfo::logset_info);
160 uintptr_t left = reinterpret_cast<uintptr_t>(ls_pos) % CACHE_LINE_SIZE;
// NOTE(review): the guard for left == 0 falls outside this excerpt.
162 ls_pos += CACHE_LINE_SIZE - left;
163 logset* ls = reinterpret_cast<logset*>(ls_pos);
// Record the slot count and the (negative) offset back to `x` so
// logset::free() can recover the original allocation.
164 ls->li_[-1].lsi_.size_ = size;
165 ls->li_[-1].lsi_.allocation_offset_ = (int) (x - ls_pos);
// Placement-construct each per-thread loginfo in its aligned slot.
166 for (int i = 0; i != size; ++i)
167 new((void*) &ls->li_[i]) loginfo(ls, i);
// Destroy a logset made by logset::make(): run each loginfo destructor,
// then delete the raw char[] allocation recovered via the stored offset.
171 void logset::free(logset* ls) {
172 for (int i = 0; i != ls->size(); ++i)
173 ls->li_[i].~loginfo();
174 delete[] (reinterpret_cast<char*>(ls) + ls->li_[-1].lsi_.allocation_offset_);
// Construct one per-thread logger. Starts with an empty filename, a
// 20 MB in-memory log buffer, and no quiescent epoch. Several member
// initializations fall outside this excerpt.
178 loginfo::loginfo(logset* ls, int logindex) {
// Hold the filename as an internal String rep (refcounted; see dtor).
181 f_.filename_ = String().internal_rep();
184 len_ = 20 * 1024 * 1024;
186 buf_ = (char *) malloc(len_);
189 quiescent_epoch_ = 0;
195 logindex_ = logindex;
// Release the filename's String reference. (Freeing of buf_ is not
// visible in this excerpt — confirm against full source.)
200 loginfo::~loginfo() {
201 f_.filename_.deref();
// Bind this logger to its on-disk file and spawn its logger thread.
// Replaces the placeholder filename set in the constructor, then creates
// a TI_LOG threadinfo that runs logger_trampoline(this).
205 void loginfo::initialize(const String& logfile) {
208 f_.filename_.deref();
209 f_.filename_ = logfile.internal_rep();
212 ti_ = threadinfo::make(threadinfo::TI_LOG, logindex_);
213 int r = ti_->run(logger_trampoline, this);
214 always_assert(r == 0);
217 // one logger thread per logs[].
// Advance global_log_epoch once log_epoch_interval has elapsed since the
// last advance. Called from a logger thread; epoch 0 is skipped because
// it is not a valid epoch value.
218 static void check_epoch() {
220 gettimeofday(&tv, 0);
221 if (timercmp(&tv, &log_epoch_time, >)) {
// Schedule the next advance relative to the previous deadline.
223 timeradd(&log_epoch_time, &log_epoch_interval, &log_epoch_time);
224 global_log_epoch = global_log_epoch.next_nonzero(); // 0 isn't valid
// Logger thread main loop. First replays any existing log file for this
// slot, then appends new records: the in-memory buffer filled by record()
// is periodically swapped out and written to disk. Locking around the
// buffer swap is not visible in this excerpt — confirm against full
// source before reasoning about the synchronization.
228 void* loginfo::run() {
// Replay the pre-existing log before accepting new appends.
230 logreplay replayer(f_.filename_);
231 replayer.replay(ti_->index(), ti_);
// Open for append; create if missing.
234 int fd = open(String(f_.filename_).c_str(),
235 O_WRONLY | O_APPEND | O_CREAT, 0666);
236 always_assert(fd >= 0);
// Second buffer for double-buffering: writers fill buf_ while this
// thread writes x_buf to disk.
237 char *x_buf = (char *) malloc(len_);
238 always_assert(x_buf);
// Sample the global epochs; on a wake-epoch change, any recorded
// quiescence is no longer valid.
243 kvepoch_t ge = global_log_epoch, we = global_wake_epoch;
244 if (wake_epoch_ != we) {
246 quiescent_epoch_ = 0;
248 // If the writing threads appear quiescent, and aren't about to write
249 // to the log (f_.waiting_ != 0), then write a quiescence
// Emit epoch (+wake, if applicable) + quiesce records so recovery
// knows this log was idle through epoch `ge`.
251 if (!recovering && pos_ == 0 && !quiescent_epoch_
252 && ge != log_epoch_ && ge != we && !f_.waiting_) {
253 quiescent_epoch_ = log_epoch_ = ge;
255 p += logrec_epoch::store(p, logcmd_epoch, log_epoch_);
256 if (log_epoch_ == wake_epoch_)
257 p += logrec_base::store(p, logcmd_wake);
258 p += logrec_base::store(p, logcmd_quiesce);
// Flush: swap buffers, write the filled one, and publish the epoch
// whose records are now durable.
261 if (!recovering && pos_ > 0) {
262 uint32_t x_pos = pos_;
263 std::swap(buf_, x_buf);
265 kvepoch_t x_epoch = log_epoch_;
267 ssize_t r = write(fd, x_buf, x_pos);
268 always_assert(r == ssize_t(x_pos));
270 flushed_epoch_ = x_epoch;
271 // printf("log %d %d\n", ti_->index(), x_pos);
// Only one logger (index 0) performs the global epoch advance.
277 if (ti_->index() == 0)
// Thread entry point: recover the loginfo* stashed in the threadinfo and
// run its logger loop.
284 void* loginfo::logger_trampoline(threadinfo* ti) {
285 loginfo* li = static_cast<loginfo*>(ti->thread_data());
291 // log entry format: see log.hh
// Append one key/value operation to this thread's log buffer.
// Writers coordinate through a spin waitlist (f_.waiting_); a writer
// proceeds only when it holds the head of the list and the buffer has
// room for the worst-case record size. The lock acquire/release lines
// are not visible in this excerpt — confirm against full source.
292 void loginfo::record(int command, const query_times& qtimes,
293 Str key, Str value) {
// Worst case: a kv/kvdelta record plus a possible epoch record plus a
// possible wake record.
295 size_t n = logrec_kvdelta::size(key.len, value.len)
296 + logrec_epoch::size() + logrec_base::size();
// Self-linked node means "not yet queued".
297 waitlist wait = { &wait };
301 && (wait.next == &wait || f_.waiting_ == &wait)) {
302 kvepoch_t we = global_wake_epoch;
304 // Potentially record a new epoch.
305 if (qtimes.epoch != log_epoch_) {
306 log_epoch_ = qtimes.epoch;
307 pos_ += logrec_epoch::store(buf_ + pos_, logcmd_epoch, qtimes.epoch);
310 if (quiescent_epoch_) {
311 // We're recording a new log record on a log that's been
312 // quiescent for a while. If the quiescence marker has been
313 // flushed, then all epochs less than the query epoch are
314 // effectively on disk.
315 if (flushed_epoch_ == quiescent_epoch_)
316 flushed_epoch_ = qtimes.epoch;
317 quiescent_epoch_ = 0;
// Wake the other loggers: push global_wake_epoch forward to at
// least this query's epoch (CAS loop tolerates races).
318 while (we < qtimes.epoch)
319 we = cmpxchg(&global_wake_epoch, we, qtimes.epoch);
322 // Log epochs should be recorded in monotonically increasing
323 // order, but the wake epoch may be ahead of the query epoch (if
324 // the query took a while). So potentially record an EARLIER
325 // wake_epoch. This will get fixed shortly by the next log
327 if (we != wake_epoch_ && qtimes.epoch < we)
329 if (we != wake_epoch_) {
331 pos_ += logrec_base::store(buf_ + pos_, logcmd_wake);
// A put over an existing even-timestamped row becomes a compact
// "modify" delta; everything else is stored as a full kv record.
334 if (command == logcmd_put && qtimes.prev_ts
335 && !(qtimes.prev_ts & 1))
336 pos_ += logrec_kvdelta::store(buf_ + pos_,
337 logcmd_modify, key, value,
338 qtimes.prev_ts, qtimes.ts);
340 pos_ += logrec_kv::store(buf_ + pos_,
341 command, key, value, qtimes.ts);
// Dequeue ourselves and pass the baton to the next waiter.
343 if (f_.waiting_ == &wait)
344 f_.waiting_ = wait.next;
349 // Otherwise must spin
// First spin iteration: append our node to the waitlist.
350 if (wait.next == &wait) {
351 waitlist** p = &f_.waiting_;
// Periodic progress report while stalled.
360 else if (stalls % 25 == 0)
361 printf("stall %d\n", stalls);
// Overload: serialize a [req, end_req) changeset to msgpack, then log it
// through the Str-based record() above.
368 void loginfo::record(int command, const query_times& qtimes, Str key,
369 const lcdf::Json* req, const lcdf::Json* end_req) {
370 lcdf::StringAccum sa(128);
371 msgpack::unparser<lcdf::StringAccum> cu(sa);
372 cu.write_array_header(end_req - req);
373 for (; req != end_req; ++req)
375 record(command, qtimes, key, Str(sa.data(), sa.length()));
// Open a log file read-only and mmap its entire contents for replay.
// On failure, errno_ records the error (error-path lines are outside
// this excerpt).
381 logreplay::logreplay(const String &filename)
382 : filename_(filename), errno_(0), buf_()
384 int fd = open(filename_.c_str(), O_RDONLY);
395 int r = fstat(fd, &sb);
401 // XXX what if filename_ is too big to mmap in its entirety?
402 // XXX should support mmaping/writing in pieces
403 buf_ = (char *) ::mmap(0, size_, PROT_READ, MAP_FILE | MAP_PRIVATE,
405 if (buf_ == MAP_FAILED)
// Unmap the log file mapping established in the constructor.
412 logreplay::~logreplay()
422 r = munmap(buf_, size_);
// Decoded view of one serialized log record (fields partially visible
// in this excerpt). prev_ts is meaningful only for logcmd_modify.
434 kvtimestamp_t prev_ts;
// Parse one record from [buf, end); returns the position just past it,
// setting command = logcmd_none on corruption.
437 const char *extract(const char *buf, const char *end);
// Apply this record to the table (insert path + apply()).
439 template <typename T>
440 void run(T& table, std::vector<lcdf::Json>& jrepo, threadinfo& ti);
443 inline void apply(row_type*& value, bool found,
444 std::vector<lcdf::Json>& jrepo, threadinfo& ti);
// Decode one log record at `buf`. Validates the generic header first,
// then the command-specific layout; any inconsistency sets command to
// logcmd_none so callers stop replaying. Returns the address just past
// the record on success.
448 logrecord::extract(const char *buf, const char *end)
450 const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
// Header checks: enough bytes for a header, size covers the header,
// record fits in the remaining buffer, and command is not "none".
451 if (unlikely(size_t(end - buf) < sizeof(*lr)
452 || lr->size_ < sizeof(*lr)
453 || size_t(end - buf) < lr->size_
454 || lr->command_ == logcmd_none)) {
456 command = logcmd_none;
460 command = lr->command_;
// put/replace/remove share the logrec_kv layout: key then value.
461 if (command == logcmd_put || command == logcmd_replace
462 || command == logcmd_remove) {
463 const logrec_kv *lk = reinterpret_cast<const logrec_kv *>(buf);
464 if (unlikely(lk->size_ < sizeof(*lk)
465 || lk->keylen_ > MASSTREE_MAXKEYLEN
466 || sizeof(*lk) + lk->keylen_ > lk->size_))
469 key.assign(lk->buf_, lk->keylen_);
// Value is whatever remains of the record after the key.
470 val.assign(lk->buf_ + lk->keylen_, lk->size_ - sizeof(*lk) - lk->keylen_);
471 } else if (command == logcmd_modify) {
// modify additionally carries the previous version's timestamp.
472 const logrec_kvdelta *lk = reinterpret_cast<const logrec_kvdelta *>(buf);
473 if (unlikely(lk->keylen_ > MASSTREE_MAXKEYLEN
474 || sizeof(*lk) + lk->keylen_ > lk->size_))
477 prev_ts = lk->prev_ts_;
478 key.assign(lk->buf_, lk->keylen_);
479 val.assign(lk->buf_ + lk->keylen_, lk->size_ - sizeof(*lk) - lk->keylen_);
480 } else if (command == logcmd_epoch) {
481 const logrec_epoch *lre = reinterpret_cast<const logrec_epoch *>(buf);
482 if (unlikely(lre->size_ < logrec_epoch::size()))
// Skip to the next record using the stored size.
487 return buf + lr->size_;
// Replay one decoded record into `table`. A remove is replayed as an
// insert of a remove-marker value so that later/earlier records still
// order correctly by timestamp; apply() resolves the final row state.
490 template <typename T>
491 void logrecord::run(T& table, std::vector<lcdf::Json>& jrepo, threadinfo& ti) {
493 if (command == logcmd_remove) {
495 m.marker_type_ = row_marker::mt_remove;
// Replace the value with the serialized marker struct.
496 val = Str((const char*) &m, sizeof(m));
// Find-or-insert the key, then apply this record to its value chain.
499 typename T::cursor_type lp(table, key);
500 bool found = lp.find_insert(ti);
502 ti.advance_timestamp(lp.node_timestamp());
503 apply(lp.value(), found, jrepo, ti);
// Decode a msgpack changeset into jrepo as alternating
// [column-index, value] Json pairs, reusing jrepo's storage across
// calls. Returns a pointer just past the last element written.
// NOTE(review): `pos`/`index`/`value` declarations fall outside this
// excerpt.
507 static lcdf::Json* parse_changeset(Str changeset,
508 std::vector<lcdf::Json>& jrepo) {
509 msgpack::parser mp(changeset.udata());
513 while (mp.position() != changeset.end()) {
// Grow the repo two slots at a time (index + value).
514 if (pos == jrepo.size())
515 jrepo.resize(pos + 2);
516 mp >> index >> value;
// make_stable: the value must not alias the transient parse buffer.
518 jrepo[pos + 1] = String::make_stable(value);
521 return jrepo.data() + pos;
// Apply this record to a row's version chain. During replay, rows may
// carry "delta markers" — placeholder versions (timestamp | 1) recording
// a modify whose base version hasn't been seen yet. This function
// inserts the record at the right position by timestamp, collapses the
// chain where possible, and deallocates superseded versions.
// The logic is order-sensitive; several branch/else lines fall outside
// this excerpt, so treat the flow description as approximate.
524 inline void logrecord::apply(row_type*& value, bool found,
525 std::vector<lcdf::Json>& jrepo, threadinfo& ti) {
526 row_type** cur_value = &value;
530 // find point to insert change (may be after some delta markers)
531 while (*cur_value && row_is_delta_marker(*cur_value)
532 && (*cur_value)->timestamp() > ts)
533 cur_value = &row_get_delta_marker(*cur_value)->prev_;
// A newer-or-equal existing version wins: this record is stale.
536 if (*cur_value && (*cur_value)->timestamp() >= ts)
539 // if not modifying, delete everything earlier
540 if (command != logcmd_modify)
541 while (row_type* old_value = *cur_value) {
542 if (row_is_delta_marker(old_value)) {
543 ti.mark(tc_replay_remove_delta);
544 *cur_value = row_get_delta_marker(old_value)->prev_;
547 old_value->deallocate(ti);
550 // actually apply change
551 if (command == logcmd_replace)
552 *cur_value = row_type::create1(val, ts, ti);
// A modify applies directly only if its base version (prev_ts) is
// present; otherwise a delta marker is created below.
553 else if (command != logcmd_modify
554 || (*cur_value && (*cur_value)->timestamp() == prev_ts)) {
555 lcdf::Json* end_req = parse_changeset(val, jrepo);
556 if (command != logcmd_modify)
557 *cur_value = row_type::create(jrepo.data(), end_req, ts, ti);
559 row_type* old_value = *cur_value;
560 *cur_value = old_value->update(jrepo.data(), end_req, ts, ti);
561 if (*cur_value != old_value)
562 old_value->deallocate(ti);
565 // XXX assume that memory exists before saved request -- it does
566 // in conventional log replay, but that's an ugly interface
// Build a delta marker: widen val to include marker space, create a
// row with an odd timestamp (ts | 1) marking it as a delta, and link
// it ahead of the current version.
567 val.s -= sizeof(row_delta_marker<row_type>);
568 val.len += sizeof(row_delta_marker<row_type>);
569 row_type* new_value = row_type::create1(val, ts | 1, ti);
570 row_delta_marker<row_type>* dm = row_get_delta_marker(new_value, true);
571 dm->marker_type_ = row_marker::mt_delta;
572 dm->prev_ts_ = prev_ts;
573 dm->prev_ = *cur_value;
574 *cur_value = new_value;
575 ti.mark(tc_replay_create_delta);
// Collapse resolved deltas: walk to the first non-marker version and,
// when a marker's prev_ts_ matches it, apply the stored changeset and
// free both the marker and the old base.
579 while (value && row_is_delta_marker(value)) {
580 row_type **prev = 0, **trav = &value;
581 while (*trav && row_is_delta_marker(*trav)) {
583 trav = &row_get_delta_marker(*trav)->prev_;
586 && row_get_delta_marker(*prev)->prev_ts_ == (*trav)->timestamp()) {
587 row_type *old_prev = *prev;
// The marker's changeset bytes follow its marker header in col(0).
588 Str req = old_prev->col(0);
589 req.s += sizeof(row_delta_marker<row_type>);
590 req.len -= sizeof(row_delta_marker<row_type>);
591 const lcdf::Json* end_req = parse_changeset(req, jrepo);
// timestamp - 1 clears the delta bit set by (ts | 1) above.
592 *prev = (*trav)->update(jrepo.data(), end_req, old_prev->timestamp() - 1, ti);
594 (*trav)->deallocate(ti);
595 old_prev->deallocate(ti);
596 ti.mark(tc_replay_remove_delta);
// Scan the mapped log once and summarize it: first/last epoch records,
// the last wake epoch, whether the log ends quiescent, and whether it
// is corrupt. Used during the REC_LOG_TS recovery phase.
604 logreplay::info() const
607 x.first_epoch = x.last_epoch = x.wake_epoch = x.min_post_quiescent_wake_epoch = 0;
610 const char *buf = buf_, *end = buf_ + size_;
612 bool log_corrupt = false;
// Walk records by their stored sizes; stop on truncation or bad sizes.
613 while (buf + sizeof(logrec_base) <= end) {
614 const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
615 if (unlikely(lr->size_ < sizeof(logrec_base))) {
618 } else if (unlikely(buf + lr->size_ > end))
// Quiescence holds only if the final record is a quiesce marker.
620 x.quiescent = lr->command_ == logcmd_quiesce;
621 if (lr->command_ == logcmd_epoch) {
622 const logrec_epoch *lre =
623 reinterpret_cast<const logrec_epoch *>(buf);
624 if (unlikely(lre->size_ < sizeof(*lre))) {
// First epoch record seen fixes first_epoch; last one seen keeps
// updating last_epoch.
629 x.first_epoch = lre->epoch_;
630 x.last_epoch = lre->epoch_;
631 if (x.wake_epoch && x.wake_epoch > x.last_epoch) // wrap-around
// A wake record applies to the epoch currently in effect.
633 } else if (lr->command_ == logcmd_wake)
634 x.wake_epoch = x.last_epoch;
// Any command outside the known set marks the log corrupt.
636 else if (lr->command_ != logcmd_put
637 && lr->command_ != logcmd_replace
638 && lr->command_ != logcmd_modify
639 && lr->command_ != logcmd_remove
640 && lr->command_ != logcmd_quiesce) {
649 fprintf(stderr, "replay %s: %" PRIdOFF_T " records, first %" PRIu64 ", last %" PRIu64 ", wake %" PRIu64 "%s%s @%zu\n",
650 filename_.c_str(), nr, x.first_epoch.value(),
651 x.last_epoch.value(), x.wake_epoch.value(),
652 x.quiescent ? ", quiescent" : "",
653 log_corrupt ? ", CORRUPT" : "", buf - buf_);
// Second scan over the log: find the smallest wake epoch recorded at or
// after `quiescent_epoch`. Used to decide how far a quiescent log's
// durability actually extends. Uses the same record-walking / corruption
// checks as info().
658 logreplay::min_post_quiescent_wake_epoch(kvepoch_t quiescent_epoch) const
661 const char *buf = buf_, *end = buf_ + size_;
662 bool log_corrupt = false;
663 while (buf + sizeof(logrec_base) <= end) {
664 const logrec_base *lr = reinterpret_cast<const logrec_base *>(buf);
665 if (unlikely(lr->size_ < sizeof(logrec_base))) {
668 } else if (unlikely(buf + lr->size_ > end))
// Track the current epoch from epoch records...
670 if (lr->command_ == logcmd_epoch) {
671 const logrec_epoch *lre =
672 reinterpret_cast<const logrec_epoch *>(buf);
673 if (unlikely(lre->size_ < sizeof(*lre))) {
// ...and report the first wake at an epoch >= quiescent_epoch.
678 } else if (lr->command_ == logcmd_wake
680 && e >= quiescent_epoch)
// Replay records with epochs in [min_epoch, max_epoch) into the global
// tree, then rewrite the log file so it contains only the replayed
// portion [repbegin, repend): either truncate in place (if the portion
// starts at the file head) or copy it to a temp file and rename over
// the original. Several control-flow lines are missing from this
// excerpt; treat the flow description as approximate.
689 logreplay::replayandclean1(kvepoch_t min_epoch, kvepoch_t max_epoch,
693 const char *pos = buf_, *end = buf_ + size_;
694 const char *repbegin = 0, *repend = 0;
696 std::vector<lcdf::Json> jrepo;
700 const char *nextpos = lr.extract(pos, end);
// extract() signals corruption via logcmd_none; stop replaying.
701 if (lr.command == logcmd_none) {
702 fprintf(stderr, "replay %s: %" PRIu64 " entries replayed, CORRUPT @%zu\n",
703 filename_.c_str(), nr, pos - buf_);
// Epoch records bound the replay window: start keeping records at
// the first in-range epoch; stop at max_epoch.
706 if (lr.command == logcmd_epoch) {
707 if ((min_epoch && lr.epoch < min_epoch)
708 || (!min_epoch && !repbegin))
710 if (lr.epoch >= max_epoch) {
711 always_assert(repbegin);
// Records before the window (or with no epoch) are skipped.
716 if (!lr.epoch || (min_epoch && lr.epoch < min_epoch)) {
722 // replay only part of log after checkpoint
723 // could replay everything, the if() here tests
724 // correctness of checkpoint scheme.
727 if (lr.key.len) { // skip empty entry
728 if (lr.command == logcmd_put
729 || lr.command == logcmd_replace
730 || lr.command == logcmd_modify
731 || lr.command == logcmd_remove)
732 lr.run(tree->table(), jrepo, *ti);
// Progress report every 100k entries.
734 if (nr % 100000 == 0)
736 "replay %s: %" PRIu64 " entries replayed\n",
737 filename_.c_str(), nr);
742 // rewrite portion of log
// Nothing replayed: keep an empty range at the file head.
744 repbegin = repend = buf_;
746 fprintf(stderr, "replay %s: surprise repend\n", filename_.c_str());
751 int r = snprintf(tmplog, sizeof(tmplog), "%s.tmp", filename_.c_str());
752 always_assert(r >= 0 && size_t(r) < sizeof(tmplog));
754 printf("replay %s: truncate from %" PRIdOFF_T " to %" PRIdSIZE_T " [%" PRIdSIZE_T ",%" PRIdSIZE_T ")\n",
755 filename_.c_str(), size_, repend - repbegin,
756 repbegin - buf_, repend - buf_);
// In-place truncate works only when the kept range starts at the
// file head; otherwise copy the range into a temp file.
758 bool need_copy = repbegin != buf_;
761 fd = replay_truncate(repend - repbegin);
763 fd = replay_copy(tmplog, repbegin, repend);
766 always_assert(r == 0);
768 always_assert(r == 0);
770 // replace old log with rewritten log
// rename() atomically replaces the original log with the temp copy.
775 r = rename(tmplog, filename_.c_str());
777 fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
// Truncate the log file in place to `len` bytes and position the file
// offset at the new end. Returns the open fd (error paths report via
// stderr; return statements fall outside this excerpt).
786 logreplay::replay_truncate(size_t len)
788 int fd = open(filename_.c_str(), O_RDWR);
790 fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
// Sanity check: the file must be at least `len` bytes long already.
795 int r = fstat(fd, &sb);
797 fprintf(stderr, "replay %s: %s\n", filename_.c_str(), strerror(errno));
799 } else if (sb.st_size < off_t(len)) {
800 fprintf(stderr, "replay %s: bad length %" PRIdOFF_T "\n", filename_.c_str(), sb.st_size);
804 r = ftruncate(fd, len);
806 fprintf(stderr, "replay %s: truncate: %s\n", filename_.c_str(), strerror(errno));
// Leave the offset at the truncation point for subsequent appends.
810 off_t off = lseek(fd, len, SEEK_SET);
811 if (off == (off_t) -1) {
812 fprintf(stderr, "replay %s: seek: %s\n", filename_.c_str(), strerror(errno));
// Write the byte range [first, last) into a freshly created file
// `tmpname`; used when the kept log portion does not start at the file
// head. Returns the open fd.
820 logreplay::replay_copy(const char *tmpname, const char *first, const char *last)
822 int fd = creat(tmpname, 0666);
824 fprintf(stderr, "replay %s: create: %s\n", tmpname, strerror(errno));
828 ssize_t w = safe_write(fd, first, last - first);
829 always_assert(w >= 0 && w == last - first);
// Multi-phase recovery driver for one log, synchronized with the other
// logger threads via waituntilphase(). Phase 1 publishes this log's
// epoch summary under rec_mu; a later phase computes the minimum
// post-quiescent wake epoch if needed; the final phase replays and
// cleans the log.
835 logreplay::replay(int which, threadinfo *ti)
837 waituntilphase(REC_LOG_TS);
838 // find the maximum timestamp of entries in the log
// Publish this log's summary into the shared rec_log_infos array.
840 info_type x = info();
841 pthread_mutex_lock(&rec_mu);
842 rec_log_infos[which] = x;
843 pthread_mutex_unlock(&rec_mu);
847 waituntilphase(REC_LOG_ANALYZE_WAKE);
// Only relevant if some log went quiescent before this log's last wake.
849 if (rec_replay_min_quiescent_last_epoch
850 && rec_replay_min_quiescent_last_epoch <= rec_log_infos[which].wake_epoch)
851 rec_log_infos[which].min_post_quiescent_wake_epoch =
852 min_post_quiescent_wake_epoch(rec_replay_min_quiescent_last_epoch);
856 waituntilphase(REC_LOG_REPLAY);
// Replay the agreed epoch window and rewrite the log accordingly.
859 uint64_t nr = replayandclean1(rec_replay_min_epoch, rec_replay_max_epoch, ti);
861 printf("recovered %" PRIu64 " records from %s\n", nr, filename_.c_str());