7e73ec0a4e6c1b25d32dea71986ec5005ce83ab5
[c11concurrency-benchmarks.git] / mabain / examples / mb_multi_thread_insert_test.cpp
1 /**
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <assert.h>
20 #include <string.h>
21 #include <unistd.h>
22 #include <pthread.h>
23 #include <atomic>
24
25 #include <mabain/db.h>
26
27 #include "test_key.h"
28
29 using namespace mabain;
30
31 static int max_key = 1000000;
32 static std::atomic<int> write_index;
33 static bool stop_processing = false;
34 static std::string mbdir = "./multi_test/";
35
36 static void* insert_thread(void *arg)
37 {
38     int curr_key;
39     TestKey mkey(MABAIN_TEST_KEY_TYPE_INT);
40     std::string kv;
41     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
42     assert(db_r->is_open());
43
44     while(!stop_processing) {
45         curr_key = write_index.fetch_add(1, std::memory_order_release);
46         kv = mkey.get_key(curr_key);
47         if(curr_key < max_key) {
48             assert(db_r->Add(kv, kv) == MBError::SUCCESS);
49         } else {
50             stop_processing = true;
51             break;
52         }
53     }
54
55     // Reader must unregister the async writer pointer
56     delete db_r;
57     return NULL;
58 }
59
60 void SetTestStatus(bool success)
61 {
62     std::string cmd;
63     if(success) {
64         cmd = std::string("touch ") + mbdir + "/_success";
65     } else {
66         cmd = std::string("rm ") + mbdir + "/_success >" + mbdir + "/out 2>" + mbdir + "/err";
67     }
68     if(system(cmd.c_str()) != 0) {
69     }
70 }
71
72 static void Lookup()
73 {
74     TestKey mkey(MABAIN_TEST_KEY_TYPE_INT);
75     std::string kv;
76     DB *db_r = new DB(mbdir.c_str(), CONSTS::ReaderOptions(), 128LL*1024*1024, 128LL*1024*1024);
77     assert(db_r->is_open());
78     MBData mbd;
79
80     for(int i = 0; i < max_key; i++) {
81         kv = mkey.get_key(i);
82         assert(db_r->Find(kv, mbd) == MBError::SUCCESS);
83         assert(kv == std::string((const char *)mbd.buff, mbd.data_len));
84     }
85     db_r->Close();
86     delete db_r;
87 }
88
89 // Multiple threads performing DB insertion/deletion/updating
90 int main(int argc, char *argv[])
91 {
92     pthread_t pid[256];
93     int nthread = 4;
94     if(nthread > 256) {
95         abort();
96     }
97
98     if(argc > 1) {
99         mbdir = std::string(argv[1]);
100         std::cout << "Mabain test db directory " << mbdir << "\n";
101     }
102     if(argc > 2) {
103         max_key = atoi(argv[2]);
104         std::cout << "Setting number of keys to be " << max_key << "\n";
105     }
106
107 //    SetTestStatus(false);
108     mabain::DB::SetLogFile(mbdir + "/mabain.log");
109
110     write_index.store(0, std::memory_order_release);
111     // Writer needs to enable async writer mode.
112     int options = CONSTS::WriterOptions() | CONSTS::ASYNC_WRITER_MODE;
113     DB *db = new DB(mbdir.c_str(), options, 128LL*1024*1024, 128LL*1024*1024);
114     assert(db->is_open());
115     db->RemoveAll();
116
117     for(int i = 0; i < nthread; i++) {
118         if(pthread_create(&pid[i], NULL, insert_thread, db) != 0) {
119             std::cout << "failed to create thread\n";
120             abort();
121         }
122     }
123
124     for(int i = 0; i < nthread; i++) {
125         pthread_join(pid[i], NULL);
126     }
127
128     assert(db->Close() == MBError::SUCCESS);
129     delete db;
130
131     Lookup();
132
133     mabain::DB::CloseLogFile();
134 //    SetTestStatus(true);
135     return 0;
136 }