2 * Copyright (C) 2017 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
21 #include <sys/syscall.h>
32 #include "integer_4b_5b.h"
33 #include "async_writer.h"
34 #include "mb_backup.h"
35 #include "resource_pool.h"
37 #include "util/shm_mutex.h"
38 #include "util/utils.h"
// mabain library version 1.2.0, stored as {major, minor, patch, 0}.
// The first three entries are what PostDBUpdate() logs after converting a DB.
uint16_t version[4] = {
    1,  // major
    2,  // minor
    0,  // patch
    0
};
// Presumably the body of the DB destructor (signature not visible here).
// Teardown is guarded on status, so a handle already marked DB_CLOSED by the
// close path is skipped.
// NOTE(review): the guarded statement is not visible; presumably Close() — confirm.
47 if(status != MBError::DB_CLOSED)
// Handle shutdown path (presumably DB::Close(); the signature is not visible here).
// A writer that owns an async writer thread stops that thread first; a failure
// to stop it is logged as a warning.
53 int rval = MBError::SUCCESS;
55 if((options & CONSTS::ACCESS_MODE_WRITER) && async_writer != NULL)
57 rval = async_writer->StopAsyncThread();
58 if(rval != MBError::SUCCESS)
60 Logger::Log(LOG_LEVEL_WARN, "failed to stop async writer thread: %s",
61 MBError::get_error_str(rval));
// Writers dump dict statistics to the log stream, then this handle is
// unregistered from the handler counters (the open path adds +1).
70 if(options & CONSTS::ACCESS_MODE_WRITER)
71 dict->PrintStats(Logger::GetLogStream());
72 UpdateNumHandlers(options, -1);
// Mark the handle closed. A writer also undoes what PreCheckDB acquired: the
// in-process "_lock" resource registration and the writer lock descriptor.
// NOTE(review): in MEMORY_ONLY_MODE PreCheckDB never calls acquire_writer_lock,
// yet release_writer_lock(writer_lock_fd) is still reached for writers here —
// confirm writer_lock_fd's initial value is safe to pass to it.
83 status = MBError::DB_CLOSED;
84 if(options & CONSTS::ACCESS_MODE_WRITER)
86 ResourcePool::getInstance().RemoveResourceByPath(mb_dir + "_lock");
87 release_writer_lock(writer_lock_fd);
89 Logger::Log(LOG_LEVEL_INFO, "connector %u disconnected from DB", identifier);
// Adjust the handler counters kept by the Dict by delta. PostDBUpdate passes
// +1 when a handle opens, and the close path passes -1.
// @param mode   access-mode options; ACCESS_MODE_WRITER selects the writer counter.
// @param delta  amount to add to the counter.
// @return SUCCESS, or the result of Dict::UpdateNumWriter() for writers; the
//         reader update's result is not captured.
93 int DB::UpdateNumHandlers(int mode, int delta)
95 int rval = MBError::SUCCESS;
99 if(mode & CONSTS::ACCESS_MODE_WRITER)
100 rval = dict->UpdateNumWriter(delta);
// Presumably the non-writer (else) branch; the intervening line is not visible.
102 dict->UpdateNumReader(delta);
// Constructor for initializing a DB handle from individual settings.
// The arguments are packed into an MBConfig whose other fields are left at 0.
// Zero fields are the ones ValidateConfig() replaces with defaults.
// NOTE(review): some parameters and the use of config are not visible here;
// presumably config is handed to InitDB() — confirm.
110 DB::DB(const char *db_path,
115 uint32_t queue_size) : status(MBError::NOT_INITIALIZED),
// Zero every field first; only the fields assigned below come from the caller.
119 memset(&config, 0, sizeof(config));
120 config.mbdir = db_path;
121 config.options = db_options;
122 config.memcap_index = memcap_index;
123 config.memcap_data = memcap_data;
124 config.connect_id = id;
125 config.queue_size = queue_size;
// Constructor from a caller-prepared MBConfig; the handle starts out
// NOT_INITIALIZED. The config is taken by non-const reference, presumably
// because initialization (ValidateConfig/InitDB both take MBConfig&) may
// write defaults back into the caller's struct — confirm.
130 DB::DB(MBConfig &config) : status(MBError::NOT_INITIALIZED),
// Validate a DB configuration and fill in defaults for fields left at 0.
// The struct is modified in place.
// @param config  configuration to check; defaults are written back into it.
// @return MBError::SUCCESS, or MBError::INVALID_ARG when:
//         - mbdir is NULL;
//         - (writer mode) the eviction bucket count is below 8;
//         - a non-zero block size is not a multiple of BLOCK_SIZE_ALIGN.
136 int DB::ValidateConfig(MBConfig &config)
138 if(config.mbdir == NULL)
139 return MBError::INVALID_ARG;
// Default memory caps are twice the block sizes.
// NOTE(review): this runs *before* the writer block-size defaults below, so a
// config with both memcap and block size left at 0 keeps a memcap of 0 —
// confirm this ordering is intended.
141 if(config.memcap_index == 0)
142 config.memcap_index = 2*config.block_size_index;
143 if(config.memcap_data == 0)
144 config.memcap_data = 2*config.block_size_data;
// Writer mode: default block sizes and eviction bucket size.
146 if(config.options & CONSTS::ACCESS_MODE_WRITER)
148 if(config.block_size_index == 0)
149 config.block_size_index = INDEX_BLOCK_SIZE_DEFAULT;
150 if(config.block_size_data == 0)
151 config.block_size_data = DATA_BLOCK_SIZE_DEFAULT;
152 if(config.num_entry_per_bucket <= 0)
153 config.num_entry_per_bucket = 1000;
154 if(config.num_entry_per_bucket < 8)
156 std::cerr << "count in eviction bucket must be greater than 7\n";
157 return MBError::INVALID_ARG;
// Sliding window is deprecated: warn and clear the flag instead of failing.
160 if(config.options & CONSTS::USE_SLIDING_WINDOW)
162 std::cout << "sliding window support is deprecated\n";
163 config.options &= ~CONSTS::USE_SLIDING_WINDOW;
// Non-zero block sizes must be aligned to BLOCK_SIZE_ALIGN.
166 if(config.block_size_index != 0 && (config.block_size_index % BLOCK_SIZE_ALIGN != 0))
168 std::cerr << "block size must be multiple of " << BLOCK_SIZE_ALIGN << "\n";
169 return MBError::INVALID_ARG;
171 if(config.block_size_data != 0 && (config.block_size_data % BLOCK_SIZE_ALIGN != 0))
173 std::cerr << "block size must be multiple of " << BLOCK_SIZE_ALIGN << "\n";
174 return MBError::INVALID_ARG;
// Remaining limits default when left at 0.
177 if(config.max_num_index_block == 0)
178 config.max_num_index_block = 1024;
179 if(config.max_num_data_block == 0)
180 config.max_num_data_block = 1024;
181 if (config.queue_size == 0)
182 config.queue_size = MB_MAX_NUM_SHM_QUEUE_NODE;
184 return MBError::SUCCESS;
// Checks run before the Dict is created. Failures are reported by setting
// status; InitDB() treats any value other than NOT_INITIALIZED as failure.
// @param config         validated configuration.
// @param init_header    out: presumably set when a writer opens a DB whose
//                       header file does not exist yet (assignment not visible).
// @param update_header  out: filled by DRMBase::ValidateHeaderFile(); true when
//                       the header version differs from the library version.
187 void DB::PreCheckDB(const MBConfig &config, bool &init_header, bool &update_header)
// Writer exclusivity: an in-process check through ResourcePool first, then an
// inter-process file lock for on-disk DBs. Either failure yields WRITER_EXIST.
189 if(config.options & CONSTS::ACCESS_MODE_WRITER)
191 std::string lock_file = mb_dir + "_lock";
192 // internal check first
193 int ret = ResourcePool::getInstance().AddResourceByPath(lock_file, NULL);
194 if(ret == MBError::SUCCESS)
196 if(!(config.options & CONSTS::MEMORY_ONLY_MODE))
198 // process check by file lock
199 writer_lock_fd = acquire_writer_lock(lock_file);
200 if(writer_lock_fd < 0)
201 status = MBError::WRITER_EXIST;
// Presumably the branch where AddResourceByPath() failed.
206 status = MBError::WRITER_EXIST;
208 if(status == MBError::WRITER_EXIST)
210 Logger::Log(LOG_LEVEL_ERROR, "failed to initialize db: %s",
211 MBError::get_error_str(status));
// Memory-only mode: existence is checked against in-process resources
// instead of files on disk.
216 if(config.options & CONSTS::MEMORY_ONLY_MODE)
218 if(config.options & CONSTS::ACCESS_MODE_WRITER)
// Presumably the non-writer branch: the header resource must already exist.
225 if(!ResourcePool::getInstance().CheckExistence(mb_dir + "_mabain_h"))
226 status = MBError::NO_DB;
231 // Check that the DB directory exists (F_OK tests existence only, not permissions)
232 if(access(mb_dir.c_str(), F_OK))
234 std::cerr << "database directory check for " + mb_dir + " failed errno: " +
235 std::to_string(errno) << std::endl;
236 status = MBError::NO_DB;
239 Logger::Log(LOG_LEVEL_INFO, "connector %u DB options: %d",
240 config.connect_id, config.options);
241 // Check whether the DB exists. This is done by checking that the header file is readable.
242 // If this is the first time the DB is opened and it is in writer mode, then we
243 // need to update the header for the first time. If only reader access mode is
244 // required and the file does not exist, we should bail here and the DB open will
245 // not be successful.
246 std::string header_file = mb_dir + "_mabain_h";
247 if(access(header_file.c_str(), R_OK))
249 if(config.options & CONSTS::ACCESS_MODE_WRITER)
252 status = MBError::NO_DB;
256 // Check Header version
// ValidateHeaderFile() reports failure by throwing an int error code; the
// expected async queue region is queue_size AsyncNode entries.
257 if(!init_header && !(config.options & CONSTS::MEMORY_ONLY_MODE))
260 DRMBase::ValidateHeaderFile(mb_dir + "_mabain_h", config.options,
261 config.queue_size * sizeof(AsyncNode), update_header);
262 } catch (int error) {
// Post-open setup after the Dict has been constructed. The steps are:
// - log a new or converted DB;
// - initialize the dict and its shared-memory objects;
// - register this handler;
// - create the async writer when requested;
// - verify the async queue size against the header;
// - mark the handle open;
// - for writers, run resource-collection exception recovery.
// @param config         validated configuration.
// @param init_header    true when the header was created by this open.
// @param update_header  true when the header was converted to the library version.
269 void DB::PostDBUpdate(const MBConfig &config, bool init_header, bool update_header)
271 if((config.options & CONSTS::ACCESS_MODE_WRITER) && (init_header || update_header))
275 Logger::Log(LOG_LEVEL_INFO, "opened a new db %s", mb_dir.c_str());
279 Logger::Log(LOG_LEVEL_INFO, "converted %s to version %d.%d.%d", mb_dir.c_str(),
280 version[0], version[1], version[2]);
// A new or converted header records the configured async queue size.
282 IndexHeader *header = dict->GetHeaderPtr();
283 if(header != NULL) header->async_queue_size = config.queue_size;
284 dict->Init(identifier);
285 dict->InitShmObjects();
// NOTE(review): the log message below contains the typo "iniitialize".
288 if(dict->Status() != MBError::SUCCESS)
290 Logger::Log(LOG_LEVEL_ERROR, "failed to iniitialize dict: %s ",
291 MBError::get_error_str(dict->Status()));
292 status = dict->Status();
296 lock.Init(dict->GetShmLockPtrs());
297 UpdateNumHandlers(config.options, 1);
299 if(config.options & CONSTS::ACCESS_MODE_WRITER)
301 if(config.options & CONSTS::ASYNC_WRITER_MODE)
302 async_writer = new AsyncWriter(this);
// For an existing header, the queue size stored there must match this config.
306 if(!(init_header || update_header))
308 IndexHeader *header = dict->GetHeaderPtr();
309 if(header != NULL && header->async_queue_size != (int) config.queue_size)
311 Logger::Log(LOG_LEVEL_ERROR, "async queue size not matching with header: %d %d",
312 header->async_queue_size, (int) config.queue_size);
313 status = MBError::INVALID_SIZE;
319 Logger::Log(LOG_LEVEL_INFO, "connector %u successfully opened DB %s for %s",
320 identifier, mb_dir.c_str(),
321 (config.options & CONSTS::ACCESS_MODE_WRITER) ? "writing":"reading");
322 status = MBError::SUCCESS;
324 if(config.options & CONSTS::ACCESS_MODE_WRITER)
326 // Run rc exception recovery
327 ResourceCollection rc(*this);
328 rc.ExceptionRecovery();
// Shared initialization used by the constructors. The steps are:
// - validate the config and fill in its defaults;
// - save a copy of it;
// - pick a connector id;
// - run the pre-open checks;
// - construct the Dict;
// - finish setup in PostDBUpdate().
// Outcome is reported through status.
// @param config  configuration; modified in place (defaults, connect_id).
332 void DB::InitDB(MBConfig &config)
337 if(ValidateConfig(config) != MBError::SUCCESS)
340 // save the configuration
// mbdir is not kept in the saved copy; the directory is stored in mb_dir below.
341 memcpy(&dbConfig, &config, sizeof(MBConfig));
342 dbConfig.mbdir = NULL;
344 // If id not given, use thread ID
// Two alternatives: pthread_self() masked to 31 bits, or the kernel thread id
// via SYS_gettid. Presumably chosen per platform by a preprocessor conditional
// that is not visible here.
345 if(config.connect_id == 0)
348 config.connect_id = reinterpret_cast<uint64_t>(pthread_self()) & 0x7FFFFFFF;
350 config.connect_id = static_cast<uint32_t>(syscall(SYS_gettid));
353 identifier = config.connect_id;
// Normalize the directory (presumably appending a trailing '/').
// NOTE(review): ValidateConfig only rejects a NULL mbdir; an empty mbdir makes
// mb_dir.length()-1 wrap around, and indexing with it is undefined behavior.
354 mb_dir = std::string(config.mbdir);
355 if(mb_dir[mb_dir.length()-1] != '/')
357 options = config.options;
359 bool init_header = false;
360 bool update_header = false; // true when header version is different from lib version
361 PreCheckDB(config, init_header, update_header);
362 if(MBError::NOT_INITIALIZED != status)
364 Logger::Log(LOG_LEVEL_ERROR, "database %s check failed: %s", mb_dir.c_str(),
365 MBError::get_error_str(status));
369 dict = new Dict(mb_dir, init_header, config.data_size, config.options,
370 config.memcap_index, config.memcap_data,
371 config.block_size_index, config.block_size_data,
372 config.max_num_index_block, config.max_num_data_block,
373 config.num_entry_per_bucket, config.queue_size);
375 PostDBUpdate(config, init_header, update_header);
// Status code of this handle; MBError::SUCCESS means the DB is open.
// Presumably returns the status member — the body is not visible here.
378 int DB::Status() const
// Copy constructor: builds a new handle for the same DB from db's saved
// configuration. InitDB() stores dbConfig with mbdir cleared, so the
// directory is restored from db.mb_dir.
// NOTE(review): the statement consuming db_config is not visible here;
// presumably InitDB(db_config) — confirm.
383 DB::DB(const DB &db) : status(MBError::NOT_INITIALIZED),
386 MBConfig db_config = db.dbConfig;
387 db_config.mbdir = db.mb_dir.c_str();
// Copy assignment: presumably re-initializes this handle against db's
// directory and saved configuration. Self-assignment returns immediately.
// NOTE(review): the self-assignment test, any teardown of the current handle
// and the call consuming db_config are not visible here — confirm.
391 const DB& DB::operator = (const DB &db)
394 return *this; // no self-assignment
398 MBConfig db_config = db.dbConfig;
399 db_config.mbdir = db.mb_dir.c_str();
400 status = MBError::NOT_INITIALIZED;
407 bool DB::is_open() const
409 return status == MBError::SUCCESS;
412 const char* DB::StatusStr() const
414 return MBError::get_error_str(status);
// Find the exact key match.
// @param key    key bytes; len bytes are looked up.
// @param len    key length in bytes.
// @param mdata  receives the match from Dict::Find().
// @return Dict::Find()'s result. Otherwise:
//         - INVALID_ARG for rejected input;
//         - NOT_INITIALIZED if the handle is not open;
//         - NOT_ALLOWED for a handle opened in async writer mode.
418 int DB::Find(const char* key, int len, MBData &mdata) const
// Presumably guarded by a NULL-key check on the line not visible here.
421 return MBError::INVALID_ARG;
422 if(status != MBError::SUCCESS)
423 return MBError::NOT_INITIALIZED;
424 // Writer in async mode cannot be used for lookup
425 if(options & CONSTS::ASYNC_WRITER_MODE)
426 return MBError::NOT_ALLOWED;
428 return dict->Find(reinterpret_cast<const uint8_t*>(key), len, mdata);
431 int DB::Find(const std::string &key, MBData &mdata) const
433 return Find(key.data(), key.size(), mdata);
// Find all possible prefix matches. The caller needs to call this function
// repeatedly if data.next is true.
// Each call resumes the search at key + data.match_len over the remaining
// len - data.match_len bytes. OUT_OF_BOUND is returned once
// data.match_len >= len.
// Other early returns:
// - INVALID_ARG for rejected input;
// - NOT_INITIALIZED if the handle is not open;
// - NOT_ALLOWED for an async-mode writer handle.
438 int DB::FindPrefix(const char* key, int len, MBData &data) const
// Presumably guarded by a NULL-key check on the line not visible here.
441 return MBError::INVALID_ARG;
442 if(status != MBError::SUCCESS)
443 return MBError::NOT_INITIALIZED;
444 // Writer in async mode cannot be used for lookup
445 if(options & CONSTS::ASYNC_WRITER_MODE)
446 return MBError::NOT_ALLOWED;
448 if(data.match_len >= len)
449 return MBError::OUT_OF_BOUND;
452 rval = dict->FindPrefix(reinterpret_cast<const uint8_t*>(key+data.match_len),
453 len-data.match_len, data);
// Find the longest prefix match.
// Delegates to Dict::FindPrefix() over the whole key.
// Early returns:
// - INVALID_ARG for rejected input;
// - NOT_INITIALIZED if the handle is not open;
// - NOT_ALLOWED for an async-mode writer handle.
// NOTE(review): a few lines before the delegation are not visible here
// (possibly resetting data) — confirm.
459 int DB::FindLongestPrefix(const char* key, int len, MBData &data) const
// Presumably guarded by a NULL-key check on the line not visible here.
462 return MBError::INVALID_ARG;
463 if(status != MBError::SUCCESS)
464 return MBError::NOT_INITIALIZED;
465 // Writer in async mode cannot be used for lookup
466 if(options & CONSTS::ASYNC_WRITER_MODE)
467 return MBError::NOT_ALLOWED;
471 return dict->FindPrefix(reinterpret_cast<const uint8_t*>(key), len, data);
474 int DB::FindLongestPrefix(const std::string &key, MBData &data) const
476 return FindLongestPrefix(key.data(), key.size(), data);
// Add a key-value pair
// The value is taken from mbdata.buff / mbdata.data_len; overwrite controls
// replacement of an existing key.
// Routing without __SHM_QUEUE__: an attached async writer queues the request,
// otherwise the Dict is written directly.
// Routing with __SHM_QUEUE__: a writer without an async_writer writes directly;
// presumably every other handle goes through Dict::SHMQ_Add() (the else line
// is not visible here).
480 int DB::Add(const char* key, int len, MBData &mbdata, bool overwrite)
482 int rval = MBError::SUCCESS;
485 return MBError::INVALID_ARG;
486 if(status != MBError::SUCCESS)
487 return MBError::NOT_INITIALIZED;
489 #ifndef __SHM_QUEUE__
490 if(async_writer != NULL)
491 return async_writer->Add(key, len, reinterpret_cast<const char *>(mbdata.buff),
492 mbdata.data_len, overwrite);
494 rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
496 if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
498 rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
502 rval = dict->SHMQ_Add(reinterpret_cast<const char*>(key), len,
503 reinterpret_cast<const char*>(mbdata.buff), mbdata.data_len, overwrite);
// Add a key-value pair given as raw key/value buffers.
// Returns INVALID_ARG for a NULL key or value, and NOT_INITIALIZED if the handle
// is not open. Routing is the same as the MBData overload: async writer or direct
// Dict write without __SHM_QUEUE__; direct write for a synchronous writer,
// otherwise (presumably) Dict::SHMQ_Add() with __SHM_QUEUE__.
510 int DB::Add(const char* key, int len, const char* data, int data_len, bool overwrite)
512 if(key == NULL || data == NULL)
513 return MBError::INVALID_ARG;
514 if(status != MBError::SUCCESS)
515 return MBError::NOT_INITIALIZED;
517 #ifndef __SHM_QUEUE__
518 if(async_writer != NULL)
519 return async_writer->Add(key, len, data, data_len, overwrite);
// Wrap the caller's buffer without copying; the cast drops const, presumably
// because Dict::Add only reads through mbdata.buff — confirm.
523 mbdata.data_len = data_len;
524 mbdata.buff = (uint8_t*) data;
526 int rval = MBError::SUCCESS;
527 #ifndef __SHM_QUEUE__
528 rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
530 if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
532 rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
536 rval = dict->SHMQ_Add(reinterpret_cast<const char*>(key), len,
537 reinterpret_cast<const char*>(mbdata.buff), mbdata.data_len, overwrite);
545 int DB::Add(const std::string &key, const std::string &value, bool overwrite)
547 return Add(key.data(), key.size(), value.data(), value.size(), overwrite);
// Remove a key.
// Routing mirrors Add(): an async writer or a direct Dict removal without
// __SHM_QUEUE__; with __SHM_QUEUE__, a direct removal for a synchronous writer,
// otherwise (presumably) Dict::SHMQ_Remove().
// @return the removal result, INVALID_ARG for rejected input, or
//         NOT_INITIALIZED if the handle is not open.
550 int DB::Remove(const char *key, int len)
552 int rval = MBError::SUCCESS;
555 return MBError::INVALID_ARG;
556 if(status != MBError::SUCCESS)
557 return MBError::NOT_INITIALIZED;
559 #ifndef __SHM_QUEUE__
560 if(async_writer != NULL)
561 return async_writer->Remove(key, len);
563 rval = dict->Remove(reinterpret_cast<const uint8_t*>(key), len);
565 if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
567 rval = dict->Remove(reinterpret_cast<const uint8_t*>(key), len);
571 rval = dict->SHMQ_Remove(reinterpret_cast<const char*>(key), len);
578 int DB::Remove(const std::string &key)
580 return Remove(key.data(), key.size());
// Remove-all body fragment (presumably DB::RemoveAll(); signature not visible).
// Fails with NOT_INITIALIZED when the handle is not open. Without __SHM_QUEUE__
// an attached async writer handles the request; otherwise the Dict clears
// everything directly.
585 if(status != MBError::SUCCESS)
586 return MBError::NOT_INITIALIZED;
588 #ifndef __SHM_QUEUE__
589 if(async_writer != NULL)
590 return async_writer->RemoveAll();
594 rval = dict->RemoveAll();
// Back up the DB into bk_dir.
// @return the backup result. Otherwise:
//         - NOT_ALLOWED for memory-only / MMAP_ANONYMOUS_MODE DBs;
//         - INVALID_ARG for a rejected bk_dir;
//         - NOT_INITIALIZED if the handle is not open.
//         DBBackup failures are thrown as int and logged as warnings.
598 int DB::Backup(const char *bk_dir)
600 int rval = MBError::SUCCESS;
602 if(options & CONSTS::MEMORY_ONLY_MODE)
603 return MBError::NOT_ALLOWED;
606 return MBError::INVALID_ARG;
607 if(status != MBError::SUCCESS)
608 return MBError::NOT_INITIALIZED;
609 if(options & MMAP_ANONYMOUS_MODE)
610 return MBError::NOT_ALLOWED;
612 #ifndef __SHM_QUEUE__
613 if(async_writer != NULL)
614 return async_writer->Backup(bk_dir);
618 rval = bk.Backup(bk_dir);
619 } catch (int error) {
620 Logger::Log(LOG_LEVEL_WARN, "Backup failed :%s", MBError::get_error_str(error));
// NOTE(review): every other __SHM_QUEUE__ path in this file (Add, Remove,
// CollectResource) tests CONSTS::ACCESS_MODE_WRITER at this point.
// For a writer, ASYNC_WRITER_MODE causes PostDBUpdate to create async_writer,
// so no writer handle can take the direct-backup branch below. A plain
// synchronous writer instead falls through to SHMQ_Backup(), a queued request.
// This looks like ACCESS_MODE_WRITER was intended — confirm.
625 if (async_writer == NULL && (options & CONSTS::ASYNC_WRITER_MODE))
628 rval = bk.Backup(bk_dir);
632 rval = dict->SHMQ_Backup(bk_dir);
634 } catch (int error) {
635 Logger::Log(LOG_LEVEL_WARN, "Backup failed :%s", MBError::get_error_str(error));
// Flush DB data. Memory-only DBs and handles that are not open are excluded
// (presumably by early returns on the lines not visible here).
642 void DB::Flush() const
644 if(options & CONSTS::MEMORY_ONLY_MODE)
647 if(status != MBError::SUCCESS)
// Run resource collection ("gc" in the log messages) with the given thresholds.
// Routing:
// - without __SHM_QUEUE__: an attached async writer runs it, otherwise it runs here;
// - with __SHM_QUEUE__: a synchronous writer runs it directly, and presumably
//   other handles enqueue it via Dict::SHMQ_CollectResource().
// Failures are thrown as int: RC_SKIPPED is silent, other errors are logged.
// NOTE(review): the function ends in an unconditional return SUCCESS; confirm
// whether the catch blocks (partly not visible) propagate logged errors.
653 int DB::CollectResource(int64_t min_index_rc_size, int64_t min_data_rc_size,
654 int64_t max_dbsz, int64_t max_dbcnt)
656 if(status != MBError::SUCCESS)
659 #ifndef __SHM_QUEUE__
660 if(async_writer != NULL)
661 return async_writer->CollectResource(min_index_rc_size, min_data_rc_size,
662 max_dbsz, max_dbcnt);
665 ResourceCollection rc(*this);
666 rc.ReclaimResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
667 } catch (int error) {
668 if(error != MBError::RC_SKIPPED)
670 Logger::Log(LOG_LEVEL_ERROR, "failed to run gc: %s",
671 MBError::get_error_str(error));
677 if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
679 ResourceCollection rc(*this);
680 rc.ReclaimResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
684 dict->SHMQ_CollectResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
686 } catch (int error) {
687 if(error != MBError::RC_SKIPPED)
689 Logger::Log(LOG_LEVEL_ERROR, "failed to run gc: %s",
690 MBError::get_error_str(error));
695 return MBError::SUCCESS;
// Number of entries as reported by Dict::Count(). A handle that is not open
// returns early (the value returned in that case is not visible here).
698 int64_t DB::Count() const
700 if(status != MBError::SUCCESS)
703 return dict->Count();
// Write the Dict's statistics to out_stream; presumably a no-op when the
// handle is not open (the early return is not visible here).
706 void DB::PrintStats(std::ostream &out_stream) const
708 if(status != MBError::SUCCESS)
711 dict->PrintStats(out_stream);
// Write the Dict's header information to out_stream.
// NOTE(review): any guard before the call is not visible here — confirm.
714 void DB::PrintHeader(std::ostream &out_stream) const
717 dict->PrintHeader(out_stream);
// Lock wrapper bodies (their signatures are not visible here). Each forwards to
// the lock object that PostDBUpdate initializes with dict->GetShmLockPtrs().
// Write lock:
722 return lock.WrLock();
// Read lock:
727 return lock.RdLock();
// Unlock:
732 return lock.UnLock();
// Non-blocking write-lock attempt:
737 return lock.TryWrLock();
// Reset the read/write lock stored in the index header by re-running
// InitShmRWLock() on it. Presumably the reset path is compiled only when the
// shared-memory mutex is in use; the preprocessor lines are not visible here.
740 int DB::ClearLock() const
743 // No db handler should hold mutex when this is called.
744 if(status != MBError::SUCCESS)
746 IndexHeader *hdr = dict->GetHeaderPtr();
747 return InitShmRWLock(&hdr->mb_rw_lock);
749 // Nothing needs to be done if we don't use shared memory mutex.
750 return MBError::SUCCESS;
754 int DB::SetLogLevel(int level)
756 return Logger::SetLogLevel(level);
// Body of a helper whose signature is not visible here; it switches logging
// to debug level.
761 Logger::SetLogLevel(LOG_LEVEL_DEBUG);
// Access the underlying Dict. The returned value depends on whether this is a
// writer handle; the branch bodies are not visible here — confirm what
// non-writer handles receive.
764 Dict* DB::GetDictPtr() const
766 if(options & CONSTS::ACCESS_MODE_WRITER)
// Option flags of this handle; presumably returns the options member set in
// InitDB() (body not visible here).
771 int DB::GetDBOptions() const
// DB directory path; presumably returns mb_dir, the normalized directory built
// in InitDB() (body not visible here).
776 const std::string& DB::GetDBDir() const
781 void DB::GetDBConfig(MBConfig &config) const
783 memcpy(&config, &dbConfig, sizeof(MBConfig));
// Attach this non-writer handle to the async writer owned by db_writer and
// increment that writer's user count.
// db_writer must be an async-mode writer on the same directory with an active
// async_writer.
// @return SUCCESS. Otherwise:
//         - INVALID_ARG for a NULL, mismatched or non-async db_writer;
//         - NOT_ALLOWED when called on a writer handle.
//         Presumably SUCCESS unconditionally when __SHM_QUEUE__ is defined
//         (#endif not visible).
787 int DB::SetAsyncWriterPtr(DB *db_writer)
789 #ifndef __SHM_QUEUE__
790 if(db_writer == NULL)
791 return MBError::INVALID_ARG;
792 if(options & CONSTS::ACCESS_MODE_WRITER)
793 return MBError::NOT_ALLOWED;
794 if(db_writer->mb_dir != mb_dir)
795 return MBError::INVALID_ARG;
796 if(!(db_writer->options & CONSTS::ACCESS_MODE_WRITER) ||
797 !(db_writer->options & CONSTS::ASYNC_WRITER_MODE) ||
798 db_writer->async_writer == NULL)
800 return MBError::INVALID_ARG;
803 db_writer->async_writer->UpdateNumUsers(1);
804 async_writer = db_writer->async_writer;
806 return MBError::SUCCESS;
// Detach this non-writer handle from db_writer's async writer, decrementing the
// writer's user count. Argument checks match SetAsyncWriterPtr().
// NOTE(review): whether this handle's async_writer pointer is cleared is not
// visible here — confirm, since Add/Remove keep routing to a non-NULL async_writer.
809 int DB::UnsetAsyncWriterPtr(DB *db_writer)
811 #ifndef __SHM_QUEUE__
812 if(db_writer == NULL)
813 return MBError::INVALID_ARG;
814 if(options & CONSTS::ACCESS_MODE_WRITER)
815 return MBError::NOT_ALLOWED;
816 if(db_writer->mb_dir != mb_dir)
817 return MBError::INVALID_ARG;
818 if(!(db_writer->options & CONSTS::ACCESS_MODE_WRITER) ||
819 !(db_writer->options & CONSTS::ASYNC_WRITER_MODE) ||
820 db_writer->async_writer == NULL)
822 return MBError::INVALID_ARG;
825 db_writer->async_writer->UpdateNumUsers(-1);
828 return MBError::SUCCESS;
// Whether this handle has an async writer to route updates through. The
// visible path reports async_writer != NULL; lines for other build
// configurations are not visible here.
831 bool DB::AsyncWriterEnabled() const
836 return (async_writer != NULL);
// Whether queued async work is still pending. Presumably Dict::SHMQ_Busy() is
// used with __SHM_QUEUE__, and otherwise the attached async writer's Busy()
// (the preprocessor lines are not visible here).
840 bool DB::AsyncWriterBusy() const
843 return dict->SHMQ_Busy();
845 if(async_writer != NULL)
846 return async_writer->Busy();
851 void DB::SetLogFile(const std::string &log_file)
853 Logger::InitLogFile(log_file);
// Close the log file opened with SetLogFile(); presumably delegates to Logger
// (body not visible here).
856 void DB::CloseLogFile()
861 void DB::ClearResources(const std::string &path)
863 ResourcePool::getInstance().RemoveResourceByDB(path);
866 } // namespace mabain