benchmark silo added
[c11concurrency-benchmarks.git] / silo / benchmarks / mysql_wrapper.cc
1 #include <sys/stat.h>
2 #include <sys/types.h>
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <string.h>
6
7 #include <iostream>
8 #include <sstream>
9
10 #include "../macros.h"
11 #include "mysql_wrapper.h"
12
13 using namespace std;
14 static bool embed_active = false;
15
16 static inline void
17 print_error_and_bail(MYSQL *conn)
18 {
19   cerr << "mysql_error_message: " << mysql_error(conn) << endl;
20   ALWAYS_ASSERT(false);
21 }
22
23 static inline void
24 check_result(MYSQL *conn, int result)
25 {
26   if (likely(result == 0))
27     return;
28   print_error_and_bail(conn);
29 }
30
31 mysql_wrapper::mysql_wrapper(const string &dir, const string &db)
32   : db(db)
33 {
34   struct stat st;
35   if (stat(dir.c_str(), &st) != 0) {
36     cerr << "ERROR! The db directory " << dir << " does not exist" << endl;
37     ALWAYS_ASSERT(false);
38   }
39
40   if (!__sync_bool_compare_and_swap(&embed_active, false, true)) {
41     cerr << "only one embedmysql object can exist at once" << endl;
42     ALWAYS_ASSERT(false);
43   }
44
45   char dir_arg[1024];
46   snprintf(dir_arg, sizeof(dir_arg), "--datadir=%s", dir.c_str());
47
48   /**
49        --innodb-buffer-pool-size=$SPACE
50        --innodb_log_file_size=1792M
51        --port=$PORT
52        --transaction_isolation=serializable
53        --max_connections=300
54        --local-infile=1
55        --max_allowed_packet=1073741824
56        --max_heap_table_size=2147483648
57        --group_concat_max_len=1073741824
58        --skip-slave-start
59        --innodb_flush_method=O_DIRECT
60        --log-error
61   */
62
63   const char *mysql_av[] =
64     {
65       "progname",
66       "--skip-grant-tables",
67       dir_arg,
68       "--character-set-server=utf8",
69       "--innodb-buffer-pool-size=4G", // XXX: don't hardocde
70       "--innodb_log_file_size=1792M", // max log file size
71       "--transaction_isolation=serializable",
72       "--innodb_flush_method=O_DIRECT",
73       "--innodb_flush_log_at_trx_commit=0", // only flush log once every second
74       "--sync_binlog=0",
75       "--language=" MYSQL_SHARE_DIR,
76     };
77
78   check_result(0, mysql_library_init(ARRAY_NELEMS(mysql_av), (char **) mysql_av, 0));
79
80   MYSQL *conn = new_connection("");
81   ostringstream b;
82   b << "CREATE DATABASE IF NOT EXISTS " << db << ";";
83   check_result(conn, mysql_query(conn, b.str().c_str()));
84   check_result(conn, mysql_select_db(conn, db.c_str()));
85   mysql_close(conn);
86 }
87
88 mysql_wrapper::~mysql_wrapper()
89 {
90   mysql_server_end();
91   ALWAYS_ASSERT(__sync_bool_compare_and_swap(&embed_active, true, false));
92 }
93
94 void
95 mysql_wrapper::thread_init(bool loader)
96 {
97   ALWAYS_ASSERT(tl_conn == NULL);
98   tl_conn = new_connection(db);
99   ALWAYS_ASSERT(tl_conn);
100 }
101
102 void
103 mysql_wrapper::thread_end()
104 {
105   ALWAYS_ASSERT(tl_conn);
106   mysql_close(tl_conn);
107   tl_conn = NULL;
108   mysql_thread_end();
109 }
110
111 void *
112 mysql_wrapper::new_txn(
113     uint64_t txn_flags,
114     str_arena &arena,
115     void *buf,
116     TxnProfileHint hint)
117 {
118   ALWAYS_ASSERT(tl_conn);
119   check_result(tl_conn, mysql_real_query(tl_conn, "BEGIN", 5));
120   return (void *) tl_conn;
121 }
122
123 bool
124 mysql_wrapper::commit_txn(void *p)
125 {
126   ALWAYS_ASSERT(tl_conn == p);
127   return mysql_commit(tl_conn) == 0;
128 }
129
130 void
131 mysql_wrapper::abort_txn(void *p)
132 {
133   ALWAYS_ASSERT(tl_conn == p);
134   check_result(tl_conn, mysql_rollback(tl_conn));
135 }
136
137 abstract_ordered_index *
138 mysql_wrapper::open_index(const string &name, size_t value_size_hint, bool mostly_append)
139 {
140   ALWAYS_ASSERT(value_size_hint <= 256); // limitation
141   MYSQL *conn = new_connection(db);
142   ostringstream b_create, b_truncate;
143   b_create <<
144     "CREATE TABLE IF NOT EXISTS " << name << " ("
145     "  tbl_key VARBINARY(256) PRIMARY KEY, "
146     "  tbl_value VARBINARY(256) "
147     ") ENGINE=InnoDB;";
148   b_truncate <<
149     "TRUNCATE TABLE " << name << ";";
150   check_result(conn, mysql_query(conn, b_create.str().c_str()));
151   check_result(conn, mysql_query(conn, b_truncate.str().c_str()));
152   check_result(conn, mysql_commit(conn));
153   mysql_close(conn);
154   return new mysql_ordered_index(name);
155 }
156
157 void
158 mysql_wrapper::close_index(abstract_ordered_index *idx)
159 {
160   delete idx;
161 }
162
163 static inline string
164 my_escape(MYSQL *conn, const char *p, size_t l)
165 {
166   char buf[2*l + 1];
167   unsigned long newl = mysql_real_escape_string(conn, &buf[0], p, l);
168   return string(&buf[0], newl);
169 }
170
171 bool
172 mysql_ordered_index::get(
173     void *txn,
174     const string &key,
175     string &value, size_t max_bytes_read)
176 {
177   INVARIANT(txn == mysql_wrapper::tl_conn);
178   ALWAYS_ASSERT(key.size() <= 256);
179   ostringstream b;
180   b << "SELECT tbl_value FROM " << name << " WHERE tbl_key = '" << my_escape(mysql_wrapper::tl_conn, key.data(), key.size()) << "';";
181   string q = b.str();
182   check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q.data(), q.size()));
183   MYSQL_RES *res = mysql_store_result(mysql_wrapper::tl_conn);
184   ALWAYS_ASSERT(res);
185   MYSQL_ROW row = mysql_fetch_row(res);
186   bool ret = false;
187   if (row) {
188     unsigned long *lengths = mysql_fetch_lengths(res);
189     value.assign(row[0], min(lengths[0], max_bytes_read));
190     ret = true;
191   }
192   mysql_free_result(res);
193   return ret;
194 }
195
196 const char *
197 mysql_ordered_index::put(
198     void *txn,
199     const string &key,
200     const string &value)
201 {
202   INVARIANT(txn == mysql_wrapper::tl_conn);
203   ALWAYS_ASSERT(key.size() <= 256);
204   ALWAYS_ASSERT(value.size() <= 256);
205   string escaped_key = my_escape(mysql_wrapper::tl_conn, key.data(), key.size());
206   string escaped_value = my_escape(mysql_wrapper::tl_conn, value.data(), value.size());
207   ostringstream b;
208   b << "UPDATE " << name << " SET tbl_value='" << escaped_value << "' WHERE tbl_key='" << escaped_key << "';";
209   string q = b.str();
210   check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q.data(), q.size()));
211   my_ulonglong ret = mysql_affected_rows(mysql_wrapper::tl_conn);
212   if (unlikely(ret == (my_ulonglong) -1))
213     print_error_and_bail(mysql_wrapper::tl_conn);
214   if (ret)
215     return 0;
216   ostringstream b1;
217   b1 << "INSERT INTO " << name << " VALUES ('" << escaped_key << "', '" << escaped_value << "');";
218   string q1 = b1.str();
219   check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q1.data(), q1.size()));
220   return 0;
221 }
222
223 const char *
224 mysql_ordered_index::insert(
225     void *txn,
226     const string &key,
227     const string &value)
228 {
229   INVARIANT(txn == mysql_wrapper::tl_conn);
230   ALWAYS_ASSERT(key.size() <= 256);
231   ALWAYS_ASSERT(value.size() <= 256);
232   string escaped_key = my_escape(mysql_wrapper::tl_conn, key.data(), key.size());
233   string escaped_value = my_escape(mysql_wrapper::tl_conn, value.data(), value.size());
234   ostringstream b1;
235   b1 << "INSERT INTO " << name << " VALUES ('" << escaped_key << "', '" << escaped_value << "');";
236   string q1 = b1.str();
237   check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q1.data(), q1.size()));
238   return 0;
239 }
240
241 MYSQL *
242 mysql_wrapper::new_connection(const string &db)
243 {
244   MYSQL *conn = mysql_init(0);
245   mysql_options(conn, MYSQL_OPT_USE_EMBEDDED_CONNECTION, 0);
246   if (!mysql_real_connect(conn, 0, 0, 0, db.c_str(), 0, 0, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
247     mysql_close(conn);
248     cerr << "mysql_real_connect: " << mysql_error(conn) << endl;
249     ALWAYS_ASSERT(false);
250   }
251   check_result(conn, mysql_autocommit(conn, 0));
252   return conn;
253 }
254
255 __thread MYSQL *mysql_wrapper::tl_conn = NULL;