inital commit
[c11concurrency-benchmarks.git] / mabain / examples / mb_rc_test.cpp
1 /**
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <assert.h>
20 #include <string.h>
21 #include <unistd.h>
22 #include <pthread.h>
23 #include <atomic>
24
25 #include <mabain/db.h>
26
27 #include "test_key.h"
28
29 using namespace mabain;
30
31 #define DB_SIZE 128ULL*1024*1024
32
33 static int max_key = 10000000;
34 static std::atomic<int> write_index;
35 static bool stop_processing = false;
36 static std::string mbdir = "/var/tmp/mabain_test/";
37
38 static int key_list_size = 10;
39 std::string key_list[] = {
40     "facebook",
41     "facetime",
42     "faceoff",
43     "furious",
44     "falcons",
45     "fences",
46     "fireman",
47     "fantasticfour",
48     "fragile",
49     "frankenstein"
50 };
51
52 static void* insert_thread(void *arg)
53 {
54     int curr_key;
55     TestKey mkey(MABAIN_TEST_KEY_TYPE_SHA_256);
56     std::string kv;
57     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
58     // If a reader wants to perform DB update, the async writer pointer must be set.
59     assert(db_r->is_open());
60     assert(db_r->SetAsyncWriterPtr((DB *) arg) == MBError::SUCCESS);
61     assert(db_r->AsyncWriterEnabled());
62
63     while(!stop_processing) {
64         curr_key = write_index.fetch_add(1, std::memory_order_release);
65         kv = mkey.get_key(curr_key);
66         if(curr_key < max_key) {
67             assert(db_r->Add(kv, kv) == MBError::SUCCESS);
68         } else {
69             stop_processing = true;
70             break;
71         }
72     }
73
74     // Reader must unregister the async writer pointer
75     assert(db_r->UnsetAsyncWriterPtr((DB *) arg) == MBError::SUCCESS);
76     db_r->Close();
77     delete db_r;
78     return NULL;
79 }
80
81 static void SetTestStatus(bool success)
82 {
83     std::string cmd;
84     if(success) {
85         cmd = std::string("touch ") + mbdir + "/_success";
86     } else {
87         cmd = std::string("rm ") + mbdir + "/_success >" + mbdir + "/out 2>" + mbdir + "/err";
88     }
89     if(system(cmd.c_str()) != 0) {
90     }
91 }
92
93 static void Lookup()
94 {
95     TestKey mkey(MABAIN_TEST_KEY_TYPE_SHA_256);
96     std::string kv;
97     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
98     assert(db_r->is_open());
99     MBData mbd;
100
101     for(int i = 0; i < 100; i++) {
102         kv = mkey.get_key(i);
103         assert(db_r->Find(kv, mbd) == MBError::SUCCESS);
104         assert(kv == std::string((const char *)mbd.buff, mbd.data_len));
105     }
106     db_r->Close();
107     delete db_r;
108 }
109
110 static void GarbageLookup()
111 {
112     std::cout << "\nCalling Lookups during Garbage Collection" << std::endl;
113     std::string kv;
114     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
115     assert(db_r->is_open());
116     MBData mbd;
117
118     for(int i = 0; i < key_list_size; i++) {
119         int rval = db_r->Find(key_list[i], mbd);
120         if(rval != MBError::SUCCESS) {
121             std::cout << key_list[i] << ": " << MBError::get_error_str(rval) << std::endl;
122         } else {
123             std::cout << key_list[i] << ": " << std::string((const char *)mbd.buff, mbd.data_len) << std::endl;
124             assert(rval == MBError::SUCCESS);
125             assert(key_list[i] == std::string((const char *)mbd.buff, mbd.data_len));
126         }
127     }
128
129     db_r->Close();
130     delete db_r;
131 }
132
133 static void Deletekeys()
134 {
135     std::cout << "\nStart Deletekeys ()" << std::endl;
136     std::string kv;
137
138     int options = CONSTS::WriterOptions() | CONSTS::ReaderOptions() | CONSTS::ASYNC_WRITER_MODE;
139     DB *db = new DB(mbdir.c_str(), options, 128LL*1024*1024, 128LL*1024*1024);
140     assert(db->is_open());
141     MBData mbd;
142
143     TestKey mkey(MABAIN_TEST_KEY_TYPE_SHA_256);
144     for(int i = 0; i < 10000; i++) {
145         kv = mkey.get_key(i);
146         assert(db->Remove(kv) == MBError::SUCCESS);
147     }
148
149     db->Close();
150     delete db;
151 }
152
153
154 static void GarbageCollectResources()
155 {
156     std::cout << "\nKick-start Garbage Collection" << std::endl;
157     TestKey mkey(MABAIN_TEST_KEY_TYPE_SHA_256);
158     std::string kv;
159
160     int options = CONSTS::WriterOptions() | CONSTS::ReaderOptions() | CONSTS::ASYNC_WRITER_MODE;
161     DB *db = new DB(mbdir.c_str(), options, 128LL*1024*1024, 128LL*1024*1024);
162     assert(db->is_open());
163     MBData mbd;
164
165     db->CollectResource(1, 1);
166
167     for(int i = 0; i < key_list_size; i++) {
168         assert(db->Add(key_list[i], key_list[i]) == MBError::SUCCESS);
169     }
170
171     db->Close();
172     delete db;
173 }
174
175
176 // Multiple threads performing DB insertion/deletion/updating
177 int main(int argc, char *argv[])
178 {
179     std::string cmd = std::string("rm -rf ") + mbdir;
180     if(system(cmd.c_str()) != 0) {}
181
182     cmd = std::string("mkdir -p ") + mbdir;
183     if(system(cmd.c_str()) != 0) {}
184
185     pthread_t pid[256];
186     int nthread = 4;
187     if(nthread > 256) {
188         abort();
189     }
190
191     if(argc > 1) {
192         mbdir = std::string(argv[1]);
193         std::cout << "Mabain test db directory " << mbdir << "\n";
194     }
195     if(argc > 2) {
196         max_key = atoi(argv[2]);
197         std::cout << "Setting number of keys to be " << max_key << "\n";
198     }
199
200     SetTestStatus(false);
201     mabain::DB::SetLogFile(mbdir + "/mabain.log");
202
203     write_index.store(0, std::memory_order_release);
204     // Writer needs to enable async writer mode.
205     int options = CONSTS::WriterOptions() | CONSTS::ASYNC_WRITER_MODE;
206     DB *db = new DB(mbdir.c_str(), options, 128LL*1024*1024, 128LL*1024*1024);
207     assert(db->is_open());
208     db->RemoveAll();
209
210     for(int i = 0; i < nthread; i++) {
211         if(pthread_create(&pid[i], NULL, insert_thread, db) != 0) {
212             std::cout << "failed to create thread\n";
213             abort();
214         }
215     }
216
217     while(!stop_processing) {
218         usleep(5);
219     }
220
221     for(int i = 0; i < nthread; i++) {
222         pthread_join(pid[i], NULL);
223     }
224
225     // Writer handle must be the last one to close if reader handles are used for DB update.
226     assert(db->Close() == MBError::SUCCESS);
227     delete db;
228
229     // at this timer we added all required keys, verify all those keys
230     Lookup();
231
232     // delete keys so that we can create some garbage data to be reclaimed
233     Deletekeys();
234
235     // kick start the garbage collection task
236     GarbageCollectResources();
237
238     // lookup few keys that we added during garbage collection
239     GarbageLookup();
240
241     mabain::DB::CloseLogFile();
242     SetTestStatus(true);
243     return 0;
244 }