benchmarks compiles with clang
[c11concurrency-benchmarks.git] / mabain / src / db.cpp
1 /*
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <iostream>
20 #include <unistd.h>
21 #include <sys/syscall.h>
22
23 #include <errno.h>
24
25 #include "dict.h"
26 #include "db.h"
27 #include "error.h"
28 #include "logger.h"
29 #include "mb_lsq.h"
30 #include "version.h"
31 #include "mb_rc.h"
32 #include "integer_4b_5b.h"
33 #include "async_writer.h"
34 #include "mb_backup.h"
35 #include "resource_pool.h"
36 #include "drm_base.h"
37 #include "util/shm_mutex.h"
38 #include "util/utils.h"
39
40 namespace mabain {
41
42 // Current mabain version 1.2.0
43 uint16_t version[4] = {1, 2, 0, 0};
44
45 DB::~DB()
46 {
47     if(status != MBError::DB_CLOSED)
48         Close();
49 }
50
51 int DB::Close()
52 {
53     int rval = MBError::SUCCESS;
54
55     if((options & CONSTS::ACCESS_MODE_WRITER) && async_writer != NULL)
56     {
57         rval = async_writer->StopAsyncThread();
58         if(rval != MBError::SUCCESS)
59         {
60             Logger::Log(LOG_LEVEL_WARN, "failed to stop async writer thread: %s",
61                         MBError::get_error_str(rval));
62         }
63
64         delete async_writer;
65         async_writer = NULL;
66     }
67
68     if(dict != NULL)
69     {
70         if(options & CONSTS::ACCESS_MODE_WRITER)
71             dict->PrintStats(Logger::GetLogStream());
72         UpdateNumHandlers(options, -1);
73
74         dict->Destroy();
75         delete dict;
76         dict = NULL;
77     }
78     else
79     {
80         rval = status;
81     }
82
83     status = MBError::DB_CLOSED;
84     if(options & CONSTS::ACCESS_MODE_WRITER)
85     {
86         ResourcePool::getInstance().RemoveResourceByPath(mb_dir + "_lock");
87         release_writer_lock(writer_lock_fd);
88     }
89     Logger::Log(LOG_LEVEL_INFO, "connector %u disconnected from DB", identifier);
90     return rval;
91 }
92
93 int DB::UpdateNumHandlers(int mode, int delta)
94 {
95     int rval = MBError::SUCCESS;
96
97     WrLock();
98
99     if(mode & CONSTS::ACCESS_MODE_WRITER)
100         rval = dict->UpdateNumWriter(delta);
101     else
102         dict->UpdateNumReader(delta);
103
104     UnLock();
105
106     return rval;
107 }
108
109 // Constructor for initializing DB handle
110 DB::DB(const char *db_path,
111        int db_options,
112        size_t memcap_index,
113        size_t memcap_data,
114        uint32_t id,
115        uint32_t queue_size) : status(MBError::NOT_INITIALIZED),
116                       writer_lock_fd(-1)
117 {
118     MBConfig config;
119     memset(&config, 0, sizeof(config));
120     config.mbdir = db_path;
121     config.options = db_options;
122     config.memcap_index = memcap_index;
123     config.memcap_data = memcap_data;
124     config.connect_id = id;
125     config.queue_size = queue_size;
126
127     InitDB(config);
128 }
129
130 DB::DB(MBConfig &config) : status(MBError::NOT_INITIALIZED),
131                            writer_lock_fd(-1)
132 {
133     InitDB(config);
134 }
135
136 int DB::ValidateConfig(MBConfig &config)
137 {
138     if(config.mbdir == NULL)
139         return MBError::INVALID_ARG;
140
141     if(config.memcap_index == 0)
142         config.memcap_index = 2*config.block_size_index;
143     if(config.memcap_data == 0)
144         config.memcap_data = 2*config.block_size_data;
145
146     if(config.options & CONSTS::ACCESS_MODE_WRITER)
147     {
148         if(config.block_size_index == 0)
149             config.block_size_index = INDEX_BLOCK_SIZE_DEFAULT;
150         if(config.block_size_data == 0)
151             config.block_size_data = DATA_BLOCK_SIZE_DEFAULT;
152         if(config.num_entry_per_bucket <= 0)
153             config.num_entry_per_bucket = 1000;
154         if(config.num_entry_per_bucket < 8)
155         {
156             std::cerr << "count in eviction bucket must be greater than 7\n";
157             return MBError::INVALID_ARG;
158         }
159     }
160     if(config.options & CONSTS::USE_SLIDING_WINDOW)
161     {
162         std::cout << "sliding window support is deprecated\n";
163         config.options &= ~CONSTS::USE_SLIDING_WINDOW;
164     }
165
166     if(config.block_size_index != 0 && (config.block_size_index % BLOCK_SIZE_ALIGN != 0))
167     {
168         std::cerr << "block size must be multiple of " << BLOCK_SIZE_ALIGN << "\n";
169         return MBError::INVALID_ARG;
170     }
171     if(config.block_size_data != 0 && (config.block_size_data % BLOCK_SIZE_ALIGN != 0))
172     {
173         std::cerr << "block size must be multiple of " << BLOCK_SIZE_ALIGN << "\n";
174         return MBError::INVALID_ARG;
175     }
176
177     if(config.max_num_index_block == 0)
178         config.max_num_index_block = 1024;
179     if(config.max_num_data_block == 0)
180         config.max_num_data_block = 1024;
181     if (config.queue_size == 0)
182         config.queue_size = MB_MAX_NUM_SHM_QUEUE_NODE;
183
184     return MBError::SUCCESS;
185 }
186
187 void DB::PreCheckDB(const MBConfig &config, bool &init_header, bool &update_header)
188 {
189     if(config.options & CONSTS::ACCESS_MODE_WRITER)
190     {
191         std::string lock_file = mb_dir + "_lock";
192         // internal check first
193         int ret = ResourcePool::getInstance().AddResourceByPath(lock_file, NULL);
194         if(ret == MBError::SUCCESS)
195         {
196             if(!(config.options & CONSTS::MEMORY_ONLY_MODE))
197             {
198                 // process check by file lock
199                 writer_lock_fd = acquire_writer_lock(lock_file);
200                 if(writer_lock_fd < 0)
201                     status = MBError::WRITER_EXIST;
202             }
203         }
204         else
205         {
206             status = MBError::WRITER_EXIST;
207         }
208         if(status == MBError::WRITER_EXIST)
209         {
210             Logger::Log(LOG_LEVEL_ERROR, "failed to initialize db: %s",
211                         MBError::get_error_str(status));
212             return;
213         }
214     }
215
216     if(config.options & CONSTS::MEMORY_ONLY_MODE)
217     {
218         if(config.options & CONSTS::ACCESS_MODE_WRITER)
219         {
220             init_header = true;
221         }
222         else
223         {
224             init_header = false;
225             if(!ResourcePool::getInstance().CheckExistence(mb_dir + "_mabain_h"))
226                 status = MBError::NO_DB;
227         }
228     }
229     else
230     {
231         // Check if the DB directory exist with proper permission
232         if(access(mb_dir.c_str(), F_OK))
233         {
234             std::cerr << "database directory check for " + mb_dir + " failed errno: " +
235                           std::to_string(errno) << std::endl;
236             status = MBError::NO_DB;
237             return;
238         }
239         Logger::Log(LOG_LEVEL_INFO, "connector %u DB options: %d",
240                     config.connect_id, config.options);
241         // Check if DB exist. This can be done by check existence of the first index file.
242         // If this is the first time the DB is opened and it is in writer mode, then we
243         // need to update the header for the first time. If only reader access mode is
244         // required and the file does not exist, we should bail here and the DB open will
245         // not be successful.
246         std::string header_file = mb_dir + "_mabain_h";
247         if(access(header_file.c_str(), R_OK))
248         {
249             if(config.options & CONSTS::ACCESS_MODE_WRITER)
250                 init_header = true;
251             else
252                 status = MBError::NO_DB;
253         }
254     }
255
256     // Check Header version
257     if(!init_header && !(config.options & CONSTS::MEMORY_ONLY_MODE))
258     {
259         try {
260             DRMBase::ValidateHeaderFile(mb_dir + "_mabain_h", config.options,
261                                         config.queue_size * sizeof(AsyncNode), update_header);
262         } catch (int error) {
263             status = error;
264             return;
265         }
266     }
267 }
268
269 void DB::PostDBUpdate(const MBConfig &config, bool init_header, bool update_header)
270 {
271     if((config.options & CONSTS::ACCESS_MODE_WRITER) && (init_header || update_header))
272     {
273         if(init_header)
274         {
275             Logger::Log(LOG_LEVEL_INFO, "opened a new db %s", mb_dir.c_str());
276         }
277         else
278         {
279             Logger::Log(LOG_LEVEL_INFO, "converted %s to version %d.%d.%d", mb_dir.c_str(),
280                         version[0], version[1], version[2]);
281         }
282         IndexHeader *header = dict->GetHeaderPtr();
283         if(header != NULL) header->async_queue_size = config.queue_size;
284         dict->Init(identifier);
285         dict->InitShmObjects();
286     }
287
288     if(dict->Status() != MBError::SUCCESS)
289     {
290         Logger::Log(LOG_LEVEL_ERROR, "failed to iniitialize dict: %s ",
291                     MBError::get_error_str(dict->Status()));
292         status = dict->Status();
293         return;
294     }
295
296     lock.Init(dict->GetShmLockPtrs());
297     UpdateNumHandlers(config.options, 1);
298
299     if(config.options & CONSTS::ACCESS_MODE_WRITER)
300     {
301         if(config.options & CONSTS::ASYNC_WRITER_MODE)
302             async_writer = new AsyncWriter(this);
303     }
304
305 #ifdef __SHM_QUEUE__
306     if(!(init_header || update_header))
307     {
308         IndexHeader *header = dict->GetHeaderPtr();
309         if(header != NULL && header->async_queue_size != (int) config.queue_size)
310         {
311             Logger::Log(LOG_LEVEL_ERROR, "async queue size not matching with header: %d %d",
312                         header->async_queue_size, (int) config.queue_size);
313             status = MBError::INVALID_SIZE;
314             return;
315         }
316     }
317 #endif
318
319     Logger::Log(LOG_LEVEL_INFO, "connector %u successfully opened DB %s for %s",
320                 identifier, mb_dir.c_str(),
321                 (config.options & CONSTS::ACCESS_MODE_WRITER) ? "writing":"reading");
322     status = MBError::SUCCESS;
323
324     if(config.options & CONSTS::ACCESS_MODE_WRITER)
325     {
326         // Run rc exception recovery
327         ResourceCollection rc(*this);
328         rc.ExceptionRecovery();
329     }
330 }
331
332 void DB::InitDB(MBConfig &config)
333 {
334     dict = NULL;
335     async_writer = NULL;
336
337     if(ValidateConfig(config) != MBError::SUCCESS)
338         return;
339
340     // save the configuration
341     memcpy(&dbConfig, &config, sizeof(MBConfig));
342     dbConfig.mbdir = NULL;
343
344     // If id not given, use thread ID
345     if(config.connect_id == 0)
346     {
347 #ifdef __APPLE__
348         config.connect_id = reinterpret_cast<uint64_t>(pthread_self()) & 0x7FFFFFFF;
349 #else
350         config.connect_id = static_cast<uint32_t>(syscall(SYS_gettid));
351 #endif
352     }
353     identifier = config.connect_id;
354     mb_dir = std::string(config.mbdir);
355     if(mb_dir[mb_dir.length()-1] != '/')
356         mb_dir += "/";
357     options = config.options;
358
359     bool init_header = false;
360     bool update_header = false; // true when header version is different from lib version
361     PreCheckDB(config, init_header, update_header);
362     if(MBError::NOT_INITIALIZED != status)
363     {
364         Logger::Log(LOG_LEVEL_ERROR, "database %s check failed: %s", mb_dir.c_str(),
365                     MBError::get_error_str(status));
366         return;
367     }
368
369     dict = new Dict(mb_dir, init_header, config.data_size, config.options,
370                     config.memcap_index, config.memcap_data,
371                     config.block_size_index, config.block_size_data,
372                     config.max_num_index_block, config.max_num_data_block,
373                     config.num_entry_per_bucket, config.queue_size);
374
375     PostDBUpdate(config, init_header, update_header);
376 }
377
378 int DB::Status() const
379 {
380     return status;
381 }
382
383 DB::DB(const DB &db) : status(MBError::NOT_INITIALIZED),
384                        writer_lock_fd(-1)
385 {
386     MBConfig db_config = db.dbConfig;
387     db_config.mbdir = db.mb_dir.c_str();
388     InitDB(db_config);
389 }
390
391 const DB& DB::operator = (const DB &db)
392 {
393     if(this == &db)
394         return *this; // no self-assignment
395
396     this->Close();
397
398     MBConfig db_config = db.dbConfig;
399     db_config.mbdir = db.mb_dir.c_str();
400     status = MBError::NOT_INITIALIZED;
401     writer_lock_fd = -1;
402     InitDB(db_config);
403
404     return *this;
405 }
406
407 bool DB::is_open() const
408 {
409     return status == MBError::SUCCESS;
410 }
411
412 const char* DB::StatusStr() const
413 {
414     return MBError::get_error_str(status);
415 }
416
417 // Find the exact key match
418 int DB::Find(const char* key, int len, MBData &mdata) const
419 {
420     if(key == NULL)
421         return MBError::INVALID_ARG;
422     if(status != MBError::SUCCESS)
423         return MBError::NOT_INITIALIZED;
424     // Writer in async mode cannot be used for lookup
425     if(options & CONSTS::ASYNC_WRITER_MODE)
426         return MBError::NOT_ALLOWED;
427
428     return dict->Find(reinterpret_cast<const uint8_t*>(key), len, mdata);
429 }
430
431 int DB::Find(const std::string &key, MBData &mdata) const
432 {
433     return Find(key.data(), key.size(), mdata);
434 }
435
436 // Find all possible prefix matches. The caller needs to call this function
437 // repeatedly if data.next is true.
438 int DB::FindPrefix(const char* key, int len, MBData &data) const
439 {
440     if(key == NULL)
441         return MBError::INVALID_ARG;
442     if(status != MBError::SUCCESS)
443         return MBError::NOT_INITIALIZED;
444     // Writer in async mode cannot be used for lookup
445     if(options & CONSTS::ASYNC_WRITER_MODE)
446         return MBError::NOT_ALLOWED;
447
448     if(data.match_len >= len)
449         return MBError::OUT_OF_BOUND;
450
451     int rval;
452     rval = dict->FindPrefix(reinterpret_cast<const uint8_t*>(key+data.match_len),
453                             len-data.match_len, data);
454
455     return rval;
456 }
457
458 // Find the longest prefix match
459 int DB::FindLongestPrefix(const char* key, int len, MBData &data) const
460 {
461     if(key == NULL)
462         return MBError::INVALID_ARG;
463     if(status != MBError::SUCCESS)
464         return MBError::NOT_INITIALIZED;
465     // Writer in async mode cannot be used for lookup
466     if(options & CONSTS::ASYNC_WRITER_MODE)
467         return MBError::NOT_ALLOWED;
468
469     data.match_len = 0;
470
471     return dict->FindPrefix(reinterpret_cast<const uint8_t*>(key), len, data);
472 }
473
474 int DB::FindLongestPrefix(const std::string &key, MBData &data) const
475 {
476     return FindLongestPrefix(key.data(), key.size(), data);
477 }
478
479 // Add a key-value pair
480 int DB::Add(const char* key, int len, MBData &mbdata, bool overwrite)
481 {
482     int rval = MBError::SUCCESS;
483
484     if(key == NULL)
485         return MBError::INVALID_ARG;
486     if(status != MBError::SUCCESS)
487         return MBError::NOT_INITIALIZED;
488
489 #ifndef __SHM_QUEUE__
490     if(async_writer != NULL)
491         return async_writer->Add(key, len, reinterpret_cast<const char *>(mbdata.buff),
492                                  mbdata.data_len, overwrite);
493
494     rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
495 #else
496     if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
497     {
498         rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
499     }
500     else
501     {
502         rval = dict->SHMQ_Add(reinterpret_cast<const char*>(key), len,
503                      reinterpret_cast<const char*>(mbdata.buff), mbdata.data_len, overwrite);
504     }
505 #endif
506
507     return rval;
508 }
509
510 int DB::Add(const char* key, int len, const char* data, int data_len, bool overwrite)
511 {
512     if(key == NULL || data == NULL)
513         return MBError::INVALID_ARG;
514     if(status != MBError::SUCCESS)
515         return MBError::NOT_INITIALIZED;
516
517 #ifndef __SHM_QUEUE__
518     if(async_writer != NULL)
519         return async_writer->Add(key, len, data, data_len, overwrite);
520 #endif
521
522     MBData mbdata;
523     mbdata.data_len = data_len;
524     mbdata.buff = (uint8_t*) data;
525
526     int rval = MBError::SUCCESS;
527 #ifndef __SHM_QUEUE__
528     rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
529 #else
530     if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
531     {
532         rval = dict->Add(reinterpret_cast<const uint8_t*>(key), len, mbdata, overwrite);
533     }
534     else
535     {
536         rval = dict->SHMQ_Add(reinterpret_cast<const char*>(key), len,
537                      reinterpret_cast<const char*>(mbdata.buff), mbdata.data_len, overwrite);
538     }
539 #endif
540
541     mbdata.buff = NULL;
542     return rval;
543 }
544
545 int DB::Add(const std::string &key, const std::string &value, bool overwrite)
546 {
547     return Add(key.data(), key.size(), value.data(), value.size(), overwrite);
548 }
549
550 int DB::Remove(const char *key, int len)
551 {
552     int rval = MBError::SUCCESS;
553
554     if(key == NULL)
555         return MBError::INVALID_ARG;
556     if(status != MBError::SUCCESS)
557         return MBError::NOT_INITIALIZED;
558
559 #ifndef __SHM_QUEUE__
560     if(async_writer != NULL)
561         return async_writer->Remove(key, len);
562
563     rval = dict->Remove(reinterpret_cast<const uint8_t*>(key), len);
564 #else
565     if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
566     {
567         rval = dict->Remove(reinterpret_cast<const uint8_t*>(key), len);
568     }
569     else
570     {
571         rval = dict->SHMQ_Remove(reinterpret_cast<const char*>(key), len);
572     }
573 #endif
574
575     return rval;
576 }
577
578 int DB::Remove(const std::string &key)
579 {
580     return Remove(key.data(), key.size());
581 }
582
583 int DB::RemoveAll()
584 {
585     if(status != MBError::SUCCESS)
586         return MBError::NOT_INITIALIZED;
587
588 #ifndef __SHM_QUEUE__
589     if(async_writer != NULL)
590         return async_writer->RemoveAll();
591 #endif
592
593     int rval;
594     rval = dict->RemoveAll();
595     return rval;
596 }
597
598 int DB::Backup(const char *bk_dir)
599 {
600     int rval = MBError::SUCCESS;
601
602     if(options & CONSTS::MEMORY_ONLY_MODE)
603         return MBError::NOT_ALLOWED;
604
605     if(bk_dir == NULL)
606         return MBError::INVALID_ARG;
607     if(status != MBError::SUCCESS)
608         return MBError::NOT_INITIALIZED;
609     if(options & MMAP_ANONYMOUS_MODE)
610         return MBError::NOT_ALLOWED;
611
612 #ifndef __SHM_QUEUE__
613     if(async_writer != NULL)
614         return async_writer->Backup(bk_dir);
615
616     try {
617         DBBackup bk(*this);
618         rval = bk.Backup(bk_dir);
619     } catch  (int error) {
620         Logger::Log(LOG_LEVEL_WARN, "Backup failed :%s", MBError::get_error_str(error));
621         rval = error;
622     }
623 #else
624     try {
625         if (async_writer == NULL && (options & CONSTS::ASYNC_WRITER_MODE))
626         {
627             DBBackup bk(*this);
628             rval = bk.Backup(bk_dir);
629         }
630         else
631         {
632             rval = dict->SHMQ_Backup(bk_dir);
633         }
634     } catch  (int error) {
635         Logger::Log(LOG_LEVEL_WARN, "Backup failed :%s", MBError::get_error_str(error));
636         rval = error;
637     }
638 #endif
639     return rval;
640 }
641
642 void DB::Flush() const
643 {
644     if(options & CONSTS::MEMORY_ONLY_MODE)
645         return;
646
647     if(status != MBError::SUCCESS)
648         return;
649
650     dict->Flush();
651 }
652
653 int DB::CollectResource(int64_t min_index_rc_size, int64_t min_data_rc_size,
654                         int64_t max_dbsz, int64_t max_dbcnt)
655 {
656     if(status != MBError::SUCCESS)
657         return status;
658
659 #ifndef __SHM_QUEUE__
660     if(async_writer != NULL)
661         return async_writer->CollectResource(min_index_rc_size, min_data_rc_size,
662                                              max_dbsz, max_dbcnt);
663
664     try {
665         ResourceCollection rc(*this);
666         rc.ReclaimResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
667     } catch (int error) {
668         if(error != MBError::RC_SKIPPED)
669         {
670             Logger::Log(LOG_LEVEL_ERROR, "failed to run gc: %s",
671                         MBError::get_error_str(error));
672             return error;
673         }
674     }
675 #else
676     try {
677         if (async_writer == NULL && (options & CONSTS::ACCESS_MODE_WRITER))
678         {
679             ResourceCollection rc(*this);
680             rc.ReclaimResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
681         }
682         else
683         {
684             dict->SHMQ_CollectResource(min_index_rc_size, min_data_rc_size, max_dbsz, max_dbcnt);
685         }
686     } catch (int error) {
687         if(error != MBError::RC_SKIPPED)
688         {
689             Logger::Log(LOG_LEVEL_ERROR, "failed to run gc: %s",
690                         MBError::get_error_str(error));
691             return error;
692         }
693     }
694 #endif
695     return MBError::SUCCESS;
696 }
697
698 int64_t DB::Count() const
699 {
700     if(status != MBError::SUCCESS)
701         return -1;
702
703     return dict->Count();
704 }
705
706 void DB::PrintStats(std::ostream &out_stream) const
707 {
708     if(status != MBError::SUCCESS)
709         return;
710
711     dict->PrintStats(out_stream);
712 }
713
714 void DB::PrintHeader(std::ostream &out_stream) const
715 {
716     if(dict != NULL)
717         dict->PrintHeader(out_stream);
718 }
719
720 int DB::WrLock()
721 {
722     return lock.WrLock();
723 }
724
725 int DB::RdLock()
726 {
727     return lock.RdLock();
728 }
729
730 int DB::UnLock()
731 {
732     return lock.UnLock();
733 }
734
735 int DB::TryWrLock()
736 {
737     return lock.TryWrLock();
738 }
739
740 int DB::ClearLock() const
741 {
742 #ifdef __SHM_LOCK__
743     // No db handler should hold mutex when this is called.
744     if(status != MBError::SUCCESS)
745         return status;
746     IndexHeader *hdr = dict->GetHeaderPtr();
747     return InitShmRWLock(&hdr->mb_rw_lock);
748 #else
749     // Nothing needs to be done if we don't use shared memory mutex.
750     return MBError::SUCCESS;
751 #endif
752 }
753
754 int DB::SetLogLevel(int level)
755 {
756     return Logger::SetLogLevel(level);
757 }
758
759 void DB::LogDebug()
760 {
761     Logger::SetLogLevel(LOG_LEVEL_DEBUG);
762 }
763
764 Dict* DB::GetDictPtr() const
765 {
766     if(options & CONSTS::ACCESS_MODE_WRITER)
767         return dict;
768     return NULL;
769 }
770
771 int DB::GetDBOptions() const
772 {
773     return options;
774 }
775
776 const std::string& DB::GetDBDir() const
777 {
778     return mb_dir;
779 }
780
781 void DB::GetDBConfig(MBConfig &config) const
782 {
783     memcpy(&config, &dbConfig, sizeof(MBConfig));
784     config.mbdir = NULL;
785 }
786
787 int DB::SetAsyncWriterPtr(DB *db_writer)
788 {
789 #ifndef __SHM_QUEUE__
790     if(db_writer == NULL)
791         return MBError::INVALID_ARG;
792     if(options & CONSTS::ACCESS_MODE_WRITER)
793         return MBError::NOT_ALLOWED;
794     if(db_writer->mb_dir != mb_dir)
795         return MBError::INVALID_ARG;
796     if(!(db_writer->options & CONSTS::ACCESS_MODE_WRITER) ||
797        !(db_writer->options & CONSTS::ASYNC_WRITER_MODE)  ||
798        db_writer->async_writer == NULL)
799     {
800         return MBError::INVALID_ARG;
801     }
802
803    db_writer->async_writer->UpdateNumUsers(1);
804    async_writer = db_writer->async_writer;
805 #endif
806    return MBError::SUCCESS;
807 }
808
809 int DB::UnsetAsyncWriterPtr(DB *db_writer)
810 {
811 #ifndef __SHM_QUEUE__
812     if(db_writer == NULL)
813         return MBError::INVALID_ARG;
814     if(options & CONSTS::ACCESS_MODE_WRITER)
815         return MBError::NOT_ALLOWED;
816     if(db_writer->mb_dir != mb_dir)
817         return MBError::INVALID_ARG;
818     if(!(db_writer->options & CONSTS::ACCESS_MODE_WRITER) ||
819        !(db_writer->options & CONSTS::ASYNC_WRITER_MODE)  ||
820        db_writer->async_writer == NULL)
821     {
822         return MBError::INVALID_ARG;
823     }
824
825     db_writer->async_writer->UpdateNumUsers(-1);
826     async_writer = NULL;
827 #endif
828     return MBError::SUCCESS;
829 }
830
831 bool DB::AsyncWriterEnabled() const
832 {
833 #ifdef __SHM_QUEUE__
834     return true;
835 #else
836     return (async_writer != NULL);
837 #endif
838 }
839
840 bool DB::AsyncWriterBusy() const
841 {
842 #ifdef __SHM_QUEUE__
843     return dict->SHMQ_Busy();
844 #else
845     if(async_writer != NULL)
846         return async_writer->Busy();
847     return true;
848 #endif
849 }
850
851 void DB::SetLogFile(const std::string &log_file)
852 {
853     Logger::InitLogFile(log_file);
854 }
855
856 void DB::CloseLogFile()
857 {
858     Logger::Close();
859 }
860
861 void DB::ClearResources(const std::string &path)
862 {
863     ResourcePool::getInstance().RemoveResourceByDB(path);
864 }
865
866 } // namespace mabain