62a18536b014ea19bca10d6d504e15a7307588f0
[c11concurrency-benchmarks.git] / mabain / examples / mb_multi_thread_insert_test.cpp
1 /**
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <assert.h>
20 #include <string.h>
21 #include <unistd.h>
22 #include <pthread.h>
23 #include <atomic>
24
25 #include <mabain/db.h>
26
27 #include "test_key.h"
28
29 //#define ASSERT_TEST
30
31 using namespace mabain;
32
33 static int max_key = 100000;
34 static std::atomic<int> write_index;
35 static bool stop_processing = false;
36 static std::string mbdir = "./multi_test/";
37
38 static void* insert_thread(void *arg)
39 {
40     int curr_key;
41     TestKey mkey(MABAIN_TEST_KEY_TYPE_INT);
42     std::string kv;
43     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
44     assert(db_r->is_open());
45
46     while(!stop_processing) {
47         curr_key = write_index.fetch_add(1, std::memory_order_release);
48         kv = mkey.get_key(curr_key);
49         if(curr_key < max_key) {
50 #ifdef ASSERT_TEST
51             assert(db_r->Add(kv, kv) == MBError::SUCCESS);
52 #else
53             db_r->Add(kv, kv);
54 #endif
55         } else {
56             stop_processing = true;
57             break;
58         }
59     }
60
61     // Reader must unregister the async writer pointer
62     delete db_r;
63     return NULL;
64 }
65
66 void SetTestStatus(bool success)
67 {
68     std::string cmd;
69     if(success) {
70         cmd = std::string("touch ") + mbdir + "/_success";
71     } else {
72         cmd = std::string("rm ") + mbdir + "/_success >" + mbdir + "/out 2>" + mbdir + "/err";
73     }
74     if(system(cmd.c_str()) != 0) {
75     }
76 }
77
78 static void Lookup()
79 {
80     TestKey mkey(MABAIN_TEST_KEY_TYPE_INT);
81     std::string kv;
82     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
83     assert(db_r->is_open());
84     MBData mbd;
85
86     for(int i = 0; i < max_key; i++) {
87         kv = mkey.get_key(i);
88 #ifdef ASSERT_TEST
89         assert(db_r->Find(kv, mbd) == MBError::SUCCESS);
90         assert(kv == std::string((const char *)mbd.buff, mbd.data_len));
91 #else
92         db_r->Find(kv, mbd);
93 #endif
94     }
95     db_r->Close();
96     delete db_r;
97 }
98
99 // Multiple threads performing DB insertion/deletion/updating
100 int main(int argc, char *argv[])
101 {
102     pthread_t pid[256];
103     int nthread = 4;
104     if(nthread > 256) {
105         abort();
106     }
107
108     if(argc > 1) {
109         mbdir = std::string(argv[1]);
110         std::cout << "Mabain test db directory " << mbdir << "\n";
111     }
112     if(argc > 2) {
113         max_key = atoi(argv[2]);
114         std::cout << "Setting number of keys to be " << max_key << "\n";
115     }
116
117 //    SetTestStatus(false);
118     mabain::DB::SetLogFile(mbdir + "/mabain.log");
119
120     write_index.store(0, std::memory_order_release);
121     // Writer needs to enable async writer mode.
122     int options = CONSTS::WriterOptions() | CONSTS::ASYNC_WRITER_MODE;
123     DB *db = new DB(mbdir.c_str(), options, 128LL*1024*1024, 128LL*1024*1024);
124     assert(db->is_open());
125     db->RemoveAll();
126
127     for(int i = 0; i < nthread; i++) {
128         if(pthread_create(&pid[i], NULL, insert_thread, db) != 0) {
129             std::cout << "failed to create thread\n";
130             abort();
131         }
132     }
133
134     for(int i = 0; i < nthread; i++) {
135         pthread_join(pid[i], NULL);
136     }
137
138     assert(db->Close() == MBError::SUCCESS);
139     delete db;
140
141     Lookup();
142
143     mabain::DB::CloseLogFile();
144 //    SetTestStatus(true);
145     return 0;
146 }