benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / log.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef MASSTREE_LOG_HH
17 #define MASSTREE_LOG_HH
18 #include "kvthread.hh"
19 #include "string.hh"
20 #include "kvproto.hh"
21 #include "str.hh"
22 #include <pthread.h>
23 class logset;
24 using lcdf::Str;
25 namespace lcdf { class Json; }
26
27 // in-memory log.
28 // more than one, to reduce contention on the lock.
29 class loginfo {
30   public:
31     void initialize(const lcdf::String& logfile);
32
33     inline void acquire();
34     inline void release();
35
36     inline kvepoch_t flushed_epoch() const;
37     inline bool quiescent() const;
38
39     // logging
40     struct query_times {
41         kvepoch_t epoch;
42         kvtimestamp_t ts;
43         kvtimestamp_t prev_ts;
44     };
45     // NB may block!
46     void record(int command, const query_times& qt, Str key, Str value);
47     void record(int command, const query_times& qt, Str key,
48                 const lcdf::Json* req, const lcdf::Json* end_req);
49
50   private:
51     struct waitlist {
52         waitlist* next;
53     };
54     struct front {
55         uint32_t lock_;
56         waitlist* waiting_;
57         lcdf::String::rep_type filename_;
58         logset* logset_;
59     };
60     struct logset_info {
61         int32_t size_;
62         int allocation_offset_;
63     };
64
65     front f_;
66     char padding1_[CACHE_LINE_SIZE - sizeof(front)];
67
68     kvepoch_t log_epoch_;       // epoch written to log (non-quiescent)
69     kvepoch_t quiescent_epoch_; // epoch we went quiescent
70     kvepoch_t wake_epoch_;      // epoch for which we recorded a wake command
71     kvepoch_t flushed_epoch_;   // epoch fsync()ed to disk
72
73     union {
74         struct {
75             char *buf_;
76             uint32_t pos_;
77             uint32_t len_;
78
79             // We have logged all writes up to, but not including,
80             // flushed_epoch_.
81             // Log is quiesced to disk if quiescent_epoch_ != 0
82             // and quiescent_epoch_ == flushed_epoch_.
83             // When a log wakes up from quiescence, it sets global_wake_epoch;
84             // other threads must record a logcmd_wake in their logs.
85             // Invariant: log_epoch_ != quiescent_epoch_ (unless both are 0).
86
87             threadinfo *ti_;
88             int logindex_;
89         };
90         struct {
91             char cache_line_2_[CACHE_LINE_SIZE - 4 * sizeof(kvepoch_t) - sizeof(logset_info)];
92             logset_info lsi_;
93         };
94     };
95
96     loginfo(logset* ls, int logindex);
97     ~loginfo();
98     void* run();
99     static void* logger_trampoline(threadinfo* ti);
100
101     friend class logset;
102 };
103
104 class logset {
105   public:
106     static logset* make(int size);
107     static void free(logset* ls);
108
109     inline int size() const;
110     inline loginfo& log(int i);
111     inline const loginfo& log(int i) const;
112
113   private:
114     loginfo li_[0];
115 };
116
117 extern kvepoch_t global_log_epoch;
118 extern kvepoch_t global_wake_epoch;
119 extern struct timeval log_epoch_interval;
120
121 enum logcommand {
122     logcmd_none = 0,
123     logcmd_put = 0x5455506B,            // "kPUT" in little endian
124     logcmd_replace = 0x3155506B,        // "kPU1"
125     logcmd_modify = 0x444F4D6B,         // "kMOD"
126     logcmd_remove = 0x4D45526B,         // "kREM"
127     logcmd_epoch = 0x4F50456B,          // "kEPO"
128     logcmd_quiesce = 0x4955516B,        // "kQUI"
129     logcmd_wake = 0x4B41576B            // "kWAK"
130 };
131
132
133 class logreplay {
134   public:
135     logreplay(const lcdf::String &filename);
136     ~logreplay();
137     int unmap();
138
139     struct info_type {
140         kvepoch_t first_epoch;
141         kvepoch_t last_epoch;
142         kvepoch_t wake_epoch;
143         kvepoch_t min_post_quiescent_wake_epoch;
144         bool quiescent;
145     };
146     info_type info() const;
147     kvepoch_t min_post_quiescent_wake_epoch(kvepoch_t quiescent_epoch) const;
148
149     void replay(int i, threadinfo *ti);
150
151   private:
152     lcdf::String filename_;
153     int errno_;
154     off_t size_;
155     char *buf_;
156
157     uint64_t replayandclean1(kvepoch_t min_epoch, kvepoch_t max_epoch,
158                              threadinfo *ti);
159     int replay_truncate(size_t len);
160     int replay_copy(const char *tmpname, const char *first, const char *last);
161 };
162
163 enum { REC_NONE, REC_CKP, REC_LOG_TS, REC_LOG_ANALYZE_WAKE,
164        REC_LOG_REPLAY, REC_DONE };
165 extern void recphase(int nactive, int state);
166 extern void waituntilphase(int phase);
167 extern void inactive();
168 extern pthread_mutex_t rec_mu;
169 extern logreplay::info_type *rec_log_infos;
170 extern kvepoch_t rec_ckp_min_epoch;
171 extern kvepoch_t rec_ckp_max_epoch;
172 extern kvepoch_t rec_replay_min_epoch;
173 extern kvepoch_t rec_replay_max_epoch;
174 extern kvepoch_t rec_replay_min_quiescent_last_epoch;
175
176
177 inline void loginfo::acquire() {
178     test_and_set_acquire(&f_.lock_);
179 }
180
181 inline void loginfo::release() {
182     test_and_set_release(&f_.lock_);
183 }
184
185 inline kvepoch_t loginfo::flushed_epoch() const {
186     return flushed_epoch_;
187 }
188
189 inline bool loginfo::quiescent() const {
190     return quiescent_epoch_ && quiescent_epoch_ == flushed_epoch_;
191 }
192
193 inline int logset::size() const {
194     return li_[-1].lsi_.size_;
195 }
196
197 inline loginfo& logset::log(int i) {
198     assert(unsigned(i) < unsigned(size()));
199     return li_[i];
200 }
201
202 inline const loginfo& logset::log(int i) const {
203     assert(unsigned(i) < unsigned(size()));
204     return li_[i];
205 }
206
207
208 template <typename R>
209 struct row_delta_marker : public row_marker {
210     kvtimestamp_t prev_ts_;
211     R *prev_;
212     char s_[0];
213 };
214
215 template <typename R>
216 inline bool row_is_delta_marker(const R* row) {
217     if (row_is_marker(row)) {
218         const row_marker* m =
219             reinterpret_cast<const row_marker *>(row->col(0).s);
220         return m->marker_type_ == m->mt_delta;
221     } else
222         return false;
223 }
224
225 template <typename R>
226 inline row_delta_marker<R>* row_get_delta_marker(const R* row, bool force = false) {
227     (void) force;
228     assert(force || row_is_delta_marker(row));
229     return reinterpret_cast<row_delta_marker<R>*>
230         (const_cast<char*>(row->col(0).s));
231 }
232
233 #endif