10 #include "../macros.h"
11 #include "mysql_wrapper.h"
14 static bool embed_active = false;
17 print_error_and_bail(MYSQL *conn)
19 cerr << "mysql_error_message: " << mysql_error(conn) << endl;
24 check_result(MYSQL *conn, int result)
26 if (likely(result == 0))
28 print_error_and_bail(conn);
31 mysql_wrapper::mysql_wrapper(const string &dir, const string &db)
35 if (stat(dir.c_str(), &st) != 0) {
36 cerr << "ERROR! The db directory " << dir << " does not exist" << endl;
40 if (!__sync_bool_compare_and_swap(&embed_active, false, true)) {
41 cerr << "only one embedmysql object can exist at once" << endl;
46 snprintf(dir_arg, sizeof(dir_arg), "--datadir=%s", dir.c_str());
49 --innodb-buffer-pool-size=$SPACE
50 --innodb_log_file_size=1792M
52 --transaction_isolation=serializable
55 --max_allowed_packet=1073741824
56 --max_heap_table_size=2147483648
57 --group_concat_max_len=1073741824
59 --innodb_flush_method=O_DIRECT
63 const char *mysql_av[] =
66 "--skip-grant-tables",
68 "--character-set-server=utf8",
69 "--innodb-buffer-pool-size=4G", // XXX: don't hardocde
70 "--innodb_log_file_size=1792M", // max log file size
71 "--transaction_isolation=serializable",
72 "--innodb_flush_method=O_DIRECT",
73 "--innodb_flush_log_at_trx_commit=0", // only flush log once every second
75 "--language=" MYSQL_SHARE_DIR,
78 check_result(0, mysql_library_init(ARRAY_NELEMS(mysql_av), (char **) mysql_av, 0));
80 MYSQL *conn = new_connection("");
82 b << "CREATE DATABASE IF NOT EXISTS " << db << ";";
83 check_result(conn, mysql_query(conn, b.str().c_str()));
84 check_result(conn, mysql_select_db(conn, db.c_str()));
88 mysql_wrapper::~mysql_wrapper()
91 ALWAYS_ASSERT(__sync_bool_compare_and_swap(&embed_active, true, false));
95 mysql_wrapper::thread_init(bool loader)
97 ALWAYS_ASSERT(tl_conn == NULL);
98 tl_conn = new_connection(db);
99 ALWAYS_ASSERT(tl_conn);
103 mysql_wrapper::thread_end()
105 ALWAYS_ASSERT(tl_conn);
106 mysql_close(tl_conn);
112 mysql_wrapper::new_txn(
118 ALWAYS_ASSERT(tl_conn);
119 check_result(tl_conn, mysql_real_query(tl_conn, "BEGIN", 5));
120 return (void *) tl_conn;
124 mysql_wrapper::commit_txn(void *p)
126 ALWAYS_ASSERT(tl_conn == p);
127 return mysql_commit(tl_conn) == 0;
131 mysql_wrapper::abort_txn(void *p)
133 ALWAYS_ASSERT(tl_conn == p);
134 check_result(tl_conn, mysql_rollback(tl_conn));
137 abstract_ordered_index *
138 mysql_wrapper::open_index(const string &name, size_t value_size_hint, bool mostly_append)
140 ALWAYS_ASSERT(value_size_hint <= 256); // limitation
141 MYSQL *conn = new_connection(db);
142 ostringstream b_create, b_truncate;
144 "CREATE TABLE IF NOT EXISTS " << name << " ("
145 " tbl_key VARBINARY(256) PRIMARY KEY, "
146 " tbl_value VARBINARY(256) "
149 "TRUNCATE TABLE " << name << ";";
150 check_result(conn, mysql_query(conn, b_create.str().c_str()));
151 check_result(conn, mysql_query(conn, b_truncate.str().c_str()));
152 check_result(conn, mysql_commit(conn));
154 return new mysql_ordered_index(name);
158 mysql_wrapper::close_index(abstract_ordered_index *idx)
164 my_escape(MYSQL *conn, const char *p, size_t l)
167 unsigned long newl = mysql_real_escape_string(conn, &buf[0], p, l);
168 return string(&buf[0], newl);
172 mysql_ordered_index::get(
175 string &value, size_t max_bytes_read)
177 INVARIANT(txn == mysql_wrapper::tl_conn);
178 ALWAYS_ASSERT(key.size() <= 256);
180 b << "SELECT tbl_value FROM " << name << " WHERE tbl_key = '" << my_escape(mysql_wrapper::tl_conn, key.data(), key.size()) << "';";
182 check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q.data(), q.size()));
183 MYSQL_RES *res = mysql_store_result(mysql_wrapper::tl_conn);
185 MYSQL_ROW row = mysql_fetch_row(res);
188 unsigned long *lengths = mysql_fetch_lengths(res);
189 value.assign(row[0], min(lengths[0], max_bytes_read));
192 mysql_free_result(res);
197 mysql_ordered_index::put(
202 INVARIANT(txn == mysql_wrapper::tl_conn);
203 ALWAYS_ASSERT(key.size() <= 256);
204 ALWAYS_ASSERT(value.size() <= 256);
205 string escaped_key = my_escape(mysql_wrapper::tl_conn, key.data(), key.size());
206 string escaped_value = my_escape(mysql_wrapper::tl_conn, value.data(), value.size());
208 b << "UPDATE " << name << " SET tbl_value='" << escaped_value << "' WHERE tbl_key='" << escaped_key << "';";
210 check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q.data(), q.size()));
211 my_ulonglong ret = mysql_affected_rows(mysql_wrapper::tl_conn);
212 if (unlikely(ret == (my_ulonglong) -1))
213 print_error_and_bail(mysql_wrapper::tl_conn);
217 b1 << "INSERT INTO " << name << " VALUES ('" << escaped_key << "', '" << escaped_value << "');";
218 string q1 = b1.str();
219 check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q1.data(), q1.size()));
224 mysql_ordered_index::insert(
229 INVARIANT(txn == mysql_wrapper::tl_conn);
230 ALWAYS_ASSERT(key.size() <= 256);
231 ALWAYS_ASSERT(value.size() <= 256);
232 string escaped_key = my_escape(mysql_wrapper::tl_conn, key.data(), key.size());
233 string escaped_value = my_escape(mysql_wrapper::tl_conn, value.data(), value.size());
235 b1 << "INSERT INTO " << name << " VALUES ('" << escaped_key << "', '" << escaped_value << "');";
236 string q1 = b1.str();
237 check_result(mysql_wrapper::tl_conn, mysql_real_query(mysql_wrapper::tl_conn, q1.data(), q1.size()));
242 mysql_wrapper::new_connection(const string &db)
244 MYSQL *conn = mysql_init(0);
245 mysql_options(conn, MYSQL_OPT_USE_EMBEDDED_CONNECTION, 0);
246 if (!mysql_real_connect(conn, 0, 0, 0, db.c_str(), 0, 0, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
248 cerr << "mysql_real_connect: " << mysql_error(conn) << endl;
249 ALWAYS_ASSERT(false);
251 check_result(conn, mysql_autocommit(conn, 0));
255 __thread MYSQL *mysql_wrapper::tl_conn = NULL;