Merge
[c11concurrency-benchmarks.git] / silo / masstree / mtd.cc
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 // -*- mode: c++ -*-
17 // kvd: key/value server
18 //
19
20 #include <stdio.h>
21 #include <stdarg.h>
22 #include <ctype.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <netinet/tcp.h>
28 #include <sys/select.h>
29 #include <sys/mman.h>
30 #include <sys/stat.h>
31 #include <limits.h>
32 #if HAVE_SYS_EPOLL_H
33 #include <sys/epoll.h>
34 #endif
35 #if __linux__
36 #include <asm-generic/mman.h>
37 #endif
38 #include <fcntl.h>
39 #include <assert.h>
40 #include <string.h>
41 #include <pthread.h>
42 #include <math.h>
43 #include <signal.h>
44 #include <errno.h>
45 #ifdef __linux__
46 #include <malloc.h>
47 #endif
48 #include "nodeversion.hh"
49 #include "kvstats.hh"
50 #include "json.hh"
51 #include "kvtest.hh"
52 #include "kvrandom.hh"
53 #include "clp.h"
54 #include "log.hh"
55 #include "checkpoint.hh"
56 #include "file.hh"
57 #include "kvproto.hh"
58 #include "query_masstree.hh"
59 #include "masstree_tcursor.hh"
60 #include "masstree_insert.hh"
61 #include "masstree_remove.hh"
62 #include "masstree_scan.hh"
63 #include "msgpack.hh"
64 #include <algorithm>
65 #include <deque>
66 using lcdf::StringAccum;
67
68 enum { CKState_Quit, CKState_Uninit, CKState_Ready, CKState_Go };
69
70 volatile bool timeout[2] = {false, false};
71 double duration[2] = {10, 0};
72
73 Masstree::default_table *tree;
74
75 // all default to the number of cores
76 static int udpthreads = 0;
77 static int tcpthreads = 0;
78 static int nckthreads = 0;
79 static int testthreads = 0;
80 static int nlogger = 0;
81 static std::vector<int> cores;
82
83 static bool logging = true;
84 static bool pinthreads = false;
85 static bool recovery_only = false;
86 volatile uint64_t globalepoch = 1;     // global epoch, updated by main thread regularly
87 static int port = 2117;
88 static uint64_t test_limit = ~uint64_t(0);
89 static int doprint = 0;
90 int kvtest_first_seed = 31949;
91
92 static volatile sig_atomic_t go_quit = 0;
93 static int quit_pipe[2];
94
95 static std::vector<const char*> logdirs;
96 static std::vector<const char*> ckpdirs;
97
98 static logset* logs;
99 volatile bool recovering = false; // so don't add log entries, and free old value immediately
100
101 static double checkpoint_interval = 1000000;
102 static kvepoch_t ckp_gen = 0; // recover from checkpoint
103 static ckstate *cks = NULL; // checkpoint status of all checkpointing threads
104 static pthread_cond_t rec_cond;
105 pthread_mutex_t rec_mu;
106 static int rec_nactive;
107 static int rec_state = REC_NONE;
108
109 kvtimestamp_t initial_timestamp;
110
111 static pthread_cond_t checkpoint_cond;
112 static pthread_mutex_t checkpoint_mu;
113
114 static void prepare_thread(threadinfo *ti);
115 static int* tcp_thread_pipes;
116 static void* tcp_threadfunc(threadinfo* ti);
117 static void* udp_threadfunc(threadinfo* ti);
118
119 static void log_init();
120 static void recover(threadinfo *);
121 static kvepoch_t read_checkpoint(threadinfo*, const char *path);
122
123 static void* conc_checkpointer(threadinfo* ti);
124 static void recovercheckpoint(threadinfo *ti);
125
126 static void *canceling(void *);
127 static void catchint(int);
128 static void epochinc(int);
129
130 /* running local tests */
131 void test_timeout(int) {
132     size_t n;
133     for (n = 0; n < arraysize(timeout) && timeout[n]; ++n)
134         /* do nothing */;
135     if (n < arraysize(timeout)) {
136         timeout[n] = true;
137         if (n + 1 < arraysize(timeout) && duration[n + 1])
138             xalarm(duration[n + 1]);
139     }
140 }
141
142 struct kvtest_client {
143     kvtest_client()
144         : checks_(0), kvo_() {
145     }
146     kvtest_client(const char *testname)
147         : testname_(testname), checks_(0), kvo_() {
148     }
149
150     int id() const {
151         return ti_->index();
152     }
153     int nthreads() const {
154         return testthreads;
155     }
156     void set_thread(threadinfo *ti) {
157         ti_ = ti;
158     }
159     void register_timeouts(int n) {
160         always_assert(n <= (int) arraysize(::timeout));
161         for (int i = 1; i < n; ++i)
162             if (duration[i] == 0)
163                 duration[i] = 0;//duration[i - 1];
164     }
165     bool timeout(int which) const {
166         return ::timeout[which];
167     }
168     uint64_t limit() const {
169         return test_limit;
170     }
171     Json param(const String&) const {
172         return Json();
173     }
174     double now() const {
175         return ::now();
176     }
177
178     void get(long ikey, Str *value);
179     void get(const Str &key);
180     void get(long ikey) {
181         quick_istr key(ikey);
182         get(key.string());
183     }
184     void get_check(const Str &key, const Str &expected);
185     void get_check(const char *key, const char *expected) {
186         get_check(Str(key, strlen(key)), Str(expected, strlen(expected)));
187     }
188     void get_check(long ikey, long iexpected) {
189         quick_istr key(ikey), expected(iexpected);
190         get_check(key.string(), expected.string());
191     }
192     void get_check_key8(long ikey, long iexpected) {
193         quick_istr key(ikey, 8), expected(iexpected);
194         get_check(key.string(), expected.string());
195     }
196     void get_check_key10(long ikey, long iexpected) {
197         quick_istr key(ikey, 10), expected(iexpected);
198         get_check(key.string(), expected.string());
199     }
200     void get_col_check(const Str &key, int col, const Str &expected);
201     void get_col_check_key10(long ikey, int col, long iexpected) {
202         quick_istr key(ikey, 10), expected(iexpected);
203         get_col_check(key.string(), col, expected.string());
204     }
205     bool get_sync(long ikey);
206
207     void put(const Str &key, const Str &value);
208     void put(const char *key, const char *val) {
209         put(Str(key, strlen(key)), Str(val, strlen(val)));
210     }
211     void put(long ikey, long ivalue) {
212         quick_istr key(ikey), value(ivalue);
213         put(key.string(), value.string());
214     }
215     void put_key8(long ikey, long ivalue) {
216         quick_istr key(ikey, 8), value(ivalue);
217         put(key.string(), value.string());
218     }
219     void put_key10(long ikey, long ivalue) {
220         quick_istr key(ikey, 10), value(ivalue);
221         put(key.string(), value.string());
222     }
223     void put_col(const Str &key, int col, const Str &value);
224     void put_col_key10(long ikey, int col, long ivalue) {
225         quick_istr key(ikey, 10), value(ivalue);
226         put_col(key.string(), col, value.string());
227     }
228
229     bool remove_sync(long ikey);
230
231     void puts_done() {
232     }
233     void wait_all() {
234     }
235     void rcu_quiesce() {
236     }
237     String make_message(StringAccum &sa) const;
238     void notice(const char *fmt, ...);
239     void fail(const char *fmt, ...);
240     const Json& report(const Json& x) {
241         return report_.merge(x);
242     }
243     void finish() {
244         fprintf(stderr, "%d: %s\n", ti_->index(), report_.unparse().c_str());
245     }
246     threadinfo *ti_;
247     query<row_type> q_[10];
248     const char *testname_;
249     kvrandom_lcg_nr rand;
250     int checks_;
251     Json report_;
252     struct kvout *kvo_;
253     static volatile int failing;
254 };
255
256 volatile int kvtest_client::failing;
257
258 void kvtest_client::get(long ikey, Str *value)
259 {
260     quick_istr key(ikey);
261     if (!q_[0].run_get1(tree->table(), key.string(), 0, *value, *ti_))
262         *value = Str();
263 }
264
265 void kvtest_client::get(const Str &key)
266 {
267     Str val;
268     (void) q_[0].run_get1(tree->table(), key, 0, val, *ti_);
269 }
270
271 void kvtest_client::get_check(const Str &key, const Str &expected)
272 {
273     Str val;
274     if (!q_[0].run_get1(tree->table(), key, 0, val, *ti_)) {
275         fail("get(%.*s) failed (expected %.*s)\n", key.len, key.s, expected.len, expected.s);
276         return;
277     }
278     if (val.len != expected.len || memcmp(val.s, expected.s, val.len) != 0)
279         fail("get(%.*s) returned unexpected value %.*s (expected %.*s)\n", key.len, key.s,
280              std::min(val.len, 40), val.s, std::min(expected.len, 40), expected.s);
281     else
282         ++checks_;
283 }
284
285 void kvtest_client::get_col_check(const Str &key, int col, const Str &expected)
286 {
287     Str val;
288     if (!q_[0].run_get1(tree->table(), key, col, val, *ti_)) {
289         fail("get.%d(%.*s) failed (expected %.*s)\n", col, key.len, key.s,
290              expected.len, expected.s);
291         return;
292     }
293     if (val.len != expected.len || memcmp(val.s, expected.s, val.len) != 0)
294         fail("get.%d(%.*s) returned unexpected value %.*s (expected %.*s)\n",
295              col, key.len, key.s, std::min(val.len, 40), val.s,
296              std::min(expected.len, 40), expected.s);
297     else
298         ++checks_;
299 }
300
301 bool kvtest_client::get_sync(long ikey) {
302     quick_istr key(ikey);
303     Str val;
304     return q_[0].run_get1(tree->table(), key.string(), 0, val, *ti_);
305 }
306
307 void kvtest_client::put(const Str &key, const Str &value) {
308     while (failing)
309         /* do nothing */;
310     q_[0].run_replace(tree->table(), key, value, *ti_);
311     if (ti_->logger()) // NB may block
312         ti_->logger()->record(logcmd_replace, q_[0].query_times(), key, value);
313 }
314
315 void kvtest_client::put_col(const Str &key, int col, const Str &value) {
316     while (failing)
317         /* do nothing */;
318 #if !MASSTREE_ROW_TYPE_STR
319     if (!kvo_)
320         kvo_ = new_kvout(-1, 2048);
321     Json req[2] = {Json(col), Json(String::make_stable(value))};
322     (void) q_[0].run_put(tree->table(), key, &req[0], &req[2], *ti_);
323     if (ti_->logger()) // NB may block
324         ti_->logger()->record(logcmd_put, q_[0].query_times(), key,
325                               &req[0], &req[2]);
326 #else
327     (void) key, (void) col, (void) value;
328     assert(0);
329 #endif
330 }
331
332 bool kvtest_client::remove_sync(long ikey) {
333     quick_istr key(ikey);
334     bool removed = q_[0].run_remove(tree->table(), key.string(), *ti_);
335     if (removed && ti_->logger()) // NB may block
336         ti_->logger()->record(logcmd_remove, q_[0].query_times(), key.string(), Str());
337     return removed;
338 }
339
340 String kvtest_client::make_message(StringAccum &sa) const {
341     const char *begin = sa.begin();
342     while (begin != sa.end() && isspace((unsigned char) *begin))
343         ++begin;
344     String s = String(begin, sa.end());
345     if (!s.empty() && s.back() != '\n')
346         s += '\n';
347     return s;
348 }
349
350 void kvtest_client::notice(const char *fmt, ...) {
351     va_list val;
352     va_start(val, fmt);
353     String m = make_message(StringAccum().vsnprintf(500, fmt, val));
354     va_end(val);
355     if (m)
356         fprintf(stderr, "%d: %s", ti_->index(), m.c_str());
357 }
358
359 void kvtest_client::fail(const char *fmt, ...) {
360     static nodeversion failing_lock(false);
361     static nodeversion fail_message_lock(false);
362     static String fail_message;
363     failing = 1;
364
365     va_list val;
366     va_start(val, fmt);
367     String m = make_message(StringAccum().vsnprintf(500, fmt, val));
368     va_end(val);
369     if (!m)
370         m = "unknown failure";
371
372     fail_message_lock.lock();
373     if (fail_message != m) {
374         fail_message = m;
375         fprintf(stderr, "%d: %s", ti_->index(), m.c_str());
376     }
377     fail_message_lock.unlock();
378
379     if (doprint) {
380         failing_lock.lock();
381         fprintf(stdout, "%d: %s", ti_->index(), m.c_str());
382         tree->print(stdout, 0);
383         fflush(stdout);
384     }
385
386     always_assert(0);
387 }
388
389 static void* testgo(threadinfo* ti) {
390     kvtest_client *kc = (kvtest_client*) ti->thread_data();
391     prepare_thread(kc->ti_);
392
393     if (strcmp(kc->testname_, "rw1") == 0)
394         kvtest_rw1(*kc);
395     else if (strcmp(kc->testname_, "rw2") == 0)
396         kvtest_rw2(*kc);
397     else if (strcmp(kc->testname_, "rw3") == 0)
398         kvtest_rw3(*kc);
399     else if (strcmp(kc->testname_, "rw4") == 0)
400         kvtest_rw4(*kc);
401     else if (strcmp(kc->testname_, "rwsmall24") == 0)
402         kvtest_rwsmall24(*kc);
403     else if (strcmp(kc->testname_, "rwsep24") == 0)
404         kvtest_rwsep24(*kc);
405     else if (strcmp(kc->testname_, "palma") == 0)
406         kvtest_palma(*kc);
407     else if (strcmp(kc->testname_, "palmb") == 0)
408         kvtest_palmb(*kc);
409     else if (strcmp(kc->testname_, "rw16") == 0)
410         kvtest_rw16(*kc);
411     else if (strcmp(kc->testname_, "rw5") == 0
412              || strcmp(kc->testname_, "rw1fixed") == 0)
413         kvtest_rw1fixed(*kc);
414     else if (strcmp(kc->testname_, "ycsbk") == 0)
415         kvtest_ycsbk(*kc);
416     else if (strcmp(kc->testname_, "wd1") == 0)
417         kvtest_wd1(10000000, 1, *kc);
418     else if (strcmp(kc->testname_, "wd1check") == 0)
419         kvtest_wd1_check(10000000, 1, *kc);
420     else if (strcmp(kc->testname_, "w1") == 0)
421         kvtest_w1_seed(*kc, kvtest_first_seed + kc->id());
422     else if (strcmp(kc->testname_, "r1") == 0)
423         kvtest_r1_seed(*kc, kvtest_first_seed + kc->id());
424     else if (strcmp(kc->testname_, "wcol1") == 0)
425         kvtest_wcol1at(*kc, kc->id() % 24, kvtest_first_seed + kc->id() % 48, 5000000);
426     else if (strcmp(kc->testname_, "rcol1") == 0)
427         kvtest_rcol1at(*kc, kc->id() % 24, kvtest_first_seed + kc->id() % 48, 5000000);
428     else
429         kc->fail("unknown test '%s'", kc->testname_);
430     return 0;
431 }
432
433 static const char * const kvstats_name[] = {
434     "ops", "ops_per_sec", "puts", "gets", "scans", "puts_per_sec", "gets_per_sec", "scans_per_sec"
435 };
436
437 void runtest(const char *testname, int nthreads) {
438     std::vector<kvtest_client> clients(nthreads, kvtest_client(testname));
439     ::testthreads = nthreads;
440     for (int i = 0; i < nthreads; ++i)
441         clients[i].set_thread(threadinfo::make(threadinfo::TI_PROCESS, i));
442     bzero((void *)timeout, sizeof(timeout));
443     signal(SIGALRM, test_timeout);
444     if (duration[0])
445         xalarm(duration[0]);
446     for (int i = 0; i < nthreads; ++i) {
447         int r = clients[i].ti_->run(testgo, &clients[i]);
448         always_assert(r == 0);
449     }
450     for (int i = 0; i < nthreads; ++i)
451         pthread_join(clients[i].ti_->threadid(), 0);
452
453     kvstats kvs[arraysize(kvstats_name)];
454     for (int i = 0; i < nthreads; ++i)
455         for (int j = 0; j < (int) arraysize(kvstats_name); ++j)
456             if (double x = clients[i].report_.get_d(kvstats_name[j]))
457                 kvs[j].add(x);
458     for (int j = 0; j < (int) arraysize(kvstats_name); ++j)
459         kvs[j].print_report(kvstats_name[j]);
460 }
461
462
463 struct conn {
464     int fd;
465     enum { inbufsz = 20 * 1024, inbufrefill = 16 * 1024 };
466
467     conn(int s)
468         : fd(s), inbuf_(new char[inbufsz]),
469           inbufpos_(0), inbuflen_(0), kvout(new_kvout(s, 20 * 1024)),
470           inbuftotal_(0) {
471     }
472     ~conn() {
473         close(fd);
474         free_kvout(kvout);
475         delete[] inbuf_;
476         for (char* x : oldinbuf_)
477             delete[] x;
478     }
479
480     Json& receive() {
481         while (!parser_.done() && check(2))
482             inbufpos_ += parser_.consume(inbuf_ + inbufpos_,
483                                          inbuflen_ - inbufpos_,
484                                          String::make_stable(inbuf_, inbufsz));
485         if (parser_.success() && parser_.result().is_a())
486             parser_.reset();
487         else
488             parser_.result() = Json();
489         return parser_.result();
490     }
491
492     int check(int tryhard) {
493         if (inbufpos_ == inbuflen_ && tryhard)
494             hard_check(tryhard);
495         return inbuflen_ - inbufpos_;
496     }
497
498     uint64_t xposition() const {
499         return inbuftotal_ + inbufpos_;
500     }
501     Str recent_string(uint64_t xposition) const {
502         if (xposition - inbuftotal_ <= unsigned(inbufpos_))
503             return Str(inbuf_ + (xposition - inbuftotal_),
504                        inbuf_ + inbufpos_);
505         else
506             return Str();
507     }
508
509   private:
510     char* inbuf_;
511     int inbufpos_;
512     int inbuflen_;
513     std::vector<char*> oldinbuf_;
514     msgpack::streaming_parser parser_;
515   public:
516     struct kvout *kvout;
517   private:
518     uint64_t inbuftotal_;
519
520     void hard_check(int tryhard);
521 };
522
523 void conn::hard_check(int tryhard) {
524     masstree_precondition(inbufpos_ == inbuflen_);
525     if (parser_.empty()) {
526         inbuftotal_ += inbufpos_;
527         inbufpos_ = inbuflen_ = 0;
528         for (auto x : oldinbuf_)
529             delete[] x;
530         oldinbuf_.clear();
531     } else if (inbufpos_ == inbufsz) {
532         oldinbuf_.push_back(inbuf_);
533         inbuf_ = new char[inbufsz];
534         inbuftotal_ += inbufpos_;
535         inbufpos_ = inbuflen_ = 0;
536     }
537     if (tryhard == 1) {
538         fd_set rfds;
539         FD_ZERO(&rfds);
540         FD_SET(fd, &rfds);
541         struct timeval tv = {0, 0};
542         if (select(fd + 1, &rfds, NULL, NULL, &tv) <= 0)
543             return;
544     } else
545         kvflush(kvout);
546
547     ssize_t r = read(fd, inbuf_ + inbufpos_, inbufsz - inbufpos_);
548     if (r != -1)
549         inbuflen_ += r;
550 }
551
552 struct conninfo {
553     int s;
554     Json handshake;
555 };
556
557
558 /* main loop */
559
560 enum { clp_val_suffixdouble = Clp_ValFirstUser };
561 enum { opt_nolog = 1, opt_pin, opt_logdir, opt_port, opt_ckpdir, opt_duration,
562        opt_test, opt_test_name, opt_threads, opt_cores,
563        opt_print, opt_norun, opt_checkpoint, opt_limit };
564 static const Clp_Option options[] = {
565     { "no-log", 0, opt_nolog, 0, 0 },
566     { 0, 'n', opt_nolog, 0, 0 },
567     { "no-run", 0, opt_norun, 0, 0 },
568     { "pin", 'p', opt_pin, 0, Clp_Negate },
569     { "logdir", 0, opt_logdir, Clp_ValString, 0 },
570     { "ld", 0, opt_logdir, Clp_ValString, 0 },
571     { "checkpoint", 'c', opt_checkpoint, Clp_ValDouble, Clp_Optional | Clp_Negate },
572     { "ckp", 0, opt_checkpoint, Clp_ValDouble, Clp_Optional | Clp_Negate },
573     { "ckpdir", 0, opt_ckpdir, Clp_ValString, 0 },
574     { "ckdir", 0, opt_ckpdir, Clp_ValString, 0 },
575     { "cd", 0, opt_ckpdir, Clp_ValString, 0 },
576     { "port", 0, opt_port, Clp_ValInt, 0 },
577     { "duration", 'd', opt_duration, Clp_ValDouble, 0 },
578     { "limit", 'l', opt_limit, clp_val_suffixdouble, 0 },
579     { "test", 0, opt_test, Clp_ValString, 0 },
580     { "test-rw1", 0, opt_test_name, 0, 0 },
581     { "test-rw2", 0, opt_test_name, 0, 0 },
582     { "test-rw3", 0, opt_test_name, 0, 0 },
583     { "test-rw4", 0, opt_test_name, 0, 0 },
584     { "test-rw5", 0, opt_test_name, 0, 0 },
585     { "test-rw16", 0, opt_test_name, 0, 0 },
586     { "test-palm", 0, opt_test_name, 0, 0 },
587     { "test-ycsbk", 0, opt_test_name, 0, 0 },
588     { "test-rw1fixed", 0, opt_test_name, 0, 0 },
589     { "threads", 'j', opt_threads, Clp_ValInt, 0 },
590     { "cores", 0, opt_cores, Clp_ValString, 0 },
591     { "print", 0, opt_print, 0, Clp_Negate }
592 };
593
594 int
595 main(int argc, char *argv[])
596 {
597   using std::swap;
598   int s, ret, yes = 1, i = 1, firstcore = -1, corestride = 1;
599   const char *dotest = 0;
600   nlogger = tcpthreads = udpthreads = nckthreads = sysconf(_SC_NPROCESSORS_ONLN);
601   Clp_Parser *clp = Clp_NewParser(argc, argv, (int) arraysize(options), options);
602   Clp_AddType(clp, clp_val_suffixdouble, Clp_DisallowOptions, clp_parse_suffixdouble, 0);
603   int opt;
604   while ((opt = Clp_Next(clp)) >= 0) {
605       switch (opt) {
606       case opt_nolog:
607           logging = false;
608           break;
609       case opt_pin:
610           pinthreads = !clp->negated;
611           break;
612       case opt_threads:
613           nlogger = tcpthreads = udpthreads = nckthreads = clp->val.i;
614           break;
615       case opt_logdir: {
616           const char *s = strtok((char *) clp->vstr, ",");
617           for (; s; s = strtok(NULL, ","))
618               logdirs.push_back(s);
619           break;
620       }
621       case opt_ckpdir: {
622           const char *s = strtok((char *) clp->vstr, ",");
623           for (; s; s = strtok(NULL, ","))
624               ckpdirs.push_back(s);
625           break;
626       }
627       case opt_checkpoint:
628           if (clp->negated || (clp->have_val && clp->val.d <= 0))
629               checkpoint_interval = -1;
630           else if (clp->have_val)
631               checkpoint_interval = clp->val.d;
632           else
633               checkpoint_interval = 30;
634           break;
635       case opt_port:
636           port = clp->val.i;
637           break;
638       case opt_duration:
639           duration[0] = clp->val.d;
640           break;
641       case opt_limit:
642           test_limit = (uint64_t) clp->val.d;
643           break;
644       case opt_test:
645           dotest = clp->vstr;
646           break;
647       case opt_test_name:
648           dotest = clp->option->long_name + 5;
649           break;
650       case opt_print:
651           doprint = !clp->negated;
652           break;
653       case opt_cores:
654           if (firstcore >= 0 || cores.size() > 0) {
655               Clp_OptionError(clp, "%<%O%> already given");
656               exit(EXIT_FAILURE);
657           } else {
658               const char *plus = strchr(clp->vstr, '+');
659               Json ij = Json::parse(clp->vstr),
660                   aj = Json::parse(String("[") + String(clp->vstr) + String("]")),
661                   pj1 = Json::parse(plus ? String(clp->vstr, plus) : "x"),
662                   pj2 = Json::parse(plus ? String(plus + 1) : "x");
663               for (int i = 0; aj && i < aj.size(); ++i)
664                   if (!aj[i].is_int() || aj[i].to_i() < 0)
665                       aj = Json();
666               if (ij && ij.is_int() && ij.to_i() >= 0)
667                   firstcore = ij.to_i(), corestride = 1;
668               else if (pj1 && pj2 && pj1.is_int() && pj1.to_i() >= 0 && pj2.is_int())
669                   firstcore = pj1.to_i(), corestride = pj2.to_i();
670               else if (aj) {
671                   for (int i = 0; i < aj.size(); ++i)
672                       cores.push_back(aj[i].to_i());
673               } else {
674                   Clp_OptionError(clp, "bad %<%O%>, expected %<CORE1%>, %<CORE1+STRIDE%>, or %<CORE1,CORE2,...%>");
675                   exit(EXIT_FAILURE);
676               }
677           }
678           break;
679       case opt_norun:
680           recovery_only = true;
681           break;
682       default:
683           fprintf(stderr, "Usage: kvd [-np] [--ld dir1[,dir2,...]] [--cd dir1[,dir2,...]]\n");
684           exit(EXIT_FAILURE);
685       }
686   }
687   Clp_DeleteParser(clp);
688   if (logdirs.empty())
689       logdirs.push_back(".");
690   if (ckpdirs.empty())
691       ckpdirs.push_back(".");
692   if (firstcore < 0)
693       firstcore = cores.size() ? cores.back() + 1 : 0;
694   for (; (int) cores.size() < udpthreads; firstcore += corestride)
695       cores.push_back(firstcore);
696
697   // for -pg profiling
698   signal(SIGINT, catchint);
699
700   // log epoch starts at 1
701   global_log_epoch = 1;
702   global_wake_epoch = 0;
703   log_epoch_interval.tv_sec = 0;
704   log_epoch_interval.tv_usec = 200000;
705
706   // increment the global epoch every second
707   if (!dotest) {
708       signal(SIGALRM, epochinc);
709       struct itimerval etimer;
710       etimer.it_interval.tv_sec = 1;
711       etimer.it_interval.tv_usec = 0;
712       etimer.it_value.tv_sec = 1;
713       etimer.it_value.tv_usec = 0;
714       ret = setitimer(ITIMER_REAL, &etimer, NULL);
715       always_assert(ret == 0);
716   }
717
718   // arrange for a per-thread threadinfo pointer
719   ret = pthread_key_create(&threadinfo::key, 0);
720   always_assert(ret == 0);
721
722   // for parallel recovery
723   ret = pthread_cond_init(&rec_cond, 0);
724   always_assert(ret == 0);
725   ret = pthread_mutex_init(&rec_mu, 0);
726   always_assert(ret == 0);
727
728   // for waking up the checkpoint thread
729   ret = pthread_cond_init(&checkpoint_cond, 0);
730   always_assert(ret == 0);
731   ret = pthread_mutex_init(&checkpoint_mu, 0);
732   always_assert(ret == 0);
733
734   threadinfo *main_ti = threadinfo::make(threadinfo::TI_MAIN, -1);
735   main_ti->run();
736
737   initial_timestamp = timestamp();
738   tree = new Masstree::default_table;
739   tree->initialize(*main_ti);
740   printf("%s, %s, pin-threads %s, ", tree->name(), row_type::name(),
741          pinthreads ? "enabled" : "disabled");
742   if(logging){
743     printf("logging enabled\n");
744     log_init();
745     recover(main_ti);
746   } else {
747     printf("logging disabled\n");
748   }
749
750   // UDP threads, each with its own port.
751   if (udpthreads == 0)
752       printf("0 udp threads\n");
753   else if (udpthreads == 1)
754       printf("1 udp thread (port %d)\n", port);
755   else
756       printf("%d udp threads (ports %d-%d)\n", udpthreads, port, port + udpthreads - 1);
757   for(i = 0; i < udpthreads; i++){
758     threadinfo *ti = threadinfo::make(threadinfo::TI_PROCESS, i);
759     ret = ti->run(udp_threadfunc);
760     always_assert(ret == 0);
761   }
762
763   if (dotest) {
764       if (strcmp(dotest, "palm") == 0) {
765         runtest("palma", 1);
766         runtest("palmb", tcpthreads);
767       } else
768         runtest(dotest, tcpthreads);
769       tree->stats(stderr);
770       if (doprint)
771           tree->print(stdout, 0);
772       exit(0);
773   }
774
775   // TCP socket and threads
776
777   s = socket(AF_INET, SOCK_STREAM, 0);
778   always_assert(s >= 0);
779   setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
780   setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
781
782   struct sockaddr_in sin;
783   sin.sin_family = AF_INET;
784   sin.sin_addr.s_addr = INADDR_ANY;
785   sin.sin_port = htons(port);
786   ret = bind(s, (struct sockaddr *) &sin, sizeof(sin));
787   if (ret < 0) {
788       perror("bind");
789       exit(EXIT_FAILURE);
790   }
791
792   ret = listen(s, 100);
793   if (ret < 0) {
794       perror("listen");
795       exit(EXIT_FAILURE);
796   }
797
798   threadinfo **tcpti = new threadinfo *[tcpthreads];
799   tcp_thread_pipes = new int[tcpthreads * 2];
800   printf("%d tcp threads (port %d)\n", tcpthreads, port);
801   for(i = 0; i < tcpthreads; i++){
802     threadinfo *ti = threadinfo::make(threadinfo::TI_PROCESS, i);
803     ret = pipe(&tcp_thread_pipes[i * 2]);
804     always_assert(ret == 0);
805     ret = ti->run(tcp_threadfunc);
806     always_assert(ret == 0);
807     tcpti[i] = ti;
808   }
809   // Create a canceling thread.
810   ret = pipe(quit_pipe);
811   assert(ret == 0);
812   pthread_t tid;
813   pthread_create(&tid, NULL, canceling, NULL);
814
815   static int next = 0;
816   while(1){
817     int s1;
818     struct sockaddr_in sin1;
819     socklen_t sinlen = sizeof(sin1);
820
821     bzero(&sin1, sizeof(sin1));
822     s1 = accept(s, (struct sockaddr *) &sin1, &sinlen);
823     always_assert(s1 >= 0);
824     setsockopt(s1, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
825
826     // Complete handshake.
827     char buf[BUFSIZ];
828     ssize_t nr = read(s1, buf, BUFSIZ);
829     if (nr == -1) {
830         perror("read");
831     kill_connection:
832         close(s1);
833         continue;
834     }
835
836     msgpack::streaming_parser sp;
837     if (nr == 0 || sp.consume(buf, nr) != (size_t) nr
838         || !sp.result().is_a() || sp.result().size() < 2
839         || !sp.result()[1].is_i() || sp.result()[1].as_i() != Cmd_Handshake) {
840         fprintf(stderr, "failed handshake\n");
841         goto kill_connection;
842     }
843
844     int target_core = -1;
845     if (sp.result().size() >= 3 && sp.result()[2].is_o()
846         && sp.result()[2]["core"].is_i())
847         target_core = sp.result()[2]["core"].as_i();
848     if (target_core < 0 || target_core >= tcpthreads) {
849         target_core = next % tcpthreads;
850         ++next;
851     }
852
853     conninfo* ci = new conninfo;
854     ci->s = s1;
855     swap(ci->handshake, sp.result());
856
857     ssize_t w = write(tcp_thread_pipes[2*target_core + 1], &ci, sizeof(ci));
858     always_assert((size_t) w == sizeof(ci));
859   }
860 }
861
862 void
863 catchint(int)
864 {
865     go_quit = 1;
866     char cmd = 0;
867     // Does not matter if the write fails (when the pipe is full)
868     int r = write(quit_pipe[1], &cmd, sizeof(cmd));
869     (void)r;
870 }
871
872 inline const char *threadtype(int type) {
873   switch (type) {
874     case threadinfo::TI_MAIN:
875       return "main";
876     case threadinfo::TI_PROCESS:
877       return "process";
878     case threadinfo::TI_LOG:
879       return "log";
880     case threadinfo::TI_CHECKPOINT:
881       return "checkpoint";
882     default:
883       always_assert(0 && "Unknown threadtype");
884       break;
885   };
886 }
887
888 void *
889 canceling(void *)
890 {
891     char cmd;
892     int r = read(quit_pipe[0], &cmd, sizeof(cmd));
893     (void) r;
894     assert(r == sizeof(cmd) && cmd == 0);
895     // Cancel wake up checkpointing threads
896     pthread_mutex_lock(&checkpoint_mu);
897     pthread_cond_signal(&checkpoint_cond);
898     pthread_mutex_unlock(&checkpoint_mu);
899
900     pthread_t me = pthread_self();
901     fprintf(stderr, "\n");
902     // cancel outstanding threads. Checkpointing threads will exit safely
903     // when the checkpointing thread 0 sees go_quit, and don't need cancel
904     for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next())
905         if (ti->purpose() != threadinfo::TI_MAIN
906             && ti->purpose() != threadinfo::TI_CHECKPOINT
907             && !pthread_equal(me, ti->threadid())) {
908             int r = pthread_cancel(ti->threadid());
909             always_assert(r == 0);
910         }
911
912     // join canceled threads
913     for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next())
914         if (ti->purpose() != threadinfo::TI_MAIN
915             && !pthread_equal(me, ti->threadid())) {
916             fprintf(stderr, "joining thread %s:%d\n",
917                     threadtype(ti->purpose()), ti->index());
918             int r = pthread_join(ti->threadid(), 0);
919             always_assert(r == 0);
920         }
921     tree->stats(stderr);
922     exit(0);
923 }
924
925 void
926 epochinc(int)
927 {
928     globalepoch += 2;
929 }
930
931 // Return 1 if success, -1 if I/O error or protocol unmatch
932 int handshake(Json& request, threadinfo& ti) {
933     always_assert(request.is_a() && request.size() >= 2
934                   && request[1].is_i() && request[1].as_i() == Cmd_Handshake
935                   && (request.size() == 2 || request[2].is_o()));
936     if (request.size() >= 2
937         && request[2]["maxkeylen"].is_i()
938         && request[2]["maxkeylen"].as_i() > MASSTREE_MAXKEYLEN) {
939         request[2] = false;
940         request[3] = "bad maxkeylen";
941         request.resize(4);
942     } else {
943         request[2] = true;
944         request[3] = ti.index();
945         request[4] = row_type::name();
946         request.resize(5);
947     }
948     request[1] = Cmd_Handshake + 1;
949     return request[2].as_b() ? 1 : -1;
950 }
951
952 // execute command, return result.
953 int onego(query<row_type>& q, Json& request, Str request_str, threadinfo& ti) {
954     int command = request[1].as_i();
955     if (command == Cmd_Checkpoint) {
956         // force checkpoint
957         pthread_mutex_lock(&checkpoint_mu);
958         pthread_cond_broadcast(&checkpoint_cond);
959         pthread_mutex_unlock(&checkpoint_mu);
960         request.resize(2);
961     } else if (command == Cmd_Get) {
962         q.run_get(tree->table(), request, ti);
963     } else if (command == Cmd_Put && request.size() > 3
964                && (request.size() % 2) == 1) { // insert or update
965         Str key(request[2].as_s());
966         const Json* req = request.array_data() + 3;
967         const Json* end_req = request.end_array_data();
968         request[2] = q.run_put(tree->table(), request[2].as_s(),
969                                req, end_req, ti);
970         if (ti.logger() && request_str) {
971             // use the client's parsed version of the request
972             msgpack::parser mp(request_str.data());
973             mp.skip_array_size().skip_primitives(3);
974             ti.logger()->record(logcmd_put, q.query_times(), key, Str(mp.position(), request_str.end()));
975         } else if (ti.logger())
976             ti.logger()->record(logcmd_put, q.query_times(), key, req, end_req);
977         request.resize(3);
978     } else if (command == Cmd_Replace) { // insert or update
979         Str key(request[2].as_s()), value(request[3].as_s());
980         request[2] = q.run_replace(tree->table(), key, value, ti);
981         if (ti.logger()) // NB may block
982             ti.logger()->record(logcmd_replace, q.query_times(), key, value);
983         request.resize(3);
984     } else if (command == Cmd_Remove) { // remove
985         Str key(request[2].as_s());
986         bool removed = q.run_remove(tree->table(), key, ti);
987         if (removed && ti.logger()) // NB may block
988             ti.logger()->record(logcmd_remove, q.query_times(), key, Str());
989         request[2] = removed;
990         request.resize(3);
991     } else if (command == Cmd_Scan) {
992         q.run_scan(tree->table(), request, ti);
993     } else {
994         request[1] = -1;
995         request.resize(2);
996         return -1;
997     }
998     request[1] = command + 1;
999     return 1;
1000 }
1001
1002 #if HAVE_SYS_EPOLL_H
1003 struct tcpfds {
1004     int epollfd;
1005
1006     tcpfds(int pipefd) {
1007         epollfd = epoll_create(10);
1008         if (epollfd < 0) {
1009             perror("epoll_create");
1010             exit(EXIT_FAILURE);
1011         }
1012         struct epoll_event ev;
1013         ev.events = EPOLLIN;
1014         ev.data.ptr = (void *) 1;
1015         int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, pipefd, &ev);
1016         always_assert(r == 0);
1017     }
1018
1019     enum { max_events = 100 };
1020     typedef struct epoll_event eventset[max_events];
1021     int wait(eventset &es) {
1022         return epoll_wait(epollfd, es, max_events, -1);
1023     }
1024
1025     conn *event_conn(eventset &es, int i) const {
1026         return (conn *) es[i].data.ptr;
1027     }
1028
1029     void add(int fd, conn *c) {
1030         struct epoll_event ev;
1031         ev.events = EPOLLIN;
1032         ev.data.ptr = c;
1033         int r = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
1034         always_assert(r == 0);
1035     }
1036
1037     void remove(int fd) {
1038         int r = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
1039         always_assert(r == 0);
1040     }
1041 };
1042 #else
1043 class tcpfds {
1044     int nfds_;
1045     fd_set rfds_;
1046     std::vector<conn *> conns_;
1047
1048   public:
1049     tcpfds(int pipefd)
1050         : nfds_(pipefd + 1) {
1051         always_assert(pipefd < FD_SETSIZE);
1052         FD_ZERO(&rfds_);
1053         FD_SET(pipefd, &rfds_);
1054         conns_.resize(nfds_, 0);
1055         conns_[pipefd] = (conn *) 1;
1056     }
1057
1058     typedef fd_set eventset;
1059     int wait(eventset &es) {
1060         es = rfds_;
1061         int r = select(nfds_, &es, 0, 0, 0);
1062         return r > 0 ? nfds_ : r;
1063     }
1064
1065     conn *event_conn(eventset &es, int i) const {
1066         return FD_ISSET(i, &es) ? conns_[i] : 0;
1067     }
1068
1069     void add(int fd, conn *c) {
1070         always_assert(fd < FD_SETSIZE);
1071         FD_SET(fd, &rfds_);
1072         if (fd >= nfds_) {
1073             nfds_ = fd + 1;
1074             conns_.resize(nfds_, 0);
1075         }
1076         conns_[fd] = c;
1077     }
1078
1079     void remove(int fd) {
1080         always_assert(fd < FD_SETSIZE);
1081         FD_CLR(fd, &rfds_);
1082         if (fd == nfds_ - 1) {
1083             while (nfds_ > 0 && !FD_ISSET(nfds_ - 1, &rfds_))
1084                 --nfds_;
1085         }
1086     }
1087 };
1088 #endif
1089
1090 void prepare_thread(threadinfo *ti) {
1091 #if __linux__
1092     if (pinthreads) {
1093         cpu_set_t cs;
1094         CPU_ZERO(&cs);
1095         CPU_SET(cores[ti->index()], &cs);
1096         always_assert(sched_setaffinity(0, sizeof(cs), &cs) == 0);
1097     }
1098 #else
1099     always_assert(!pinthreads && "pinthreads not supported\n");
1100 #endif
1101     if (logging)
1102         ti->set_logger(&logs->log(ti->index() % nlogger));
1103 }
1104
1105 void* tcp_threadfunc(threadinfo* ti) {
1106     prepare_thread(ti);
1107
1108     int myfd = tcp_thread_pipes[2 * ti->index()];
1109     tcpfds sloop(myfd);
1110     tcpfds::eventset events;
1111     std::deque<conn*> ready;
1112     query<row_type> q;
1113
1114     while (1) {
1115         int nev = sloop.wait(events);
1116         for (int i = 0; i < nev; i++)
1117             if (conn *c = sloop.event_conn(events, i))
1118                 ready.push_back(c);
1119
1120         while (!ready.empty()) {
1121             conn* c = ready.front();
1122             ready.pop_front();
1123
1124             if (c == (conn *) 1) {
1125                 // new connections
1126 #define MAX_NEWCONN 100
1127                 conninfo* ci[MAX_NEWCONN];
1128                 ssize_t len = read(myfd, ci, sizeof(ci));
1129                 always_assert(len > 0 && len % sizeof(int) == 0);
1130                 for (int j = 0; j * sizeof(*ci) < (size_t) len; ++j) {
1131                     struct conn *c = new conn(ci[j]->s);
1132                     sloop.add(c->fd, c);
1133                     int ret = handshake(ci[j]->handshake, *ti);
1134                     msgpack::unparse(*c->kvout, ci[j]->handshake);
1135                     kvflush(c->kvout);
1136                     if (ret < 0) {
1137                         sloop.remove(c->fd);
1138                         delete c;
1139                     }
1140                     delete ci[j];
1141                 }
1142             } else if (c) {
1143                 // Should not block as suggested by epoll
1144                 uint64_t xposition = c->xposition();
1145                 Json& request = c->receive();
1146                 int ret;
1147                 if (unlikely(!request))
1148                     goto closed;
1149                 ti->rcu_start();
1150                 ret = onego(q, request, c->recent_string(xposition), *ti);
1151                 ti->rcu_stop();
1152                 msgpack::unparse(*c->kvout, request);
1153                 request.clear();
1154                 if (likely(ret >= 0)) {
1155                     if (c->check(0))
1156                         ready.push_back(c);
1157                     else
1158                         kvflush(c->kvout);
1159                     continue;
1160                 }
1161                 printf("socket read error\n");
1162             closed:
1163                 kvflush(c->kvout);
1164                 sloop.remove(c->fd);
1165                 delete c;
1166             }
1167         }
1168     }
1169     return 0;
1170 }
1171
1172 // serve a client udp socket, in a dedicated thread
1173 void* udp_threadfunc(threadinfo* ti) {
1174   int ret;
1175   prepare_thread(ti);
1176
1177   struct sockaddr_in sin;
1178   bzero(&sin, sizeof(sin));
1179   sin.sin_family = AF_INET;
1180   sin.sin_port = htons(port + ti->index());
1181
1182   int s = socket(AF_INET, SOCK_DGRAM, 0);
1183   always_assert(s >= 0);
1184   ret = bind(s, (struct sockaddr *) &sin, sizeof(sin));
1185   always_assert(ret == 0 && "bind failed");
1186   int sobuflen = 512*1024;
1187   setsockopt(s, SOL_SOCKET, SO_RCVBUF, &sobuflen, sizeof(sobuflen));
1188
1189   String buf = String::make_uninitialized(4096);
1190   struct kvout *kvout = new_bufkvout();
1191   msgpack::streaming_parser parser;
1192   StringAccum sa;
1193
1194   query<row_type> q;
1195   while(1){
1196     struct sockaddr_in sin;
1197     socklen_t sinlen = sizeof(sin);
1198     ssize_t cc = recvfrom(s, const_cast<char*>(buf.data()), buf.length(),
1199                           0, (struct sockaddr *) &sin, &sinlen);
1200     if(cc < 0){
1201       perror("udpgo read");
1202       exit(EXIT_FAILURE);
1203     }
1204     kvout_reset(kvout);
1205
1206     parser.reset();
1207     unsigned consumed = parser.consume(buf.data(), buf.length(), buf);
1208
1209     // Fail if we received a partial request
1210     if (parser.success() && parser.result().is_a()) {
1211         ti->rcu_start();
1212         if (onego(q, parser.result(), Str(buf.data(), consumed), *ti) >= 0) {
1213             sa.clear();
1214             msgpack::unparser<StringAccum> cu(sa);
1215             cu << parser.result();
1216             cc = sendto(s, sa.data(), sa.length(), 0,
1217                         (struct sockaddr*) &sin, sinlen);
1218             always_assert(cc == (ssize_t) sa.length());
1219         }
1220         ti->rcu_stop();
1221     } else
1222       printf("onego failed\n");
1223   }
1224   return 0;
1225 }
1226
1227 static String log_filename(const char* logdir, int logindex) {
1228     struct stat sb;
1229     int r = stat(logdir, &sb);
1230     if (r < 0 && errno == ENOENT) {
1231         r = mkdir(logdir, 0777);
1232         if (r < 0) {
1233             fprintf(stderr, "%s: %s\n", logdir, strerror(errno));
1234             always_assert(0);
1235         }
1236     }
1237
1238     StringAccum sa;
1239     sa.snprintf(strlen(logdir) + 24, "%s/kvd-log-%d", logdir, logindex);
1240     return sa.take_string();
1241 }
1242
1243 void log_init() {
1244   int ret, i;
1245
1246   logs = logset::make(nlogger);
1247   for (i = 0; i < nlogger; i++)
1248       logs->log(i).initialize(log_filename(logdirs[i % logdirs.size()], i));
1249
1250   cks = (ckstate *)malloc(sizeof(ckstate) * nckthreads);
1251   for (i = 0; i < nckthreads; i++) {
1252     threadinfo *ti = threadinfo::make(threadinfo::TI_CHECKPOINT, i);
1253     cks[i].state = CKState_Uninit;
1254     cks[i].ti = ti;
1255     ret = ti->run(conc_checkpointer);
1256     always_assert(ret == 0);
1257   }
1258 }
1259
1260 // read a checkpoint, insert key/value pairs into tree.
1261 // must be followed by a read of the log!
1262 // since checkpoint is not consistent
1263 // with any one point in time.
1264 // returns the timestamp of the first log record that needs
1265 // to come from the log.
1266 kvepoch_t read_checkpoint(threadinfo *ti, const char *path) {
1267     double t0 = now();
1268
1269     int fd = open(path, 0);
1270     if(fd < 0){
1271         printf("no %s\n", path);
1272         return 0;
1273     }
1274     struct stat sb;
1275     int ret = fstat(fd, &sb);
1276     always_assert(ret == 0);
1277     char *p = (char *) mmap(0, sb.st_size, PROT_READ, MAP_FILE|MAP_PRIVATE, fd, 0);
1278     always_assert(p != MAP_FAILED);
1279     close(fd);
1280
1281     msgpack::parser par(String::make_stable(p, sb.st_size));
1282     Json j;
1283     par >> j;
1284     std::cerr << j << "\n";
1285     always_assert(j["generation"].is_i() && j["size"].is_i());
1286     uint64_t gen = j["generation"].as_i();
1287     uint64_t n = j["size"].as_i();
1288     printf("reading checkpoint with %" PRIu64 " nodes\n", n);
1289
1290     // read data
1291     for (uint64_t i = 0; i != n; ++i)
1292         ckstate::insert(tree->table(), par, *ti);
1293
1294     munmap(p, sb.st_size);
1295     double t1 = now();
1296     printf("%.1f MB, %.2f sec, %.1f MB/sec\n",
1297            sb.st_size / 1000000.0,
1298            t1 - t0,
1299            (sb.st_size / 1000000.0) / (t1 - t0));
1300     return gen;
1301 }
1302
1303 void
1304 waituntilphase(int phase)
1305 {
1306   always_assert(pthread_mutex_lock(&rec_mu) == 0);
1307   while (rec_state != phase)
1308     always_assert(pthread_cond_wait(&rec_cond, &rec_mu) == 0);
1309   always_assert(pthread_mutex_unlock(&rec_mu) == 0);
1310 }
1311
1312 void
1313 inactive(void)
1314 {
1315   always_assert(pthread_mutex_lock(&rec_mu) == 0);
1316   rec_nactive --;
1317   always_assert(pthread_cond_broadcast(&rec_cond) == 0);
1318   always_assert(pthread_mutex_unlock(&rec_mu) == 0);
1319 }
1320
1321 void recovercheckpoint(threadinfo *ti) {
1322     waituntilphase(REC_CKP);
1323     char path[256];
1324     sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
1325             ckpdirs[ti->index() % ckpdirs.size()],
1326             ckp_gen.value(), ti->index());
1327     kvepoch_t gen = read_checkpoint(ti, path);
1328     always_assert(ckp_gen == gen);
1329     inactive();
1330 }
1331
1332 void
1333 recphase(int nactive, int state)
1334 {
1335   rec_nactive = nactive;
1336   rec_state = state;
1337   always_assert(pthread_cond_broadcast(&rec_cond) == 0);
1338   while (rec_nactive)
1339     always_assert(pthread_cond_wait(&rec_cond, &rec_mu) == 0);
1340 }
1341
1342 // read the checkpoint file.
1343 // read each log file.
1344 // insert will ignore attempts to update with timestamps
1345 // less than what was in the entry from the checkpoint file.
1346 // so we don't have to do an explicit merge by time of the log files.
1347 void
1348 recover(threadinfo *)
1349 {
1350   recovering = true;
1351   // XXX: discard temporary checkpoint and ckp-gen files generated before crash
1352
1353   // get the generation of the checkpoint from ckp-gen, if any
1354   char path[256];
1355   sprintf(path, "%s/kvd-ckp-gen", ckpdirs[0]);
1356   ckp_gen = 0;
1357   rec_ckp_min_epoch = rec_ckp_max_epoch = 0;
1358   int fd = open(path, O_RDONLY);
1359   if (fd >= 0) {
1360       Json ckpj = Json::parse(read_file_contents(fd));
1361       close(fd);
1362       if (ckpj && ckpj["kvdb_checkpoint"] && ckpj["generation"].is_number()) {
1363           ckp_gen = ckpj["generation"].to_u64();
1364           rec_ckp_min_epoch = ckpj["min_epoch"].to_u64();
1365           rec_ckp_max_epoch = ckpj["max_epoch"].to_u64();
1366           printf("recover from checkpoint %" PRIu64 " [%" PRIu64 ", %" PRIu64 "]\n", ckp_gen.value(), rec_ckp_min_epoch.value(), rec_ckp_max_epoch.value());
1367       }
1368   } else {
1369     printf("no %s\n", path);
1370   }
1371   always_assert(pthread_mutex_lock(&rec_mu) == 0);
1372
1373   // recover from checkpoint, and set timestamp of the checkpoint
1374   recphase(nckthreads, REC_CKP);
1375
1376   // find minimum maximum timestamp of entries in each log
1377   rec_log_infos = new logreplay::info_type[nlogger];
1378   recphase(nlogger, REC_LOG_TS);
1379
1380   // replay log entries, remove inconsistent entries, and append
1381   // an empty log entry with minimum timestamp
1382
1383   // calculate log range
1384
1385   // Maximum epoch seen in the union of the logs and the checkpoint. (We
1386   // don't commit a checkpoint until all logs are flushed past the
1387   // checkpoint's max_epoch.)
1388   kvepoch_t max_epoch = rec_ckp_max_epoch;
1389   if (max_epoch)
1390       max_epoch = max_epoch.next_nonzero();
1391   for (logreplay::info_type *it = rec_log_infos;
1392        it != rec_log_infos + nlogger; ++it)
1393       if (it->last_epoch
1394           && (!max_epoch || max_epoch < it->last_epoch))
1395           max_epoch = it->last_epoch;
1396
1397   // Maximum first_epoch seen in the logs. Full log information is not
1398   // available for epochs before max_first_epoch.
1399   kvepoch_t max_first_epoch = 0;
1400   for (logreplay::info_type *it = rec_log_infos;
1401        it != rec_log_infos + nlogger; ++it)
1402       if (it->first_epoch
1403           && (!max_first_epoch || max_first_epoch < it->first_epoch))
1404           max_first_epoch = it->first_epoch;
1405
1406   // Maximum epoch of all logged wake commands.
1407   kvepoch_t max_wake_epoch = 0;
1408   for (logreplay::info_type *it = rec_log_infos;
1409        it != rec_log_infos + nlogger; ++it)
1410       if (it->wake_epoch
1411           && (!max_wake_epoch || max_wake_epoch < it->wake_epoch))
1412           max_wake_epoch = it->wake_epoch;
1413
1414   // Minimum last_epoch seen in QUIESCENT logs.
1415   kvepoch_t min_quiescent_last_epoch = 0;
1416   for (logreplay::info_type *it = rec_log_infos;
1417        it != rec_log_infos + nlogger; ++it)
1418       if (it->quiescent
1419           && (!min_quiescent_last_epoch || min_quiescent_last_epoch > it->last_epoch))
1420           min_quiescent_last_epoch = it->last_epoch;
1421
1422   // If max_wake_epoch && min_quiescent_last_epoch <= max_wake_epoch, then a
1423   // wake command was missed by at least one quiescent log. We can't replay
1424   // anything at or beyond the minimum missed wake epoch. So record, for
1425   // each log, the minimum wake command that at least one quiescent thread
1426   // missed.
1427   if (max_wake_epoch && min_quiescent_last_epoch <= max_wake_epoch)
1428       rec_replay_min_quiescent_last_epoch = min_quiescent_last_epoch;
1429   else
1430       rec_replay_min_quiescent_last_epoch = 0;
1431   recphase(nlogger, REC_LOG_ANALYZE_WAKE);
1432
1433   // Calculate upper bound of epochs to replay.
1434   // This is the minimum of min_post_quiescent_wake_epoch (if any) and the
1435   // last_epoch of all non-quiescent logs.
1436   rec_replay_max_epoch = max_epoch;
1437   for (logreplay::info_type *it = rec_log_infos;
1438        it != rec_log_infos + nlogger; ++it) {
1439       if (!it->quiescent
1440           && it->last_epoch
1441           && it->last_epoch < rec_replay_max_epoch)
1442           rec_replay_max_epoch = it->last_epoch;
1443       if (it->min_post_quiescent_wake_epoch
1444           && it->min_post_quiescent_wake_epoch < rec_replay_max_epoch)
1445           rec_replay_max_epoch = it->min_post_quiescent_wake_epoch;
1446   }
1447
1448   // Calculate lower bound of epochs to replay.
1449   rec_replay_min_epoch = rec_ckp_min_epoch;
1450   // XXX what about max_first_epoch?
1451
1452   // Checks.
1453   if (rec_ckp_min_epoch) {
1454       always_assert(rec_ckp_min_epoch > max_first_epoch);
1455       always_assert(rec_ckp_min_epoch < rec_replay_max_epoch);
1456       always_assert(rec_ckp_max_epoch < rec_replay_max_epoch);
1457       fprintf(stderr, "replay [%" PRIu64 ",%" PRIu64 ") from [%" PRIu64 ",%" PRIu64 ") into ckp [%" PRIu64 ",%" PRIu64 "]\n",
1458               rec_replay_min_epoch.value(), rec_replay_max_epoch.value(),
1459               max_first_epoch.value(), max_epoch.value(),
1460               rec_ckp_min_epoch.value(), rec_ckp_max_epoch.value());
1461   }
1462
1463   // Actually replay.
1464   delete[] rec_log_infos;
1465   rec_log_infos = 0;
1466   recphase(nlogger, REC_LOG_REPLAY);
1467
1468   // done recovering
1469   recphase(0, REC_DONE);
1470 #if !NDEBUG
1471   // check that all delta markers have been recycled (leaving only remove
1472   // markers and real values)
1473   uint64_t deltas_created = 0, deltas_removed = 0;
1474   for (threadinfo *ti = threadinfo::allthreads; ti; ti = ti->next()) {
1475       deltas_created += ti->counter(tc_replay_create_delta);
1476       deltas_removed += ti->counter(tc_replay_remove_delta);
1477   }
1478   if (deltas_created)
1479       fprintf(stderr, "deltas created: %" PRIu64 ", removed: %" PRIu64 "\n", deltas_created, deltas_removed);
1480   always_assert(deltas_created == deltas_removed);
1481 #endif
1482
1483   global_log_epoch = rec_replay_max_epoch.next_nonzero();
1484
1485   always_assert(pthread_mutex_unlock(&rec_mu) == 0);
1486   recovering = false;
1487   if (recovery_only)
1488       exit(0);
1489 }
1490
1491 void
1492 writecheckpoint(const char *path, ckstate *c, double t0)
1493 {
1494   double t1 = now();
1495   printf("memory phase: %" PRIu64 " nodes, %.2f sec\n", c->count, t1 - t0);
1496
1497   int fd = creat(path, 0666);
1498   always_assert(fd >= 0);
1499
1500   // checkpoint file format, all msgpack:
1501   //   {"generation": generation, "size": size, ...}
1502   //   then `size` triples of key (string), timestmap (int), value (whatever)
1503   Json j = Json().set("generation", ckp_gen.value())
1504       .set("size", c->count)
1505       .set("firstkey", c->startkey);
1506   StringAccum sa;
1507   msgpack::unparse(sa, j);
1508   checked_write(fd, sa.data(), sa.length());
1509   checked_write(fd, c->vals->buf, c->vals->n);
1510
1511   int ret = fsync(fd);
1512   always_assert(ret == 0);
1513   ret = close(fd);
1514   always_assert(ret == 0);
1515
1516   double t2 = now();
1517   c->bytes = c->vals->n;
1518   printf("file phase (%s): %" PRIu64 " bytes, %.2f sec, %.1f MB/sec\n",
1519          path,
1520          c->bytes,
1521          t2 - t1,
1522          (c->bytes / 1000000.0) / (t2 - t1));
1523 }
1524
1525 void
1526 conc_filecheckpoint(threadinfo *ti)
1527 {
1528     ckstate *c = &cks[ti->index()];
1529     c->vals = new_bufkvout();
1530     double t0 = now();
1531     tree->table().scan(c->startkey, true, *c, *ti);
1532     char path[256];
1533     sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
1534             ckpdirs[ti->index() % ckpdirs.size()],
1535             ckp_gen.value(), ti->index());
1536     writecheckpoint(path, c, t0);
1537     c->count = 0;
1538     free(c->vals);
1539 }
1540
1541 static Json
1542 prepare_checkpoint(kvepoch_t min_epoch, int nckthreads, const Str *pv)
1543 {
1544     Json j;
1545     j.set("kvdb_checkpoint", true)
1546         .set("min_epoch", min_epoch.value())
1547         .set("max_epoch", global_log_epoch.value())
1548         .set("generation", ckp_gen.value())
1549         .set("nckthreads", nckthreads);
1550
1551     Json pvj;
1552     for (int i = 1; i < nckthreads; ++i)
1553         pvj.push_back(Json::make_string(pv[i].s, pv[i].len));
1554     j.set("pivots", pvj);
1555
1556     return j;
1557 }
1558
1559 static void
1560 commit_checkpoint(Json ckpj)
1561 {
1562     // atomically commit a set of checkpoint files by incrementing
1563     // the checkpoint generation on disk
1564     char path[256];
1565     sprintf(path, "%s/kvd-ckp-gen", ckpdirs[0]);
1566     int r = atomic_write_file_contents(path, ckpj.unparse());
1567     always_assert(r == 0);
1568     fprintf(stderr, "kvd-ckp-%" PRIu64 " [%s,%s]: committed\n",
1569             ckp_gen.value(), ckpj["min_epoch"].to_s().c_str(),
1570             ckpj["max_epoch"].to_s().c_str());
1571
1572     // delete old checkpoint files
1573     for (int i = 0; i < nckthreads; i++) {
1574         char path[256];
1575         sprintf(path, "%s/kvd-ckp-%" PRId64 "-%d",
1576                 ckpdirs[i % ckpdirs.size()],
1577                 ckp_gen.value() - 1, i);
1578         unlink(path);
1579     }
1580 }
1581
1582 static kvepoch_t
1583 max_flushed_epoch()
1584 {
1585     kvepoch_t mfe = 0, ge = global_log_epoch;
1586     for (int i = 0; i < nlogger; ++i) {
1587         loginfo& log = logs->log(i);
1588         kvepoch_t fe = log.quiescent() ? ge : log.flushed_epoch();
1589         if (!mfe || fe < mfe)
1590             mfe = fe;
1591     }
1592     return mfe;
1593 }
1594
1595 // concurrent periodic checkpoint
1596 void* conc_checkpointer(threadinfo* ti) {
1597   recovercheckpoint(ti);
1598   ckstate *c = &cks[ti->index()];
1599   c->count = 0;
1600   pthread_cond_init(&c->state_cond, NULL);
1601   c->state = CKState_Ready;
1602   while (recovering)
1603     sleep(1);
1604   if (checkpoint_interval <= 0)
1605       return 0;
1606   if (ti->index() == 0) {
1607     for (int i = 1; i < nckthreads; i++)
1608       while (cks[i].state != CKState_Ready)
1609         ;
1610     Str *pv = new Str[nckthreads + 1];
1611     Json uncommitted_ckp;
1612
1613     while (1) {
1614       struct timespec ts;
1615       set_timespec(ts, now() + (uncommitted_ckp ? 0.25 : checkpoint_interval));
1616
1617       pthread_mutex_lock(&checkpoint_mu);
1618       if (!go_quit)
1619         pthread_cond_timedwait(&checkpoint_cond, &checkpoint_mu, &ts);
1620       if (go_quit) {
1621           for (int i = 0; i < nckthreads; i++) {
1622               cks[i].state = CKState_Quit;
1623               pthread_cond_signal(&cks[i].state_cond);
1624           }
1625           pthread_mutex_unlock(&checkpoint_mu);
1626           break;
1627       }
1628       pthread_mutex_unlock(&checkpoint_mu);
1629
1630       if (uncommitted_ckp) {
1631           kvepoch_t mfe = max_flushed_epoch();
1632           if (!mfe || mfe > uncommitted_ckp["max_epoch"].to_u64()) {
1633               commit_checkpoint(uncommitted_ckp);
1634               uncommitted_ckp = Json();
1635           }
1636           continue;
1637       }
1638
1639       double t0 = now();
1640       ti->rcu_start();
1641       for (int i = 0; i < nckthreads + 1; i++)
1642         pv[i].assign(NULL, 0);
1643       tree->findpivots(pv, nckthreads + 1);
1644       ti->rcu_stop();
1645
1646       kvepoch_t min_epoch = global_log_epoch;
1647       pthread_mutex_lock(&checkpoint_mu);
1648       ckp_gen = ckp_gen.next_nonzero();
1649       for (int i = 0; i < nckthreads; i++) {
1650           cks[i].startkey = pv[i];
1651           cks[i].endkey = (i == nckthreads - 1 ? Str() : pv[i + 1]);
1652           cks[i].state = CKState_Go;
1653           pthread_cond_signal(&cks[i].state_cond);
1654       }
1655       pthread_mutex_unlock(&checkpoint_mu);
1656
1657       ti->rcu_start();
1658       conc_filecheckpoint(ti);
1659       ti->rcu_stop();
1660
1661       cks[0].state = CKState_Ready;
1662       uint64_t bytes = cks[0].bytes;
1663       pthread_mutex_lock(&checkpoint_mu);
1664       for (int i = 1; i < nckthreads; i++) {
1665         while (cks[i].state != CKState_Ready)
1666           pthread_cond_wait(&cks[i].state_cond, &checkpoint_mu);
1667         bytes += cks[i].bytes;
1668       }
1669       pthread_mutex_unlock(&checkpoint_mu);
1670
1671       uncommitted_ckp = prepare_checkpoint(min_epoch, nckthreads, pv);
1672
1673       for (int i = 0; i < nckthreads + 1; i++)
1674         if (pv[i].s)
1675           free((void *)pv[i].s);
1676       double t = now() - t0;
1677       fprintf(stderr, "kvd-ckp-%" PRIu64 " [%s,%s]: prepared (%.2f sec, %" PRIu64 " MB, %" PRIu64 " MB/sec)\n",
1678               ckp_gen.value(), uncommitted_ckp["min_epoch"].to_s().c_str(),
1679               uncommitted_ckp["max_epoch"].to_s().c_str(),
1680               t, bytes / (1 << 20), (uint64_t)(bytes / t) >> 20);
1681     }
1682   } else {
1683     while(1) {
1684       pthread_mutex_lock(&checkpoint_mu);
1685       while (c->state != CKState_Go && c->state != CKState_Quit)
1686         pthread_cond_wait(&c->state_cond, &checkpoint_mu);
1687       if (c->state == CKState_Quit) {
1688         pthread_mutex_unlock(&checkpoint_mu);
1689         break;
1690       }
1691       pthread_mutex_unlock(&checkpoint_mu);
1692
1693       ti->rcu_start();
1694       conc_filecheckpoint(ti);
1695       ti->rcu_stop();
1696
1697       pthread_mutex_lock(&checkpoint_mu);
1698       c->state = CKState_Ready;
1699       pthread_cond_signal(&c->state_cond);
1700       pthread_mutex_unlock(&checkpoint_mu);
1701     }
1702   }
1703   return 0;
1704 }