 * Eddie Kohler, Yandong Mao, Robert Morris
 * Copyright (c) 2012-2014 President and Fellows of Harvard College
 * Copyright (c) 2012-2014 Massachusetts Institute of Technology
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, subject to the conditions
 * listed in the Masstree LICENSE file. These conditions include: you must
 * preserve this copyright notice, and you cannot mention the copyright
 * holders in advertising related to the Software without their permission.
 * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
 * notice is a summary of the Masstree LICENSE file; the license in that file
// kvd: key/value server
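//
// This file implements:
//   - an in-process test harness (kvtest_client, testgo, runtest) for the
//     local performance tests;
//   - a msgpack-over-TCP server (conn, tcpfds, tcp_threadfunc) and a
//     msgpack-over-UDP server (udp_threadfunc), both dispatching requests
//     through onego();
//   - logging setup (log_init), concurrent checkpointing (conc_checkpointer,
//     writecheckpoint), and crash recovery (recover, read_checkpoint).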
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/select.h>
#include <sys/epoll.h>
#include <asm-generic/mman.h>
#include "nodeversion.hh"
#include "kvrandom.hh"
#include "checkpoint.hh"
#include "query_masstree.hh"
#include "masstree_tcursor.hh"
#include "masstree_insert.hh"
#include "masstree_remove.hh"
#include "masstree_scan.hh"
using lcdf::StringAccum;

enum { CKState_Quit, CKState_Uninit, CKState_Ready, CKState_Go };

volatile bool timeout[2] = {false, false};
double duration[2] = {10, 0};

Masstree::default_table *tree;

// all default to the number of cores
static int udpthreads = 0;
static int tcpthreads = 0;
static int nckthreads = 0;
static int testthreads = 0;
static int nlogger = 0;
static std::vector<int> cores;

static bool logging = true;
static bool pinthreads = false;
static bool recovery_only = false;
volatile uint64_t globalepoch = 1;     // global epoch, updated by main thread regularly
static int port = 2117;
static uint64_t test_limit = ~uint64_t(0);
static int doprint = 0;
int kvtest_first_seed = 31949;

static volatile sig_atomic_t go_quit = 0;
static int quit_pipe[2];

static std::vector<const char*> logdirs;
static std::vector<const char*> ckpdirs;
volatile bool recovering = false; // while recovering, don't add log entries, and free old values immediately

static double checkpoint_interval = 1000000;
static kvepoch_t ckp_gen = 0; // recover from checkpoint
static ckstate *cks = NULL; // checkpoint status of all checkpointing threads
static pthread_cond_t rec_cond;
pthread_mutex_t rec_mu;
static int rec_nactive;
static int rec_state = REC_NONE;

kvtimestamp_t initial_timestamp;

static pthread_cond_t checkpoint_cond;
static pthread_mutex_t checkpoint_mu;
static void prepare_thread(threadinfo *ti);
static int* tcp_thread_pipes;
static void* tcp_threadfunc(threadinfo* ti);
static void* udp_threadfunc(threadinfo* ti);

static void log_init();
static void recover(threadinfo *);
static kvepoch_t read_checkpoint(threadinfo*, const char *path);

static void* conc_checkpointer(threadinfo* ti);
static void recovercheckpoint(threadinfo *ti);

static void *canceling(void *);
static void catchint(int);
static void epochinc(int);

/* running local tests */
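// Local tests run in up to arraysize(duration) timed phases. test_timeout()
// is the SIGALRM handler: it marks the current phase as expired in timeout[]
// and, if the next phase has a nonzero duration, re-arms the alarm (xalarm)
// for it. runtest() clears timeout[] and installs the handler before starting
// the client threads.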
void test_timeout(int) {
    for (n = 0; n < arraysize(timeout) && timeout[n]; ++n)

    if (n < arraysize(timeout)) {

        if (n + 1 < arraysize(timeout) && duration[n + 1])
            xalarm(duration[n + 1]);
struct kvtest_client {
        : checks_(0), kvo_() {
    kvtest_client(const char *testname)
        : testname_(testname), checks_(0), kvo_() {

    int nthreads() const {
    void set_thread(threadinfo *ti) {
    void register_timeouts(int n) {
        always_assert(n <= (int) arraysize(::timeout));
        for (int i = 1; i < n; ++i)
            if (duration[i] == 0)
                duration[i] = 0; //duration[i - 1];
    bool timeout(int which) const {
        return ::timeout[which];
    uint64_t limit() const {
    Json param(const String&) const {

    void get(long ikey, Str *value);
    void get(const Str &key);
    void get(long ikey) {
        quick_istr key(ikey);
    void get_check(const Str &key, const Str &expected);
    void get_check(const char *key, const char *expected) {
        get_check(Str(key, strlen(key)), Str(expected, strlen(expected)));
    void get_check(long ikey, long iexpected) {
        quick_istr key(ikey), expected(iexpected);
        get_check(key.string(), expected.string());
    void get_check_key8(long ikey, long iexpected) {
        quick_istr key(ikey, 8), expected(iexpected);
        get_check(key.string(), expected.string());
    void get_check_key10(long ikey, long iexpected) {
        quick_istr key(ikey, 10), expected(iexpected);
        get_check(key.string(), expected.string());
    void get_col_check(const Str &key, int col, const Str &expected);
    void get_col_check_key10(long ikey, int col, long iexpected) {
        quick_istr key(ikey, 10), expected(iexpected);
        get_col_check(key.string(), col, expected.string());
    bool get_sync(long ikey);

    void put(const Str &key, const Str &value);
    void put(const char *key, const char *val) {
        put(Str(key, strlen(key)), Str(val, strlen(val)));
    void put(long ikey, long ivalue) {
        quick_istr key(ikey), value(ivalue);
        put(key.string(), value.string());
    void put_key8(long ikey, long ivalue) {
        quick_istr key(ikey, 8), value(ivalue);
        put(key.string(), value.string());
    void put_key10(long ikey, long ivalue) {
        quick_istr key(ikey, 10), value(ivalue);
        put(key.string(), value.string());
    void put_col(const Str &key, int col, const Str &value);
    void put_col_key10(long ikey, int col, long ivalue) {
        quick_istr key(ikey, 10), value(ivalue);
        put_col(key.string(), col, value.string());

    bool remove_sync(long ikey);

    String make_message(StringAccum &sa) const;
    void notice(const char *fmt, ...);
    void fail(const char *fmt, ...);
    const Json& report(const Json& x) {
        return report_.merge(x);

        fprintf(stderr, "%d: %s\n", ti_->index(), report_.unparse().c_str());

    query<row_type> q_[10];
    const char *testname_;
    kvrandom_lcg_nr rand;

    static volatile int failing;

volatile int kvtest_client::failing;
void kvtest_client::get(long ikey, Str *value)
    quick_istr key(ikey);
    if (!q_[0].run_get1(tree->table(), key.string(), 0, *value, *ti_))

void kvtest_client::get(const Str &key)
    (void) q_[0].run_get1(tree->table(), key, 0, val, *ti_);

void kvtest_client::get_check(const Str &key, const Str &expected)
    if (!q_[0].run_get1(tree->table(), key, 0, val, *ti_)) {
        fail("get(%.*s) failed (expected %.*s)\n", key.len, key.s, expected.len, expected.s);

    if (val.len != expected.len || memcmp(val.s, expected.s, val.len) != 0)
        fail("get(%.*s) returned unexpected value %.*s (expected %.*s)\n", key.len, key.s,
             std::min(val.len, 40), val.s, std::min(expected.len, 40), expected.s);

void kvtest_client::get_col_check(const Str &key, int col, const Str &expected)
    if (!q_[0].run_get1(tree->table(), key, col, val, *ti_)) {
        fail("get.%d(%.*s) failed (expected %.*s)\n", col, key.len, key.s,
             expected.len, expected.s);

    if (val.len != expected.len || memcmp(val.s, expected.s, val.len) != 0)
        fail("get.%d(%.*s) returned unexpected value %.*s (expected %.*s)\n",
             col, key.len, key.s, std::min(val.len, 40), val.s,
             std::min(expected.len, 40), expected.s);

bool kvtest_client::get_sync(long ikey) {
    quick_istr key(ikey);
    return q_[0].run_get1(tree->table(), key.string(), 0, val, *ti_);

void kvtest_client::put(const Str &key, const Str &value) {
    q_[0].run_replace(tree->table(), key, value, *ti_);
    if (ti_->logger()) // NB may block
        ti_->logger()->record(logcmd_replace, q_[0].query_times(), key, value);

void kvtest_client::put_col(const Str &key, int col, const Str &value) {
#if !MASSTREE_ROW_TYPE_STR
        kvo_ = new_kvout(-1, 2048);
    Json req[2] = {Json(col), Json(String::make_stable(value))};
    (void) q_[0].run_put(tree->table(), key, &req[0], &req[2], *ti_);
    if (ti_->logger()) // NB may block
        ti_->logger()->record(logcmd_put, q_[0].query_times(), key,

    (void) key, (void) col, (void) value;

bool kvtest_client::remove_sync(long ikey) {
    quick_istr key(ikey);
    bool removed = q_[0].run_remove(tree->table(), key.string(), *ti_);
    if (removed && ti_->logger()) // NB may block
        ti_->logger()->record(logcmd_remove, q_[0].query_times(), key.string(), Str());

String kvtest_client::make_message(StringAccum &sa) const {
    const char *begin = sa.begin();
    while (begin != sa.end() && isspace((unsigned char) *begin))

    String s = String(begin, sa.end());
    if (!s.empty() && s.back() != '\n')

void kvtest_client::notice(const char *fmt, ...) {
    String m = make_message(StringAccum().vsnprintf(500, fmt, val));
    fprintf(stderr, "%d: %s", ti_->index(), m.c_str());

void kvtest_client::fail(const char *fmt, ...) {
    static nodeversion failing_lock(false);
    static nodeversion fail_message_lock(false);
    static String fail_message;

    String m = make_message(StringAccum().vsnprintf(500, fmt, val));

        m = "unknown failure";

    fail_message_lock.lock();
    if (fail_message != m) {
        fprintf(stderr, "%d: %s", ti_->index(), m.c_str());

    fail_message_lock.unlock();

        fprintf(stdout, "%d: %s", ti_->index(), m.c_str());
        tree->print(stdout, 0);
static void* testgo(threadinfo* ti) {
    kvtest_client *kc = (kvtest_client*) ti->thread_data();
    prepare_thread(kc->ti_);

    if (strcmp(kc->testname_, "rw1") == 0)
    else if (strcmp(kc->testname_, "rw2") == 0)
    else if (strcmp(kc->testname_, "rw3") == 0)
    else if (strcmp(kc->testname_, "rw4") == 0)
    else if (strcmp(kc->testname_, "rwsmall24") == 0)
        kvtest_rwsmall24(*kc);
    else if (strcmp(kc->testname_, "rwsep24") == 0)
    else if (strcmp(kc->testname_, "palma") == 0)
    else if (strcmp(kc->testname_, "palmb") == 0)
    else if (strcmp(kc->testname_, "rw16") == 0)
    else if (strcmp(kc->testname_, "rw5") == 0
             || strcmp(kc->testname_, "rw1fixed") == 0)
        kvtest_rw1fixed(*kc);
    else if (strcmp(kc->testname_, "ycsbk") == 0)
    else if (strcmp(kc->testname_, "wd1") == 0)
        kvtest_wd1(10000000, 1, *kc);
    else if (strcmp(kc->testname_, "wd1check") == 0)
        kvtest_wd1_check(10000000, 1, *kc);
    else if (strcmp(kc->testname_, "w1") == 0)
        kvtest_w1_seed(*kc, kvtest_first_seed + kc->id());
    else if (strcmp(kc->testname_, "r1") == 0)
        kvtest_r1_seed(*kc, kvtest_first_seed + kc->id());
    else if (strcmp(kc->testname_, "wcol1") == 0)
        kvtest_wcol1at(*kc, kc->id() % 24, kvtest_first_seed + kc->id() % 48, 5000000);
    else if (strcmp(kc->testname_, "rcol1") == 0)
        kvtest_rcol1at(*kc, kc->id() % 24, kvtest_first_seed + kc->id() % 48, 5000000);
    else
        kc->fail("unknown test '%s'", kc->testname_);
static const char * const kvstats_name[] = {
    "ops", "ops_per_sec", "puts", "gets", "scans", "puts_per_sec", "gets_per_sec", "scans_per_sec"
};

void runtest(const char *testname, int nthreads) {
    std::vector<kvtest_client> clients(nthreads, kvtest_client(testname));
    ::testthreads = nthreads;
    for (int i = 0; i < nthreads; ++i)
        clients[i].set_thread(threadinfo::make(threadinfo::TI_PROCESS, i));
    bzero((void *)timeout, sizeof(timeout));
    signal(SIGALRM, test_timeout);

    for (int i = 0; i < nthreads; ++i) {
        int r = clients[i].ti_->run(testgo, &clients[i]);
        always_assert(r == 0);

    for (int i = 0; i < nthreads; ++i)
        pthread_join(clients[i].ti_->threadid(), 0);

    kvstats kvs[arraysize(kvstats_name)];
    for (int i = 0; i < nthreads; ++i)
        for (int j = 0; j < (int) arraysize(kvstats_name); ++j)
            if (double x = clients[i].report_.get_d(kvstats_name[j]))

    for (int j = 0; j < (int) arraysize(kvstats_name); ++j)
        kvs[j].print_report(kvstats_name[j]);
    enum { inbufsz = 20 * 1024, inbufrefill = 16 * 1024 };
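    // Input buffering: requests are decoded by a msgpack streaming parser
    // straight out of inbuf_. When a partially parsed request fills the
    // current buffer, hard_check() parks it in oldinbuf_ and allocates a
    // fresh inbuf_; inbuftotal_ counts bytes retired from earlier buffers so
    // that xposition()/recent_string() can recover the raw bytes of the
    // request currently being parsed (used for logging in onego()).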
        : fd(s), inbuf_(new char[inbufsz]),
          inbufpos_(0), inbuflen_(0), kvout(new_kvout(s, 20 * 1024)),

        for (char* x : oldinbuf_)

        while (!parser_.done() && check(2))
            inbufpos_ += parser_.consume(inbuf_ + inbufpos_,
                                         inbuflen_ - inbufpos_,
                                         String::make_stable(inbuf_, inbufsz));
        if (parser_.success() && parser_.result().is_a())

            parser_.result() = Json();
        return parser_.result();

    int check(int tryhard) {
        if (inbufpos_ == inbuflen_ && tryhard)
        return inbuflen_ - inbufpos_;

    uint64_t xposition() const {
        return inbuftotal_ + inbufpos_;
    Str recent_string(uint64_t xposition) const {
        if (xposition - inbuftotal_ <= unsigned(inbufpos_))
            return Str(inbuf_ + (xposition - inbuftotal_),

    std::vector<char*> oldinbuf_;
    msgpack::streaming_parser parser_;
    uint64_t inbuftotal_;

    void hard_check(int tryhard);
void conn::hard_check(int tryhard) {
    masstree_precondition(inbufpos_ == inbuflen_);
    if (parser_.empty()) {
        inbuftotal_ += inbufpos_;
        inbufpos_ = inbuflen_ = 0;
        for (auto x : oldinbuf_)

    } else if (inbufpos_ == inbufsz) {
        oldinbuf_.push_back(inbuf_);
        inbuf_ = new char[inbufsz];
        inbuftotal_ += inbufpos_;
        inbufpos_ = inbuflen_ = 0;

        struct timeval tv = {0, 0};
        if (select(fd + 1, &rfds, NULL, NULL, &tv) <= 0)

    ssize_t r = read(fd, inbuf_ + inbufpos_, inbufsz - inbufpos_);
enum { clp_val_suffixdouble = Clp_ValFirstUser };
enum { opt_nolog = 1, opt_pin, opt_logdir, opt_port, opt_ckpdir, opt_duration,
       opt_test, opt_test_name, opt_threads, opt_cores,
       opt_print, opt_norun, opt_checkpoint, opt_limit };
static const Clp_Option options[] = {
    { "no-log", 0, opt_nolog, 0, 0 },
    { 0, 'n', opt_nolog, 0, 0 },
    { "no-run", 0, opt_norun, 0, 0 },
    { "pin", 'p', opt_pin, 0, Clp_Negate },
    { "logdir", 0, opt_logdir, Clp_ValString, 0 },
    { "ld", 0, opt_logdir, Clp_ValString, 0 },
    { "checkpoint", 'c', opt_checkpoint, Clp_ValDouble, Clp_Optional | Clp_Negate },
    { "ckp", 0, opt_checkpoint, Clp_ValDouble, Clp_Optional | Clp_Negate },
    { "ckpdir", 0, opt_ckpdir, Clp_ValString, 0 },
    { "ckdir", 0, opt_ckpdir, Clp_ValString, 0 },
    { "cd", 0, opt_ckpdir, Clp_ValString, 0 },
    { "port", 0, opt_port, Clp_ValInt, 0 },
    { "duration", 'd', opt_duration, Clp_ValDouble, 0 },
    { "limit", 'l', opt_limit, clp_val_suffixdouble, 0 },
    { "test", 0, opt_test, Clp_ValString, 0 },
    { "test-rw1", 0, opt_test_name, 0, 0 },
    { "test-rw2", 0, opt_test_name, 0, 0 },
    { "test-rw3", 0, opt_test_name, 0, 0 },
    { "test-rw4", 0, opt_test_name, 0, 0 },
    { "test-rw5", 0, opt_test_name, 0, 0 },
    { "test-rw16", 0, opt_test_name, 0, 0 },
    { "test-palm", 0, opt_test_name, 0, 0 },
    { "test-ycsbk", 0, opt_test_name, 0, 0 },
    { "test-rw1fixed", 0, opt_test_name, 0, 0 },
    { "threads", 'j', opt_threads, Clp_ValInt, 0 },
    { "cores", 0, opt_cores, Clp_ValString, 0 },
    { "print", 0, opt_print, 0, Clp_Negate }
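
// Example invocations (illustrative only; directory names are made up --
// see the usage message in main() for the real synopsis):
//   kvd --logdir /tmp/log0,/tmp/log1 --ckpdir /tmp/ckp -j 8 -p
//   kvd -n --port 2117 --checkpoint 30
//   kvd --test rw1 -j 4 -d 10 --limit 100000 --print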
main(int argc, char *argv[])
    int s, ret, yes = 1, i = 1, firstcore = -1, corestride = 1;
    const char *dotest = 0;
    nlogger = tcpthreads = udpthreads = nckthreads = sysconf(_SC_NPROCESSORS_ONLN);

    Clp_Parser *clp = Clp_NewParser(argc, argv, (int) arraysize(options), options);
    Clp_AddType(clp, clp_val_suffixdouble, Clp_DisallowOptions, clp_parse_suffixdouble, 0);

    while ((opt = Clp_Next(clp)) >= 0) {

            pinthreads = !clp->negated;

            nlogger = tcpthreads = udpthreads = nckthreads = clp->val.i;

            const char *s = strtok((char *) clp->vstr, ",");
            for (; s; s = strtok(NULL, ","))
                logdirs.push_back(s);

            const char *s = strtok((char *) clp->vstr, ",");
            for (; s; s = strtok(NULL, ","))
                ckpdirs.push_back(s);

            if (clp->negated || (clp->have_val && clp->val.d <= 0))
                checkpoint_interval = -1;
            else if (clp->have_val)
                checkpoint_interval = clp->val.d;
            else
                checkpoint_interval = 30;

            duration[0] = clp->val.d;

            test_limit = (uint64_t) clp->val.d;

            dotest = clp->option->long_name + 5;

            doprint = !clp->negated;

            if (firstcore >= 0 || cores.size() > 0) {
                Clp_OptionError(clp, "%<%O%> already given");

            const char *plus = strchr(clp->vstr, '+');
            Json ij = Json::parse(clp->vstr),
                 aj = Json::parse(String("[") + String(clp->vstr) + String("]")),
                 pj1 = Json::parse(plus ? String(clp->vstr, plus) : "x"),
                 pj2 = Json::parse(plus ? String(plus + 1) : "x");
            for (int i = 0; aj && i < aj.size(); ++i)
                if (!aj[i].is_int() || aj[i].to_i() < 0)

            if (ij && ij.is_int() && ij.to_i() >= 0)
                firstcore = ij.to_i(), corestride = 1;
            else if (pj1 && pj2 && pj1.is_int() && pj1.to_i() >= 0 && pj2.is_int())
                firstcore = pj1.to_i(), corestride = pj2.to_i();

                for (int i = 0; i < aj.size(); ++i)
                    cores.push_back(aj[i].to_i());

                Clp_OptionError(clp, "bad %<%O%>, expected %<CORE1%>, %<CORE1+STRIDE%>, or %<CORE1,CORE2,...%>");

            recovery_only = true;

        fprintf(stderr, "Usage: kvd [-np] [--ld dir1[,dir2,...]] [--cd dir1[,dir2,...]]\n");

    Clp_DeleteParser(clp);

        logdirs.push_back(".");

        ckpdirs.push_back(".");

        firstcore = cores.size() ? cores.back() + 1 : 0;
        for (; (int) cores.size() < udpthreads; firstcore += corestride)
            cores.push_back(firstcore);
    signal(SIGINT, catchint);

    // log epoch starts at 1
    global_log_epoch = 1;
    global_wake_epoch = 0;
    log_epoch_interval.tv_sec = 0;
    log_epoch_interval.tv_usec = 200000;

    // increment the global epoch every second
    signal(SIGALRM, epochinc);
    struct itimerval etimer;
    etimer.it_interval.tv_sec = 1;
    etimer.it_interval.tv_usec = 0;
    etimer.it_value.tv_sec = 1;
    etimer.it_value.tv_usec = 0;
    ret = setitimer(ITIMER_REAL, &etimer, NULL);
    always_assert(ret == 0);

    // arrange for a per-thread threadinfo pointer
    ret = pthread_key_create(&threadinfo::key, 0);
    always_assert(ret == 0);

    // for parallel recovery
    ret = pthread_cond_init(&rec_cond, 0);
    always_assert(ret == 0);
    ret = pthread_mutex_init(&rec_mu, 0);
    always_assert(ret == 0);

    // for waking up the checkpoint thread
    ret = pthread_cond_init(&checkpoint_cond, 0);
    always_assert(ret == 0);
    ret = pthread_mutex_init(&checkpoint_mu, 0);
    always_assert(ret == 0);

    threadinfo *main_ti = threadinfo::make(threadinfo::TI_MAIN, -1);

    initial_timestamp = timestamp();
    tree = new Masstree::default_table;
    tree->initialize(*main_ti);
    printf("%s, %s, pin-threads %s, ", tree->name(), row_type::name(),
           pinthreads ? "enabled" : "disabled");

        printf("logging enabled\n");

        printf("logging disabled\n");
    // UDP threads, each with its own port.

        printf("0 udp threads\n");
    else if (udpthreads == 1)
        printf("1 udp thread (port %d)\n", port);

        printf("%d udp threads (ports %d-%d)\n", udpthreads, port, port + udpthreads - 1);
    for (i = 0; i < udpthreads; i++) {
        threadinfo *ti = threadinfo::make(threadinfo::TI_PROCESS, i);
        ret = ti->run(udp_threadfunc);
        always_assert(ret == 0);

        if (strcmp(dotest, "palm") == 0) {

            runtest("palmb", tcpthreads);

            runtest(dotest, tcpthreads);

            tree->print(stdout, 0);
    // TCP socket and threads

    s = socket(AF_INET, SOCK_STREAM, 0);
    always_assert(s >= 0);
    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

    struct sockaddr_in sin;
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons(port);
    ret = bind(s, (struct sockaddr *) &sin, sizeof(sin));

    ret = listen(s, 100);

    threadinfo **tcpti = new threadinfo *[tcpthreads];
    tcp_thread_pipes = new int[tcpthreads * 2];
    printf("%d tcp threads (port %d)\n", tcpthreads, port);
    for (i = 0; i < tcpthreads; i++) {
        threadinfo *ti = threadinfo::make(threadinfo::TI_PROCESS, i);
        ret = pipe(&tcp_thread_pipes[i * 2]);
        always_assert(ret == 0);
        ret = ti->run(tcp_threadfunc);
        always_assert(ret == 0);

    // Create a canceling thread.
    ret = pipe(quit_pipe);

    pthread_create(&tid, NULL, canceling, NULL);

        struct sockaddr_in sin1;
        socklen_t sinlen = sizeof(sin1);

        bzero(&sin1, sizeof(sin1));
        s1 = accept(s, (struct sockaddr *) &sin1, &sinlen);
        always_assert(s1 >= 0);
        setsockopt(s1, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

        // Complete handshake.
        ssize_t nr = read(s1, buf, BUFSIZ);

        msgpack::streaming_parser sp;
        if (nr == 0 || sp.consume(buf, nr) != (size_t) nr
            || !sp.result().is_a() || sp.result().size() < 2
            || !sp.result()[1].is_i() || sp.result()[1].as_i() != Cmd_Handshake) {
            fprintf(stderr, "failed handshake\n");
            goto kill_connection;

        int target_core = -1;
        if (sp.result().size() >= 3 && sp.result()[2].is_o()
            && sp.result()[2]["core"].is_i())
            target_core = sp.result()[2]["core"].as_i();
        if (target_core < 0 || target_core >= tcpthreads) {
            target_core = next % tcpthreads;

        conninfo* ci = new conninfo;

        swap(ci->handshake, sp.result());

        ssize_t w = write(tcp_thread_pipes[2*target_core + 1], &ci, sizeof(ci));
        always_assert((size_t) w == sizeof(ci));
    // It does not matter if the write fails (e.g., when the pipe is full)
    int r = write(quit_pipe[1], &cmd, sizeof(cmd));
inline const char *threadtype(int type) {
    case threadinfo::TI_MAIN:
    case threadinfo::TI_PROCESS:
    case threadinfo::TI_LOG:
    case threadinfo::TI_CHECKPOINT:

    always_assert(0 && "Unknown threadtype");
    int r = read(quit_pipe[0], &cmd, sizeof(cmd));
    assert(r == sizeof(cmd) && cmd == 0);

    // Wake up the checkpoint thread so it notices go_quit and tells the
    // checkpoint threads to quit
    pthread_mutex_lock(&checkpoint_mu);
    pthread_cond_signal(&checkpoint_cond);
    pthread_mutex_unlock(&checkpoint_mu);

    pthread_t me = pthread_self();
    fprintf(stderr, "\n");
    // Cancel outstanding threads. Checkpointing threads exit on their own
    // once checkpointing thread 0 sees go_quit, so they don't need to be canceled.
    for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next())
        if (ti->purpose() != threadinfo::TI_MAIN
            && ti->purpose() != threadinfo::TI_CHECKPOINT
            && !pthread_equal(me, ti->threadid())) {
            int r = pthread_cancel(ti->threadid());
            always_assert(r == 0);

    // join canceled threads
    for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next())
        if (ti->purpose() != threadinfo::TI_MAIN
            && !pthread_equal(me, ti->threadid())) {
            fprintf(stderr, "joining thread %s:%d\n",
                    threadtype(ti->purpose()), ti->index());
            int r = pthread_join(ti->threadid(), 0);
            always_assert(r == 0);
// Return 1 on success, -1 on I/O error or protocol mismatch
int handshake(Json& request, threadinfo& ti) {
    always_assert(request.is_a() && request.size() >= 2
                  && request[1].is_i() && request[1].as_i() == Cmd_Handshake
                  && (request.size() == 2 || request[2].is_o()));

    if (request.size() >= 3
        && request[2]["maxkeylen"].is_i()
        && request[2]["maxkeylen"].as_i() > MASSTREE_MAXKEYLEN) {

        request[3] = "bad maxkeylen";

        request[3] = ti.index();
        request[4] = row_type::name();

    request[1] = Cmd_Handshake + 1;
    return request[2].as_b() ? 1 : -1;
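
// Handshake messages are msgpack arrays; roughly (element meanings inferred
// from the indexing above, so treat this as a sketch):
//   request:  [seq, Cmd_Handshake, {"maxkeylen": ..., "core": ...}]
//   response: [seq, Cmd_Handshake + 1, ok, ti.index() or "bad maxkeylen",
//              row_type::name()]
// The accept loop in main() reads the optional "core" field to decide which
// TCP thread should own the connection.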
// execute command, return result.
int onego(query<row_type>& q, Json& request, Str request_str, threadinfo& ti) {
    int command = request[1].as_i();
    if (command == Cmd_Checkpoint) {
        pthread_mutex_lock(&checkpoint_mu);
        pthread_cond_broadcast(&checkpoint_cond);
        pthread_mutex_unlock(&checkpoint_mu);

    } else if (command == Cmd_Get) {
        q.run_get(tree->table(), request, ti);
    } else if (command == Cmd_Put && request.size() > 3
               && (request.size() % 2) == 1) { // insert or update
        Str key(request[2].as_s());
        const Json* req = request.array_data() + 3;
        const Json* end_req = request.end_array_data();
        request[2] = q.run_put(tree->table(), request[2].as_s(),

        if (ti.logger() && request_str) {
            // log the client's own msgpack encoding of the column/value list
            // rather than re-serializing it
            msgpack::parser mp(request_str.data());
            mp.skip_array_size().skip_primitives(3);
            ti.logger()->record(logcmd_put, q.query_times(), key, Str(mp.position(), request_str.end()));
        } else if (ti.logger())
            ti.logger()->record(logcmd_put, q.query_times(), key, req, end_req);

    } else if (command == Cmd_Replace) { // insert or update
        Str key(request[2].as_s()), value(request[3].as_s());
        request[2] = q.run_replace(tree->table(), key, value, ti);
        if (ti.logger()) // NB may block
            ti.logger()->record(logcmd_replace, q.query_times(), key, value);

    } else if (command == Cmd_Remove) { // remove
        Str key(request[2].as_s());
        bool removed = q.run_remove(tree->table(), key, ti);
        if (removed && ti.logger()) // NB may block
            ti.logger()->record(logcmd_remove, q.query_times(), key, Str());
        request[2] = removed;

    } else if (command == Cmd_Scan) {
        q.run_scan(tree->table(), request, ti);

    request[1] = command + 1;
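
// Request shapes handled above (element 0 is assumed to be a client-chosen
// sequence number; element 1 is the command code):
//   [seq, Cmd_Get, ...]                   handled by q.run_get()
//   [seq, Cmd_Put, key, col0, val0, ...]  odd total length, pairs from [3]
//   [seq, Cmd_Replace, key, value]
//   [seq, Cmd_Remove, key]
//   [seq, Cmd_Scan, ...]                  handled by q.run_scan()
//   [seq, Cmd_Checkpoint]                 wakes the checkpoint thread
// Replies reuse the request array: element 1 becomes command + 1 and element
// 2 carries the result (e.g. the put/replace status or whether a remove
// found the key).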
#if HAVE_SYS_EPOLL_H
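// tcpfds abstracts the per-thread event loop: epoll here, with a select()
// fallback below. Both variants expose the same wait/event_conn/add/remove
// interface. The per-thread new-connection pipe is registered with the
// sentinel pointer (conn *) 1 so tcp_threadfunc() can tell "new connections
// arrived on the pipe" apart from "a client socket became readable".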
    tcpfds(int pipefd) {
        epollfd = epoll_create(10);

            perror("epoll_create");

        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.ptr = (void *) 1;
        int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pipefd, &ev);
        always_assert(r == 0);

    enum { max_events = 100 };
    typedef struct epoll_event eventset[max_events];
    int wait(eventset &es) {
        return epoll_wait(epollfd, es, max_events, -1);

    conn *event_conn(eventset &es, int i) const {
        return (conn *) es[i].data.ptr;

    void add(int fd, conn *c) {
        struct epoll_event ev;
        ev.events = EPOLLIN;

        int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
        always_assert(r == 0);

    void remove(int fd) {
        int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
        always_assert(r == 0);
    std::vector<conn *> conns_;

        : nfds_(pipefd + 1) {
        always_assert(pipefd < FD_SETSIZE);

        FD_SET(pipefd, &rfds_);
        conns_.resize(nfds_, 0);
        conns_[pipefd] = (conn *) 1;

    typedef fd_set eventset;
    int wait(eventset &es) {
        int r = select(nfds_, &es, 0, 0, 0);
        return r > 0 ? nfds_ : r;

    conn *event_conn(eventset &es, int i) const {
        return FD_ISSET(i, &es) ? conns_[i] : 0;

    void add(int fd, conn *c) {
        always_assert(fd < FD_SETSIZE);

        conns_.resize(nfds_, 0);

    void remove(int fd) {
        always_assert(fd < FD_SETSIZE);

        if (fd == nfds_ - 1) {
            while (nfds_ > 0 && !FD_ISSET(nfds_ - 1, &rfds_))
void prepare_thread(threadinfo *ti) {

        CPU_SET(cores[ti->index()], &cs);
        always_assert(sched_setaffinity(0, sizeof(cs), &cs) == 0);

    always_assert(!pinthreads && "pinthreads not supported\n");

        ti->set_logger(&logs->log(ti->index() % nlogger));
void* tcp_threadfunc(threadinfo* ti) {

    int myfd = tcp_thread_pipes[2 * ti->index()];

    tcpfds::eventset events;
    std::deque<conn*> ready;

        int nev = sloop.wait(events);
        for (int i = 0; i < nev; i++)
            if (conn *c = sloop.event_conn(events, i))

        while (!ready.empty()) {
            conn* c = ready.front();

            if (c == (conn *) 1) {

#define MAX_NEWCONN 100
                conninfo* ci[MAX_NEWCONN];
                ssize_t len = read(myfd, ci, sizeof(ci));
                always_assert(len > 0 && len % sizeof(int) == 0);
                for (int j = 0; j * sizeof(*ci) < (size_t) len; ++j) {
                    struct conn *c = new conn(ci[j]->s);
                    sloop.add(c->fd, c);
                    int ret = handshake(ci[j]->handshake, *ti);
                    msgpack::unparse(*c->kvout, ci[j]->handshake);

                        sloop.remove(c->fd);

                // Should not block: the event loop reported this fd as readable
                uint64_t xposition = c->xposition();
                Json& request = c->receive();
                if (unlikely(!request))

                ret = onego(q, request, c->recent_string(xposition), *ti);

                msgpack::unparse(*c->kvout, request);

                if (likely(ret >= 0)) {

                    printf("socket read error\n");

                sloop.remove(c->fd);
// serve a client UDP socket in a dedicated thread
void* udp_threadfunc(threadinfo* ti) {

    struct sockaddr_in sin;
    bzero(&sin, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port + ti->index());

    int s = socket(AF_INET, SOCK_DGRAM, 0);
    always_assert(s >= 0);
    ret = bind(s, (struct sockaddr *) &sin, sizeof(sin));
    always_assert(ret == 0 && "bind failed");
    int sobuflen = 512*1024;
    setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sobuflen, sizeof(sobuflen));

    String buf = String::make_uninitialized(4096);
    struct kvout *kvout = new_bufkvout();
    msgpack::streaming_parser parser;

        struct sockaddr_in sin;
        socklen_t sinlen = sizeof(sin);
        ssize_t cc = recvfrom(s, const_cast<char*>(buf.data()), buf.length(),
                              0, (struct sockaddr *) &sin, &sinlen);

            perror("udpgo read");

        unsigned consumed = parser.consume(buf.data(), buf.length(), buf);

        // Fail if we received a partial request
        if (parser.success() && parser.result().is_a()) {

            if (onego(q, parser.result(), Str(buf.data(), consumed), *ti) >= 0) {

                msgpack::unparser<StringAccum> cu(sa);
                cu << parser.result();
                cc = sendto(s, sa.data(), sa.length(), 0,
                            (struct sockaddr*) &sin, sinlen);
                always_assert(cc == (ssize_t) sa.length());

                printf("onego failed\n");
static String log_filename(const char* logdir, int logindex) {

    int r = stat(logdir, &sb);
    if (r < 0 && errno == ENOENT) {
        r = mkdir(logdir, 0777);

            fprintf(stderr, "%s: %s\n", logdir, strerror(errno));

    sa.snprintf(strlen(logdir) + 24, "%s/kvd-log-%d", logdir, logindex);
    return sa.take_string();
    logs = logset::make(nlogger);
    for (i = 0; i < nlogger; i++)
        logs->log(i).initialize(log_filename(logdirs[i % logdirs.size()], i));

    cks = (ckstate *)malloc(sizeof(ckstate) * nckthreads);
    for (i = 0; i < nckthreads; i++) {
        threadinfo *ti = threadinfo::make(threadinfo::TI_CHECKPOINT, i);
        cks[i].state = CKState_Uninit;

        ret = ti->run(conc_checkpointer);
        always_assert(ret == 0);
// Read a checkpoint and insert its key/value pairs into the tree.
// Must be followed by a replay of the log, since the checkpoint is not
// consistent with any one point in time.
// Returns the timestamp of the first log record that needs
// to come from the log.
kvepoch_t read_checkpoint(threadinfo *ti, const char *path)

    int fd = open(path, 0);

        printf("no %s\n", path);

    int ret = fstat(fd, &sb);
    always_assert(ret == 0);
    char *p = (char *) mmap(0, sb.st_size, PROT_READ, MAP_FILE|MAP_PRIVATE, fd, 0);
    always_assert(p != MAP_FAILED);

    msgpack::parser par(String::make_stable(p, sb.st_size));

    std::cerr << j << "\n";
    always_assert(j["generation"].is_i() && j["size"].is_i());
    uint64_t gen = j["generation"].as_i();
    uint64_t n = j["size"].as_i();
    printf("reading checkpoint with %" PRIu64 " nodes\n", n);

    for (uint64_t i = 0; i != n; ++i)
        ckstate::insert(tree->table(), par, *ti);

    munmap(p, sb.st_size);

    printf("%.1f MB, %.2f sec, %.1f MB/sec\n",
           sb.st_size / 1000000.0,
           (sb.st_size / 1000000.0) / (t1 - t0));
waituntilphase(int phase)
    always_assert(pthread_mutex_lock(&rec_mu) == 0);
    while (rec_state != phase)
        always_assert(pthread_cond_wait(&rec_cond, &rec_mu) == 0);
    always_assert(pthread_mutex_unlock(&rec_mu) == 0);

    always_assert(pthread_mutex_lock(&rec_mu) == 0);

    always_assert(pthread_cond_broadcast(&rec_cond) == 0);
    always_assert(pthread_mutex_unlock(&rec_mu) == 0);

void recovercheckpoint(threadinfo *ti) {
    waituntilphase(REC_CKP);

    sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
            ckpdirs[ti->index() % ckpdirs.size()],
            ckp_gen.value(), ti->index());
    kvepoch_t gen = read_checkpoint(ti, path);
    always_assert(ckp_gen == gen);

recphase(int nactive, int state)
    rec_nactive = nactive;

    always_assert(pthread_cond_broadcast(&rec_cond) == 0);

        always_assert(pthread_cond_wait(&rec_cond, &rec_mu) == 0);
// Read the checkpoint file, then read each log file.
// Inserts ignore attempts to update with timestamps less than what was in
// the entry from the checkpoint file, so we don't have to do an explicit
// merge of the log files by time.
recover(threadinfo *)

    // XXX: discard temporary checkpoint and ckp-gen files generated before crash

    // get the generation of the checkpoint from ckp-gen, if any
    sprintf(path, "%s/kvd-ckp-gen", ckpdirs[0]);

    rec_ckp_min_epoch = rec_ckp_max_epoch = 0;
    int fd = open(path, O_RDONLY);

        Json ckpj = Json::parse(read_file_contents(fd));

        if (ckpj && ckpj["kvdb_checkpoint"] && ckpj["generation"].is_number()) {
            ckp_gen = ckpj["generation"].to_u64();
            rec_ckp_min_epoch = ckpj["min_epoch"].to_u64();
            rec_ckp_max_epoch = ckpj["max_epoch"].to_u64();
            printf("recover from checkpoint %" PRIu64 " [%" PRIu64 ", %" PRIu64 "]\n",
                   ckp_gen.value(), rec_ckp_min_epoch.value(), rec_ckp_max_epoch.value());

        printf("no %s\n", path);

    always_assert(pthread_mutex_lock(&rec_mu) == 0);

    // recover from checkpoint, and set timestamp of the checkpoint
    recphase(nckthreads, REC_CKP);

    // find the minimum and maximum timestamps of entries in each log
    rec_log_infos = new logreplay::info_type[nlogger];
    recphase(nlogger, REC_LOG_TS);

    // replay log entries, remove inconsistent entries, and append
    // an empty log entry with minimum timestamp

    // calculate log range

    // Maximum epoch seen in the union of the logs and the checkpoint. (We
    // don't commit a checkpoint until all logs are flushed past the
    // checkpoint's max_epoch.)
    kvepoch_t max_epoch = rec_ckp_max_epoch;

        max_epoch = max_epoch.next_nonzero();
    for (logreplay::info_type *it = rec_log_infos;
         it != rec_log_infos + nlogger; ++it)

            && (!max_epoch || max_epoch < it->last_epoch))
            max_epoch = it->last_epoch;

    // Maximum first_epoch seen in the logs. Full log information is not
    // available for epochs before max_first_epoch.
    kvepoch_t max_first_epoch = 0;
    for (logreplay::info_type *it = rec_log_infos;
         it != rec_log_infos + nlogger; ++it)

            && (!max_first_epoch || max_first_epoch < it->first_epoch))
            max_first_epoch = it->first_epoch;

    // Maximum epoch of all logged wake commands.
    kvepoch_t max_wake_epoch = 0;
    for (logreplay::info_type *it = rec_log_infos;
         it != rec_log_infos + nlogger; ++it)

            && (!max_wake_epoch || max_wake_epoch < it->wake_epoch))
            max_wake_epoch = it->wake_epoch;

    // Minimum last_epoch seen in QUIESCENT logs.
    kvepoch_t min_quiescent_last_epoch = 0;
    for (logreplay::info_type *it = rec_log_infos;
         it != rec_log_infos + nlogger; ++it)

            && (!min_quiescent_last_epoch || min_quiescent_last_epoch > it->last_epoch))
            min_quiescent_last_epoch = it->last_epoch;

    // If max_wake_epoch && min_quiescent_last_epoch <= max_wake_epoch, then a
    // wake command was missed by at least one quiescent log. We can't replay
    // anything at or beyond the minimum missed wake epoch. So record, for
    // each log, the minimum wake command that at least one quiescent thread
    // missed.
    if (max_wake_epoch && min_quiescent_last_epoch <= max_wake_epoch)
        rec_replay_min_quiescent_last_epoch = min_quiescent_last_epoch;
    else
        rec_replay_min_quiescent_last_epoch = 0;
    recphase(nlogger, REC_LOG_ANALYZE_WAKE);

    // Calculate upper bound of epochs to replay.
    // This is the minimum of min_post_quiescent_wake_epoch (if any) and the
    // last_epoch of all non-quiescent logs.
    rec_replay_max_epoch = max_epoch;
    for (logreplay::info_type *it = rec_log_infos;
         it != rec_log_infos + nlogger; ++it) {

            && it->last_epoch < rec_replay_max_epoch)
            rec_replay_max_epoch = it->last_epoch;
        if (it->min_post_quiescent_wake_epoch
            && it->min_post_quiescent_wake_epoch < rec_replay_max_epoch)
            rec_replay_max_epoch = it->min_post_quiescent_wake_epoch;

    // Calculate lower bound of epochs to replay.
    rec_replay_min_epoch = rec_ckp_min_epoch;
    // XXX what about max_first_epoch?

    if (rec_ckp_min_epoch) {
        always_assert(rec_ckp_min_epoch > max_first_epoch);
        always_assert(rec_ckp_min_epoch < rec_replay_max_epoch);
        always_assert(rec_ckp_max_epoch < rec_replay_max_epoch);
        fprintf(stderr, "replay [%" PRIu64 ",%" PRIu64 ") from [%" PRIu64 ",%" PRIu64 ") into ckp [%" PRIu64 ",%" PRIu64 "]\n",
                rec_replay_min_epoch.value(), rec_replay_max_epoch.value(),
                max_first_epoch.value(), max_epoch.value(),
                rec_ckp_min_epoch.value(), rec_ckp_max_epoch.value());

    delete[] rec_log_infos;

    recphase(nlogger, REC_LOG_REPLAY);

    recphase(0, REC_DONE);

    // check that all delta markers have been recycled (leaving only remove
    // markers and real values)
    uint64_t deltas_created = 0, deltas_removed = 0;
    for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next()) {
        deltas_created += ti->counter(tc_replay_create_delta);
        deltas_removed += ti->counter(tc_replay_remove_delta);

    fprintf(stderr, "deltas created: %" PRIu64 ", removed: %" PRIu64 "\n", deltas_created, deltas_removed);
    always_assert(deltas_created == deltas_removed);

    global_log_epoch = rec_replay_max_epoch.next_nonzero();

    always_assert(pthread_mutex_unlock(&rec_mu) == 0);
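
// Worked example of the replay-range computation above (numbers made up):
// with a committed checkpoint covering epochs [4, 7], three non-quiescent
// logs whose last_epochs are 9, 10 and 11, and no missed wake commands,
// max_epoch is 11, rec_replay_min_epoch is 4, and rec_replay_max_epoch is 9
// (the smallest last_epoch), so epochs [4, 9) are replayed on top of the
// checkpoint.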
writecheckpoint(const char *path, ckstate *c, double t0)

    printf("memory phase: %" PRIu64 " nodes, %.2f sec\n", c->count, t1 - t0);

    int fd = creat(path, 0666);
    always_assert(fd >= 0);

    // checkpoint file format, all msgpack:
    //   {"generation": generation, "size": size, ...}
    //   then `size` triples of key (string), timestamp (int), value (whatever)
    Json j = Json().set("generation", ckp_gen.value())
        .set("size", c->count)
        .set("firstkey", c->startkey);

    msgpack::unparse(sa, j);
    checked_write(fd, sa.data(), sa.length());
    checked_write(fd, c->vals->buf, c->vals->n);

    int ret = fsync(fd);
    always_assert(ret == 0);

    always_assert(ret == 0);

    c->bytes = c->vals->n;
    printf("file phase (%s): %" PRIu64 " bytes, %.2f sec, %.1f MB/sec\n",

           (c->bytes / 1000000.0) / (t2 - t1));
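
// For example (values made up), a checkpoint of a two-key range might start
// with the msgpack encoding of {"generation": 3, "size": 2, "firstkey": ""}
// followed by the triples ("alpha", <timestamp>, "v1") and
// ("beta", <timestamp>, "v2"); read_checkpoint() is the matching reader.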
conc_filecheckpoint(threadinfo *ti)

    ckstate *c = &cks[ti->index()];
    c->vals = new_bufkvout();

    tree->table().scan(c->startkey, true, *c, *ti);

    sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
            ckpdirs[ti->index() % ckpdirs.size()],
            ckp_gen.value(), ti->index());
    writecheckpoint(path, c, t0);

prepare_checkpoint(kvepoch_t min_epoch, int nckthreads, const Str *pv)

    j.set("kvdb_checkpoint", true)
        .set("min_epoch", min_epoch.value())
        .set("max_epoch", global_log_epoch.value())
        .set("generation", ckp_gen.value())
        .set("nckthreads", nckthreads);

    for (int i = 1; i < nckthreads; ++i)
        pvj.push_back(Json::make_string(pv[i].s, pv[i].len));
    j.set("pivots", pvj);
commit_checkpoint(Json ckpj)

    // atomically commit a set of checkpoint files by incrementing
    // the checkpoint generation on disk

    sprintf(path, "%s/kvd-ckp-gen", ckpdirs[0]);
    int r = atomic_write_file_contents(path, ckpj.unparse());
    always_assert(r == 0);
    fprintf(stderr, "kvd-ckp-%" PRIu64 " [%s,%s]: committed\n",
            ckp_gen.value(), ckpj["min_epoch"].to_s().c_str(),
            ckpj["max_epoch"].to_s().c_str());

    // delete old checkpoint files
    for (int i = 0; i < nckthreads; i++) {

        sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
                ckpdirs[i % ckpdirs.size()],
                ckp_gen.value() - 1, i);

    kvepoch_t mfe = 0, ge = global_log_epoch;
    for (int i = 0; i < nlogger; ++i) {
        loginfo& log = logs->log(i);
        kvepoch_t fe = log.quiescent() ? ge : log.flushed_epoch();
        if (!mfe || fe < mfe)
// concurrent periodic checkpoint
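// Thread 0 acts as coordinator: it waits for every checkpoint thread to be
// CKState_Ready, sleeps for checkpoint_interval (or until a Cmd_Checkpoint
// request or shutdown signals checkpoint_cond), splits the key space with
// tree->findpivots(), assigns each thread a [startkey, endkey) range, and
// moves the threads to CKState_Go. Each thread, including thread 0, then
// runs conc_filecheckpoint() over its range and returns to CKState_Ready.
// Thread 0 prepares the new kvd-ckp-gen contents with prepare_checkpoint()
// and commits them via commit_checkpoint() only after max_flushed_epoch()
// has advanced past the checkpoint's max_epoch; CKState_Quit tells every
// thread to exit.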
void* conc_checkpointer(threadinfo* ti) {

    recovercheckpoint(ti);

    ckstate *c = &cks[ti->index()];

    pthread_cond_init(&c->state_cond, NULL);
    c->state = CKState_Ready;

    if (checkpoint_interval <= 0)

    if (ti->index() == 0) {
        for (int i = 1; i < nckthreads; i++)
            while (cks[i].state != CKState_Ready)

        Str *pv = new Str[nckthreads + 1];
        Json uncommitted_ckp;

            set_timespec(ts, now() + (uncommitted_ckp ? 0.25 : checkpoint_interval));

            pthread_mutex_lock(&checkpoint_mu);

                pthread_cond_timedwait(&checkpoint_cond, &checkpoint_mu, &ts);

                for (int i = 0; i < nckthreads; i++) {
                    cks[i].state = CKState_Quit;
                    pthread_cond_signal(&cks[i].state_cond);

                pthread_mutex_unlock(&checkpoint_mu);

            pthread_mutex_unlock(&checkpoint_mu);

            if (uncommitted_ckp) {
                kvepoch_t mfe = max_flushed_epoch();
                if (!mfe || mfe > uncommitted_ckp["max_epoch"].to_u64()) {
                    commit_checkpoint(uncommitted_ckp);
                    uncommitted_ckp = Json();

            for (int i = 0; i < nckthreads + 1; i++)
                pv[i].assign(NULL, 0);
            tree->findpivots(pv, nckthreads + 1);

            kvepoch_t min_epoch = global_log_epoch;
            pthread_mutex_lock(&checkpoint_mu);
            ckp_gen = ckp_gen.next_nonzero();
            for (int i = 0; i < nckthreads; i++) {
                cks[i].startkey = pv[i];
                cks[i].endkey = (i == nckthreads - 1 ? Str() : pv[i + 1]);
                cks[i].state = CKState_Go;
                pthread_cond_signal(&cks[i].state_cond);

            pthread_mutex_unlock(&checkpoint_mu);

            conc_filecheckpoint(ti);

            cks[0].state = CKState_Ready;
            uint64_t bytes = cks[0].bytes;
            pthread_mutex_lock(&checkpoint_mu);
            for (int i = 1; i < nckthreads; i++) {
                while (cks[i].state != CKState_Ready)
                    pthread_cond_wait(&cks[i].state_cond, &checkpoint_mu);
                bytes += cks[i].bytes;

            pthread_mutex_unlock(&checkpoint_mu);

            uncommitted_ckp = prepare_checkpoint(min_epoch, nckthreads, pv);

            for (int i = 0; i < nckthreads + 1; i++)

                    free((void *)pv[i].s);

            double t = now() - t0;
            fprintf(stderr, "kvd-ckp-%" PRIu64 " [%s,%s]: prepared (%.2f sec, %" PRIu64 " MB, %" PRIu64 " MB/sec)\n",
                    ckp_gen.value(), uncommitted_ckp["min_epoch"].to_s().c_str(),
                    uncommitted_ckp["max_epoch"].to_s().c_str(),
                    t, bytes / (1 << 20), (uint64_t)(bytes / t) >> 20);

        pthread_mutex_lock(&checkpoint_mu);
        while (c->state != CKState_Go && c->state != CKState_Quit)
            pthread_cond_wait(&c->state_cond, &checkpoint_mu);
        if (c->state == CKState_Quit) {
            pthread_mutex_unlock(&checkpoint_mu);

        pthread_mutex_unlock(&checkpoint_mu);

        conc_filecheckpoint(ti);

        pthread_mutex_lock(&checkpoint_mu);
        c->state = CKState_Ready;
        pthread_cond_signal(&c->state_cond);
        pthread_mutex_unlock(&checkpoint_mu);