2 * Copyright (C) 2017 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
23 #include "mabain_consts.h"
28 #include "integer_4b_5b.h"
29 #include "async_writer.h"
30 #include "util/shm_mutex.h"
// Numerator of NUM_DATA_BUFFER_RESERVE below.
// NOTE(review): presumably the largest reservable data buffer in bytes - confirm.
32 #define MAX_DATA_BUFFER_RESERVE_SIZE 0xFFFF
// Passed to the FreeList constructor together with DATA_BUFFER_ALIGNMENT.
// NOTE(review): the expansion is not parenthesized, so using this macro inside a
// larger arithmetic expression may not group the way the reader expects.
33 #define NUM_DATA_BUFFER_RESERVE MAX_DATA_BUFFER_RESERVE_SIZE/DATA_BUFFER_ALIGNMENT
// Value returned by Dict::GetStartDataOffset(), i.e. the initial m_data_offset.
34 #define DATA_HEADER_SIZE 32
// Declares a LockFreeData snapshot in the current scope and hands it to
// lfree.ReaderLockFreeStart().
36 #define READER_LOCK_FREE_START \
37 LockFreeData snapshot; \
39 lfree.ReaderLockFreeStart(snapshot);
// Calls lfree.ReaderLockFreeStop() with the snapshot, the given edge offset and
// data object, stores the result in lf_ret and compares it with MBError::SUCCESS.
40 #define READER_LOCK_FREE_STOP(edgeoff, data) \
41 lf_ret = lfree.ReaderLockFreeStop(snapshot, (edgeoff), (data)); \
42 if(lf_ret != MBError::SUCCESS) \
// Construct a Dict for the database located at mbdir.
// The index memory manager (mm) is built from the index-related arguments and the
// key/value data file is opened as mbdir + "_mabain_d".
// Failures visible below are reported by throwing an int-cast MBError code
// (MMAP_FAILED, INVALID_SIZE).
47 Dict::Dict(const std::string &mbdir, bool init_header, int datasize,
48 int db_options, size_t memsize_index, size_t memsize_data,
49 uint32_t block_sz_idx, uint32_t block_sz_data,
50 int max_num_index_blk, int max_num_data_blk,
51 int64_t entry_per_bucket, uint32_t queue_size)
52 : options(db_options),
// NOTE(review): mm is listed twice with identical arguments; presumably the two
// lines belong to alternative preprocessor branches - confirm.
54 mm(mbdir, init_header, memsize_index, db_options, block_sz_idx, max_num_index_blk, queue_size),
57 mm(mbdir, init_header, memsize_index, db_options, block_sz_idx, max_num_index_blk, queue_size)
// The dict stays NOT_INITIALIZED until one of the assignments further down.
60 status = MBError::NOT_INITIALIZED;
// The header pointer is obtained from the index memory manager.
63 header = mm.GetHeaderPtr();
66 Logger::Log(LOG_LEVEL_ERROR, "header not mapped");
67 throw (int) MBError::MMAP_FAILED;
70 // confirm block size is the same
// A caller-supplied block_sz_data of 0 skips this comparison.
73 if(block_sz_data != 0 && header->data_block_size != block_sz_data)
75 std::cerr << "mabain data block size not match " << block_sz_data << ": "
76 << header->data_block_size << std::endl;
77 PrintHeader(std::cout);
79 throw (int) MBError::INVALID_SIZE;
84 header->data_block_size = block_sz_data;
// Hook the lock-free helper up to the header and to the index memory manager.
87 lfree.LockFreeInit(&header->lock_free, header, db_options);
88 mm.InitLockFreePtr(&lfree);
91 // initialize shared memory queue
// The queue pointer is placed RollableFile::page_size bytes past the header start.
92 char *hdr_ptr = (char *) header;
93 queue = reinterpret_cast<AsyncNode *>(hdr_ptr + RollableFile::page_size);
// Open the key/value data file using the block size recorded in the header.
97 kv_file = new RollableFile(mbdir + "_mabain_d",
98 static_cast<size_t>(header->data_block_size),
99 memsize_data, db_options, max_num_data_blk);
101 kv_file->InitShmSlidingAddr(&header->shm_data_sliding_start);
102 // If init_header is false, we can set the dict status to SUCCESS.
103 // Otherwise, the status will be set in the Init.
// Record the creation parameters in the header.
107 header->entry_per_bucket = entry_per_bucket;
108 header->index_block_size = block_sz_idx;
109 header->data_block_size = block_sz_data;
110 header->data_size = datasize;
112 header->m_data_offset = GetStartDataOffset(); // start from a non-zero offset
113 // We know that only writers will set init_header to true.
114 free_lists = new FreeList(mbdir+"_dbfl", DATA_BUFFER_ALIGNMENT,
115 NUM_DATA_BUFFER_RESERVE);
// Writer path: the stored entry_per_bucket must match the argument.
119 if(options & CONSTS::ACCESS_MODE_WRITER)
121 if(header->entry_per_bucket != entry_per_bucket)
124 std::cerr << "mabain count per bucket not match\n";
125 throw (int) MBError::INVALID_SIZE;
// Reset the index and data sliding windows, then create the free list.
127 mm.ResetSlidingWindow();
128 ResetSlidingWindow();
129 free_lists = new FreeList(mbdir+"_dbfl", DATA_BUFFER_ALIGNMENT,
130 NUM_DATA_BUFFER_RESERVE);
// Run exception recovery; on success clear the recorded exception offsets and
// mark the dict usable.
133 int rval = ExceptionRecovery();
134 if(rval == MBError::SUCCESS)
136 header->excep_lf_offset = 0;
137 header->excep_offset = 0;
138 status = MBError::SUCCESS;
145 status = MBError::SUCCESS;
154 // This function only needs to be called by writer.
// id: connector id; on the visible lines it is only used in log messages.
// Returns MBError::NOT_ALLOWED without ACCESS_MODE_WRITER, MBError::SUCCESS when
// the dict is already initialized, MBError::ALLOCATION_ERROR for an unmapped header
// and MBError::INVALID_SIZE when header->data_size exceeds CONSTS::MAX_DATA_SIZE.
155 int Dict::Init(uint32_t id)
157 if(!(options & CONSTS::ACCESS_MODE_WRITER))
159 Logger::Log(LOG_LEVEL_ERROR, "dict initialization not allowed for non-writer");
160 return MBError::NOT_ALLOWED;
// A repeated call is only warned about; it is not treated as an error.
163 if(status != MBError::NOT_INITIALIZED)
165 // status can be NOT_INITIALIZED or SUCCESS.
166 Logger::Log(LOG_LEVEL_WARN, "connector %u dict already initialized", id);
167 return MBError::SUCCESS;
172 Logger::Log(LOG_LEVEL_ERROR, "connector %u header not mapped", id);
173 return MBError::ALLOCATION_ERROR;
176 Logger::Log(LOG_LEVEL_INFO, "connector %u initializing DictMem", id);
// Reject a configured data size above the supported maximum.
179 if(header->data_size > CONSTS::MAX_DATA_SIZE)
181 Logger::Log(LOG_LEVEL_ERROR, "data size %d is too large", header->data_size);
182 return MBError::INVALID_SIZE;
186 status = MBError::SUCCESS;
// NOTE(review): the signature of the function these statements belong to is not
// visible here. A writer resets both the index and the data sliding windows, and
// free_lists is tested against NULL before whatever follows.
193 if(options & CONSTS::ACCESS_MODE_WRITER)
195 mm.ResetSlidingWindow();
196 ResetSlidingWindow();
201 if(free_lists != NULL)
// Status accessor; const, so it does not modify the Dict.
// NOTE(review): presumably returns the status member - confirm against the body.
208 int Dict::Status() const
213 // Add a key-value pair
214 // if overwrite is true and an entry with input key already exists, the old data will
215 // be overwritten. Otherwise, IN_DICT will be returned.
// key/len: key bytes and key length; len must satisfy 0 < len <= CONSTS::MAX_KEY_LENGHTH.
// data: value to store (data.buff, data.data_len); data.data_len must satisfy
//       0 < data.data_len <= CONSTS::MAX_DATA_SIZE. data.options selects RC mode.
// Returns MBError::NOT_ALLOWED without ACCESS_MODE_WRITER, MBError::OUT_OF_BOUND for
// the length checks above, MBError::READ_ERROR when an edge string cannot be read,
// otherwise the result of the index update that was performed.
216 int Dict::Add(const uint8_t *key, int len, MBData &data, bool overwrite)
218 if(!(options & CONSTS::ACCESS_MODE_WRITER))
220 return MBError::NOT_ALLOWED;
222 if(len > CONSTS::MAX_KEY_LENGHTH || data.data_len > CONSTS::MAX_DATA_SIZE ||
223 len <= 0 || data.data_len <= 0)
224 return MBError::OUT_OF_BOUND;
227 size_t data_offset = 0;
// Look up the root edge for the first key byte.
230 rval = mm.GetRootEdge_Writer(data.options & CONSTS::OPTION_RC_MODE, key[0], edge_ptrs);
231 if(rval != MBError::SUCCESS)
// A zero edge length means no key starts with this byte yet: store the value and
// create the root edge for the whole key.
234 if(edge_ptrs.len_ptr[0] == 0)
236 ReserveData(data.buff, data.data_len, data_offset);
237 // Add the first edge along this edge
238 mm.AddRootEdge(edge_ptrs, key, len, data_offset);
239 if(data.options & CONSTS::OPTION_RC_MODE)
246 header->num_update++;
249 return MBError::SUCCESS;
// inc_count is handed to UpdateDataBuffer by reference further down.
252 bool inc_count = true;
254 const uint8_t *key_buff;
255 uint8_t tmp_key_buff[NUM_ALPHABET];
256 const uint8_t *p = key;
257 int edge_len = edge_ptrs.len_ptr[0];
// Edge strings longer than LOCAL_EDGE_LEN are read from the index at the 5-byte
// offset held in edge_ptrs.ptr; shorter ones are taken from edge_ptrs.ptr directly.
258 if(edge_len > LOCAL_EDGE_LEN)
260 if(mm.ReadData(tmp_key_buff, edge_len-1, Get5BInteger(edge_ptrs.ptr)) != edge_len-1)
261 return MBError::READ_ERROR;
262 key_buff = tmp_key_buff;
266 key_buff = edge_ptrs.ptr;
// Compare the edge string with the key; key_buff[i-1] pairs with key[i], i.e. the
// first key byte is not part of key_buff.
270 for(i = 1; i < edge_len; i++)
272 if(key_buff[i-1] != key[i])
// Walk further edges while mm.FindNext() keeps returning a next edge.
281 while((next = mm.FindNext(p, len, match_len, edge_ptrs, tmp_key_buff)))
283 if(match_len < edge_ptrs.len_ptr[0])
293 ReserveData(data.buff, data.data_len, data_offset);
294 rval = mm.UpdateNode(edge_ptrs, p, len, data_offset);
296 else if(match_len < static_cast<int>(edge_ptrs.len_ptr[0]))
300 ReserveData(data.buff, data.data_len, data_offset);
301 rval = mm.AddLink(edge_ptrs, match_len, p+match_len, len-match_len,
304 else if(len == match_len)
306 ReserveData(data.buff, data.data_len, data_offset);
307 rval = mm.InsertNode(edge_ptrs, match_len, data_offset, data);
// Existing-entry case: replace the data buffer, or get IN_DICT back when
// overwrite is false (see the comment above the function).
312 rval = UpdateDataBuffer(edge_ptrs, overwrite, data.buff, data.data_len, inc_count);
317 ReserveData(data.buff, data.data_len, data_offset);
318 rval = mm.AddLink(edge_ptrs, i, p+i, len-i, data_offset, data);
// Same byte-wise comparison as above, bounded by the key length this time.
323 for(i = 1; i < len; i++)
325 if(key_buff[i-1] != key[i])
330 ReserveData(data.buff, data.data_len, data_offset);
331 rval = mm.AddLink(edge_ptrs, i, p+i, len-i, data_offset, data);
335 if(edge_ptrs.len_ptr[0] > len)
337 ReserveData(data.buff, data.data_len, data_offset);
338 rval = mm.InsertNode(edge_ptrs, i, data_offset, data);
342 rval = UpdateDataBuffer(edge_ptrs, overwrite, data.buff, data.data_len, inc_count);
347 if(data.options & CONSTS::OPTION_RC_MODE)
349 if(rval == MBError::SUCCESS)
// The update counter is only advanced for a successful operation.
354 if(rval == MBError::SUCCESS)
355 header->num_update++;
// Read the value referenced by an edge into data.
// With EDGE_FLAG_DATA_OFF set, the edge's 6-byte offset field is the data offset.
// The remaining path treats that field as a node offset, requires FLAG_NODE_MATCH
// in the node header and takes the data offset from node_buff+2.
// On success data.data_offset, data.buff, data.data_len and data.bucket_index are set.
// Returns MBError::SUCCESS, READ_ERROR, NOT_EXIST or NO_MEMORY.
362 int Dict::ReadDataFromEdge(MBData &data, const EdgePtrs &edge_ptrs) const
365 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
367 data_off = Get6BInteger(edge_ptrs.offset_ptr);
371 uint8_t node_buff[NODE_EDGE_KEY_FIRST];
372 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, Get6BInteger(edge_ptrs.offset_ptr))
373 != NODE_EDGE_KEY_FIRST)
374 return MBError::READ_ERROR;
375 if(!(node_buff[0] & FLAG_NODE_MATCH))
376 return MBError::NOT_EXIST;
377 data_off = Get6BInteger(node_buff+2);
379 data.data_offset = data_off;
// Data header: data_len[0] is the value length, data_len[1] the bucket index.
381 uint16_t data_len[2];
382 // Read data length first
383 if(ReadData(reinterpret_cast<uint8_t*>(&data_len[0]), DATA_HDR_BYTE, data_off)
385 return MBError::READ_ERROR;
// The value bytes follow the DATA_HDR_BYTE-byte header.
386 data_off += DATA_HDR_BYTE;
// Resize the caller's buffer when it is smaller than data_len[0] + 1.
387 if(data.buff_len < data_len[0] + 1)
389 if(data.Resize(data_len[0]) != MBError::SUCCESS)
390 return MBError::NO_MEMORY;
392 if(ReadData(data.buff, data_len[0], data_off) != data_len[0])
393 return MBError::READ_ERROR;
395 data.data_len = data_len[0];
396 data.bucket_index = data_len[1];
397 return MBError::SUCCESS;
400 // Delete operations:
401 // If this is a leaf node, need to remove the edge. Otherwise, unset the match flag.
402 // Also need to set the delete flag in the data block so that it can be reclaimed later.
// In both cases the data buffer is handed back to free_lists and its aligned size
// is added to header->pending_data_buff_size.
// Returns MBError::READ_ERROR on a failed read and MBError::NOT_EXIST on the path
// at the end of the function; other results come from mm.RemoveEdgeByIndex().
403 int Dict::DeleteDataFromEdge(MBData &data, EdgePtrs &edge_ptrs)
405 int rval = MBError::SUCCESS;
410 // Check if this is a leaf node first by using the EDGE_FLAG_DATA_OFF bit
411 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
// Read the stored data length so the aligned buffer size can be computed.
413 data_off = Get6BInteger(edge_ptrs.offset_ptr);
414 if(ReadData(reinterpret_cast<uint8_t*>(&data_len), DATA_SIZE_BYTE, data_off)
416 return MBError::READ_ERROR;
418 rel_size = free_lists->GetAlignmentSize(data_len + DATA_HDR_BYTE);
419 header->pending_data_buff_size += rel_size;
420 free_lists->ReleaseBuffer(data_off, rel_size);
422 rval = mm.RemoveEdgeByIndex(edge_ptrs, data);
426 // No exception handling in this case
427 header->excep_lf_offset = 0;
428 header->excep_offset = 0;
// Here the edge's offset field is read as a node offset.
430 uint8_t node_buff[NODE_EDGE_KEY_FIRST];
431 size_t node_off = Get6BInteger(edge_ptrs.offset_ptr);
434 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
435 return MBError::READ_ERROR;
437 if(node_buff[0] & FLAG_NODE_MATCH)
439 // Unset the match flag
// Only the first byte of the node header (the flag byte) is written back.
440 node_buff[0] &= ~FLAG_NODE_MATCH;
441 mm.WriteData(&node_buff[0], 1, node_off);
443 // Release data buffer
444 data_off = Get6BInteger(node_buff+2);
445 if(ReadData(reinterpret_cast<uint8_t*>(&data_len), DATA_SIZE_BYTE, data_off)
447 return MBError::READ_ERROR;
449 rel_size = free_lists->GetAlignmentSize(data_len + DATA_HDR_BYTE);
450 header->pending_data_buff_size += rel_size;
451 free_lists->ReleaseBuffer(data_off, rel_size);
455 rval = MBError::NOT_EXIST;
// Read the value attached to a node into data.
// node_ptr: node header bytes; the 6-byte data offset is taken from node_ptr+2.
// On success data.data_offset, data.buff, data.data_len and data.bucket_index are set.
// Returns MBError::SUCCESS, NOT_EXIST, READ_ERROR or NO_MEMORY.
462 int Dict::ReadDataFromNode(MBData &data, const uint8_t *node_ptr) const
464 size_t data_off = Get6BInteger(node_ptr+2);
466 return MBError::NOT_EXIST;
468 data.data_offset = data_off;
470 // Read data length first
// data_len[0] is the value length, data_len[1] the bucket index.
471 uint16_t data_len[2];
472 if(ReadData(reinterpret_cast<uint8_t *>(&data_len[0]), DATA_HDR_BYTE, data_off)
474 return MBError::READ_ERROR;
475 data_off += DATA_HDR_BYTE;
// Resize the caller's buffer when it is smaller than data_len[0] + 1.
477 if(data.buff_len < data_len[0] + 1)
479 if(data.Resize(data_len[0]) != MBError::SUCCESS)
480 return MBError::NO_MEMORY;
482 if(ReadData(data.buff, data_len[0], data_off) != data_len[0])
483 return MBError::READ_ERROR;
485 data.data_len = data_len[0];
486 data.bucket_index = data_len[1];
487 return MBError::SUCCESS;
// Prefix lookup entry point.
// When header->rc_root_offset is non-zero the RC tree is searched first (result in
// data_rc), then the regular tree (root offset 0) is searched into data. Both
// searches are repeated while they return MBError::TRY_AGAIN, with a 10 ns
// nanosleep between attempts.
490 int Dict::FindPrefix(const uint8_t *key, int len, MBData &data)
494 size_t rc_root_offset = header->rc_root_offset.load(MEMORY_ORDER_READER);
495 if(rc_root_offset != 0)
// Remember the RC root offset that was used for this lookup.
497 reader_rc_off = rc_root_offset;
498 rval = FindPrefix_Internal(rc_root_offset, key, len, data_rc);
500 while(rval == MBError::TRY_AGAIN)
502 nanosleep((const struct timespec[]){{0, 10L}}, NULL);
504 rval = FindPrefix_Internal(rc_root_offset, key, len, data_rc);
// Any result other than NOT_EXIST or SUCCESS takes this branch.
507 if(rval != MBError::NOT_EXIST && rval != MBError::SUCCESS)
// Clear the RC-mode and saved-edge option bits on data.
509 data.options &= ~(CONSTS::OPTION_RC_MODE | CONSTS::OPTION_READ_SAVED_EDGE);
513 if(reader_rc_off != 0)
521 rval = FindPrefix_Internal(0, key, len, data);
523 while(rval == MBError::TRY_AGAIN)
525 nanosleep((const struct timespec[]){{0, 10L}}, NULL);
527 rval = FindPrefix_Internal(0, key, len, data);
531 // The longer match wins.
// If the RC lookup matched more key bytes, its value is transferred into data and
// the call reports SUCCESS.
532 if(data_rc.match_len > data.match_len)
534 data_rc.TransferValueTo(data.buff, data.data_len);
535 rval = MBError::SUCCESS;
// Single attempt of a prefix lookup; FindPrefix() repeats it on MBError::TRY_AGAIN.
// Reads are bracketed by READER_LOCK_FREE_START / READER_LOCK_FREE_STOP.
// data.match_len is set to the number of key bytes consumed at the point of a match.
// NOTE(review): root_off is not referenced on the visible lines; the root edge is
// selected with the OPTION_RC_MODE bit instead, unlike Find_Internal which passes
// root_off to mm.GetRootEdge() - confirm this is intended.
541 int Dict::FindPrefix_Internal(size_t root_off, const uint8_t *key, int len, MBData &data)
545 EdgePtrs &edge_ptrs = data.edge_ptrs;
547 READER_LOCK_FREE_START
// The root edge is only fetched when no match length has been recorded yet.
550 if(data.match_len == 0)
552 rval = mm.GetRootEdge(data.options & CONSTS::OPTION_RC_MODE, key[0], edge_ptrs);
553 if(rval != MBError::SUCCESS)
554 return MBError::READ_ERROR;
// A zero edge length means nothing is stored under the first key byte.
556 if(edge_ptrs.len_ptr[0] == 0)
559 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
561 return MBError::NOT_EXIST;
565 // Compare edge string
566 const uint8_t *key_buff;
567 uint8_t *node_buff = data.node_buff;
568 const uint8_t *p = key;
569 int edge_len = edge_ptrs.len_ptr[0];
570 int edge_len_m1 = edge_len - 1;
// Edge strings longer than LOCAL_EDGE_LEN are read through the 5-byte offset in
// edge_ptrs.ptr; otherwise edge_ptrs.ptr itself is used as the string.
571 if(edge_len > LOCAL_EDGE_LEN)
573 if(mm.ReadData(node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr))
577 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
579 return MBError::READ_ERROR;
581 key_buff = node_buff;
585 key_buff = edge_ptrs.ptr;
588 rval = MBError::NOT_EXIST;
// The edge string holds edge_len - 1 bytes and is compared against key+1.
591 if(edge_len > 1 && memcmp(key_buff, key+1, edge_len_m1) != 0)
594 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
596 return MBError::NOT_EXIST;
602 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
604 // prefix match for leaf node
606 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
608 data.match_len = p - key;
609 return ReadDataFromEdge(data, edge_ptrs);
// last_node_buffer keeps a copy of the most recent node header that had
// FLAG_NODE_MATCH set; last_prefix_rval records whether such a node was seen.
612 uint8_t last_node_buffer[NODE_EDGE_KEY_FIRST];
614 size_t edge_offset_prev = edge_ptrs.offset;
616 int last_prefix_rval = MBError::NOT_EXIST;
619 rval = mm.NextEdge(p, edge_ptrs, node_buff, data);
620 if(rval != MBError::READ_ERROR)
622 if(node_buff[0] & FLAG_NODE_MATCH)
624 data.match_len = p - key;
// With OPTION_ALL_PREFIX the node's value is read at this point.
625 if(data.options & CONSTS::OPTION_ALL_PREFIX)
627 rval = ReadDataFromNode(data, node_buff);
629 rval = last_prefix_rval;
634 memcpy(last_node_buffer, node_buff, NODE_EDGE_KEY_FIRST);
635 last_prefix_rval = MBError::SUCCESS;
640 if(rval != MBError::SUCCESS)
// Check against the offset of the edge that was followed to get here.
644 READER_LOCK_FREE_STOP(edge_offset_prev, data)
646 edge_len = edge_ptrs.len_ptr[0];
647 edge_len_m1 = edge_len - 1;
649 if(edge_len > LOCAL_EDGE_LEN)
651 if(mm.ReadData(node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr))
654 rval = MBError::READ_ERROR;
657 key_buff = node_buff;
661 key_buff = edge_ptrs.ptr;
// A mismatching edge string or an empty edge gives NOT_EXIST.
664 if((edge_len > 1 && memcmp(key_buff, p+1, edge_len_m1) != 0) || edge_len == 0)
666 rval = MBError::NOT_EXIST;
// Read the edge's value once len is no longer positive or a leaf edge is reached.
672 if(len <= 0 || (edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF))
674 data.match_len = p - key;
675 rval = ReadDataFromEdge(data, edge_ptrs);
679 edge_offset_prev = edge_ptrs.offset;
// NOT_EXIST after an earlier matching node was seen: return that node's value.
683 if(rval == MBError::NOT_EXIST && last_prefix_rval != rval)
684 rval = ReadDataFromNode(data, last_node_buffer);
// The whole key equals the root edge: compare and read the edge's value.
686 else if(edge_len == len)
688 if(edge_len_m1 == 0 || memcmp(key_buff, key+1, edge_len_m1) == 0)
690 data.match_len = len;
691 rval = ReadDataFromEdge(data, edge_ptrs);
696 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
// Exact-match lookup entry point.
// When header->rc_root_offset is non-zero the RC tree is searched first, then the
// regular tree (root offset 0). Each search is repeated while Find_Internal()
// returns MBError::TRY_AGAIN, with a 10 ns nanosleep between attempts.
// data.match_len is set to len when a search returns MBError::SUCCESS.
701 int Dict::Find(const uint8_t *key, int len, MBData &data)
704 size_t rc_root_offset = header->rc_root_offset.load(MEMORY_ORDER_READER);
706 if(rc_root_offset != 0)
// Remember the RC root offset that was used for this lookup.
708 reader_rc_off = rc_root_offset;
709 rval = Find_Internal(rc_root_offset, key, len, data);
711 while(rval == MBError::TRY_AGAIN)
713 nanosleep((const struct timespec[]){{0, 10L}}, NULL);
714 rval = Find_Internal(rc_root_offset, key, len, data);
717 if(rval == MBError::SUCCESS)
719 data.match_len = len;
722 else if(rval != MBError::NOT_EXIST)
// Clear the RC-mode and saved-edge option bits on data.
724 data.options &= ~(CONSTS::OPTION_RC_MODE | CONSTS::OPTION_READ_SAVED_EDGE);
728 if(reader_rc_off != 0)
736 rval = Find_Internal(0, key, len, data);
738 while(rval == MBError::TRY_AGAIN)
740 nanosleep((const struct timespec[]){{0, 10L}}, NULL);
741 rval = Find_Internal(0, key, len, data);
744 if(rval == MBError::SUCCESS)
745 data.match_len = len;
// Single attempt of an exact-match lookup starting at root_off; Find() repeats it
// on MBError::TRY_AGAIN. Reads are bracketed by READER_LOCK_FREE_START /
// READER_LOCK_FREE_STOP.
// With OPTION_FIND_AND_STORE_PARENT set in data.options a match is reported as
// MBError::IN_DICT instead of reading the value.
750 int Dict::Find_Internal(size_t root_off, const uint8_t *key, int len, MBData &data)
752 EdgePtrs &edge_ptrs = data.edge_ptrs;
754 READER_LOCK_FREE_START
757 rval = mm.GetRootEdge(root_off, key[0], edge_ptrs);
759 if(rval != MBError::SUCCESS)
760 return MBError::READ_ERROR;
// A zero edge length means nothing is stored under the first key byte.
761 if(edge_ptrs.len_ptr[0] == 0)
764 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
766 return MBError::NOT_EXIST;
769 // Compare edge string
770 const uint8_t *key_buff;
771 uint8_t *node_buff = data.node_buff;
772 const uint8_t *p = key;
773 int edge_len = edge_ptrs.len_ptr[0];
774 int edge_len_m1 = edge_len - 1;
776 rval = MBError::NOT_EXIST;
// Edge strings longer than LOCAL_EDGE_LEN are read through the 5-byte offset in
// edge_ptrs.ptr; otherwise edge_ptrs.ptr itself is used as the string.
777 if(edge_len > LOCAL_EDGE_LEN)
779 size_t edge_str_off_lf = Get5BInteger(edge_ptrs.ptr);
780 if(mm.ReadData(node_buff, edge_len_m1, edge_str_off_lf) != edge_len_m1)
783 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
785 return MBError::READ_ERROR;
787 key_buff = node_buff;
791 key_buff = edge_ptrs.ptr;
// NOT_EXIST if the edge string differs from the key or the edge is already a leaf.
796 if((edge_len > 1 && memcmp(key_buff, key+1, edge_len_m1) != 0) ||
797 (edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF))
800 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
802 return MBError::NOT_EXIST;
809 size_t edge_offset_prev = edge_ptrs.offset;
813 rval = mm.NextEdge(p, edge_ptrs, node_buff, data);
814 if(rval != MBError::SUCCESS)
// Check against the offset of the edge that was followed to get here.
818 READER_LOCK_FREE_STOP(edge_offset_prev, data)
820 edge_len = edge_ptrs.len_ptr[0];
821 edge_len_m1 = edge_len - 1;
823 if(edge_len > LOCAL_EDGE_LEN)
825 size_t edge_str_off_lf = Get5BInteger(edge_ptrs.ptr);
826 if(mm.ReadData(node_buff, edge_len_m1, edge_str_off_lf) != edge_len_m1)
828 rval = MBError::READ_ERROR;
831 key_buff = node_buff;
835 key_buff = edge_ptrs.ptr;
// edge_len_m1 < 0 corresponds to an edge length of zero.
838 if((edge_len_m1 > 0 && memcmp(key_buff, p+1, edge_len_m1) != 0) || edge_len_m1 < 0)
840 rval = MBError::NOT_EXIST;
847 // If this is for remove operation, return IN_DICT to caller.
848 if(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT)
849 rval = MBError::IN_DICT;
851 rval = ReadDataFromEdge(data, edge_ptrs);
856 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
858 // Reach a leaf node and no match found
859 rval = MBError::NOT_EXIST;
865 edge_offset_prev = edge_ptrs.offset;
// The whole key equals the root edge.
869 else if(edge_len == len)
871 if(len > 1 && memcmp(key_buff, key+1, len-1) != 0)
873 rval = MBError::NOT_EXIST;
877 // If this is for remove operation, return IN_DICT to caller.
878 if(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT)
// Record the root node as the current node and this edge's offset as the parent.
880 data.edge_ptrs.curr_node_offset = mm.GetRootOffset();
881 data.edge_ptrs.curr_nt = 1;
882 data.edge_ptrs.curr_edge_index = 0;
883 data.edge_ptrs.parent_offset = data.edge_ptrs.offset;
884 rval = MBError::IN_DICT;
888 rval = ReadDataFromEdge(data, edge_ptrs);
894 READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
// Pointer overload: prints to *out_stream, or to std::cout when out_stream is NULL.
899 void Dict::PrintStats(std::ostream *out_stream) const
901 if(out_stream != NULL)
902 return PrintStats(*out_stream);
903 return PrintStats(std::cout);
// Write header counters, the free-list total size, and the index and data file
// statistics to out_stream. The dict status is tested against MBError::SUCCESS
// before anything is printed.
906 void Dict::PrintStats(std::ostream &out_stream) const
908 if(status != MBError::SUCCESS)
911 out_stream << "DB stats:\n";
912 out_stream << "\tNumber of DB writer: " << header->num_writer << std::endl;
913 out_stream << "\tNumber of DB reader: " << header->num_reader << std::endl;
914 out_stream << "\tEntry count in DB: " << header->count << std::endl;
915 out_stream << "\tEntry count per bucket: " << header->entry_per_bucket << std::endl;
916 out_stream << "\tEviction bucket index: " << header->eviction_bucket_index << std::endl;
917 out_stream << "\tData block size: " << header->data_block_size << std::endl;
// The value printed as "Data size" is header->m_data_offset.
918 out_stream << "\tData size: " << header->m_data_offset << std::endl;
919 out_stream << "\tPending Buffer Size: " << header->pending_data_buff_size << std::endl;
921 out_stream << "\tTrackable Buffer Size: " << free_lists->GetTotSize() << std::endl;
// Index statistics, then data file statistics.
922 mm.PrintStats(out_stream);
924 kv_file->PrintStats(out_stream);
// Return the entry count stored in the header (header->count).
// A warning containing the status string is logged on the not-initialized path.
927 int64_t Dict::Count() const
931 Logger::Log(LOG_LEVEL_WARN, "db was not initialized successfully: %s",
932 MBError::get_error_str(status));
936 return header->count;
// Read the edge at edge_ptrs.offset of the node described by node_buff and advance
// edge_ptrs.offset by EDGE_SIZE.
// node_buff: node header followed by the first key byte of each edge.
// match_str: when rd_kv is true and the edge is non-empty, receives the edge's full
//            key string (first byte from node_buff plus the edge string).
// node_off:  receives the child node offset for a non-leaf, non-empty edge.
// Returns MBError::OUT_OF_BOUND when edge_ptrs.curr_nt exceeds node_buff[1], and
// MBError::READ_ERROR on a failed read.
940 int Dict::ReadNextEdge(const uint8_t *node_buff, EdgePtrs &edge_ptrs,
941 int &match, MBData &data, std::string &match_str,
942 size_t &node_off, bool rd_kv) const
944 if(edge_ptrs.curr_nt > static_cast<int>(node_buff[1]))
945 return MBError::OUT_OF_BOUND;
947 if(mm.ReadData(edge_ptrs.edge_buff, EDGE_SIZE, edge_ptrs.offset) != EDGE_SIZE)
948 return MBError::READ_ERROR;
953 int rval = MBError::SUCCESS;
954 InitTempEdgePtrs(edge_ptrs);
955 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
957 // match of leaf node
961 rval = ReadDataFromEdge(data, edge_ptrs);
962 if(rval != MBError::SUCCESS)
// Non-empty edge: its offset field is a node offset; check that node for a match.
969 if(edge_ptrs.len_ptr[0] > 0)
971 node_off = Get6BInteger(edge_ptrs.offset_ptr);
973 rval = ReadNodeMatch(node_off, match, data);
977 if(edge_ptrs.len_ptr[0] > 0 && rd_kv)
979 int edge_len_m1 = edge_ptrs.len_ptr[0] - 1;
// The first key byte of the edge is stored in the node after the header.
980 match_str = std::string(1, (const char)node_buff[NODE_EDGE_KEY_FIRST+edge_ptrs.curr_nt]);
// Strings longer than LOCAL_EDGE_LEN_M1 are read through the 5-byte offset in
// edge_ptrs.ptr; shorter ones are appended from edge_ptrs.ptr directly.
981 if(edge_len_m1 > LOCAL_EDGE_LEN_M1)
983 if(mm.ReadData(data.node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr)) != edge_len_m1)
984 return MBError::READ_ERROR;
985 match_str += std::string(reinterpret_cast<char*>(data.node_buff), edge_len_m1);
987 else if(edge_len_m1 > 0)
989 match_str += std::string(reinterpret_cast<char*>(edge_ptrs.ptr), edge_len_m1);
994 edge_ptrs.offset += EDGE_SIZE;
// Read the node at node_off into node_buff: first the NODE_EDGE_KEY_FIRST-byte
// header, then node_buff[1] + 1 further bytes. edge_ptrs.curr_nt is reset to 0 and
// edge_ptrs.offset is set just past those bytes. If the node has FLAG_NODE_MATCH
// set, its value is read into data.
// Returns MBError::READ_ERROR on a failed read.
999 int Dict::ReadNode(size_t node_off, uint8_t *node_buff, EdgePtrs &edge_ptrs,
1000 int &match, MBData &data, bool rd_kv) const
1002 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1003 return MBError::READ_ERROR;
1005 edge_ptrs.curr_nt = 0;
// nt is derived from the second header byte plus one.
1006 int nt = node_buff[1] + 1;
1007 node_off += NODE_EDGE_KEY_FIRST;
1008 if(mm.ReadData(node_buff + NODE_EDGE_KEY_FIRST, nt, node_off) != nt)
1009 return MBError::READ_ERROR;
1011 int rval = MBError::SUCCESS;
1012 edge_ptrs.offset = node_off + nt;
1013 if(node_buff[0] & FLAG_NODE_MATCH)
1015 // match of non-leaf node
1018 rval = ReadDataFromNode(data, node_buff);
1022 // no match at the non-leaf node
// Read the header of the node at node_off.
// node_size: looked up in mm.GetNodeSizePtr() using the second header byte.
// data_offset / data_link_offset: for a node with FLAG_NODE_MATCH, the 6-byte data
//   offset stored at header byte 2 and the index offset of that field (node_off + 2).
// Throws (int) MBError::READ_ERROR when the header cannot be read.
1029 void Dict::ReadNodeHeader(size_t node_off, int &node_size, int &match,
1030 size_t &data_offset, size_t &data_link_offset)
1032 uint8_t node_buff[NODE_EDGE_KEY_FIRST];
1033 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1034 throw (int) MBError::READ_ERROR;
1036 node_size = mm.GetNodeSizePtr()[ node_buff[1] ];
1037 if(node_buff[0] & FLAG_NODE_MATCH)
1040 data_offset = Get6BInteger(node_buff + 2);
1041 data_link_offset = node_off + 2;
// Read the header of the node at node_off and, if FLAG_NODE_MATCH is set, read the
// node's value into data.
// Returns MBError::READ_ERROR when the header cannot be read.
1045 int Dict::ReadNodeMatch(size_t node_off, int &match, MBData &data) const
1047 uint8_t node_buff[NODE_EDGE_KEY_FIRST];
1048 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1049 return MBError::READ_ERROR;
1051 int rval = MBError::SUCCESS;
1052 if(node_buff[0] & FLAG_NODE_MATCH)
1055 rval = ReadDataFromNode(data, node_buff);
1056 if(rval != MBError::SUCCESS)
1060 return MBError::SUCCESS;
// Forward to the index memory manager's root offset.
1063 size_t Dict::GetRootOffset() const
1065 return mm.GetRootOffset();
// Read the root node via ReadNode(). In RC mode (OPTION_RC_MODE set in data.options)
// the root offset is header->rc_root_offset; the other assignment uses
// mm.GetRootOffset().
1069 int Dict::ReadRootNode(uint8_t *node_buff, EdgePtrs &edge_ptrs, int &match,
1073 if(data.options & CONSTS::OPTION_RC_MODE)
1074 root_off = header->rc_root_offset;
1076 root_off = mm.GetRootOffset();
1077 return ReadNode(root_off, node_buff, edge_ptrs, match, data);
// Convenience overload: builds an MBData with OPTION_FIND_AND_STORE_PARENT and
// forwards to Remove(key, len, data).
1080 int Dict::Remove(const uint8_t *key, int len)
1082 MBData data(0, CONSTS::OPTION_FIND_AND_STORE_PARENT);
1083 return Remove(key, len, data);
// Remove the entry stored under key.
// data.options must contain OPTION_FIND_AND_STORE_PARENT and must not contain
// OPTION_RC_MODE; both violations return MBError::INVALID_ARG.
// Returns MBError::NOT_ALLOWED without ACCESS_MODE_WRITER.
1086 int Dict::Remove(const uint8_t *key, int len, MBData &data)
1088 if(!(options & CONSTS::ACCESS_MODE_WRITER))
1090 return MBError::NOT_ALLOWED;
1092 if(data.options & CONSTS::OPTION_RC_MODE)
1094 // FIXME Remove in RC mode not implemented yet!!!
1095 return MBError::INVALID_ARG;
1098 // The DELETE flag must be set
1099 if(!(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT))
1100 return MBError::INVALID_ARG;
// Find() reports IN_DICT for an existing key because of the option checked above.
1103 rval = Find(key, len, data);
1104 if(rval == MBError::IN_DICT)
1106 rval = DeleteDataFromEdge(data, data.edge_ptrs);
// On TRY_AGAIN the search is repeated with the key length reduced by the length of
// the current edge, and the edge found that way is removed.
1107 while(rval == MBError::TRY_AGAIN)
1110 len -= data.edge_ptrs.len_ptr[0];
1114 rval = Find(key, len, data);
1115 if(MBError::IN_DICT == rval)
1117 rval = mm.RemoveEdgeByIndex(data.edge_ptrs, data);
1122 if(rval == MBError::SUCCESS)
1125 if(header->count == 0)
// Clear every root edge and reset the data-side bookkeeping: data offset, free
// list, pending buffer size, sliding windows, eviction bucket index and update
// counter.
1134 int Dict::RemoveAll()
// NOTE(review): the second ';' on the next line is an empty statement.
1136 int rval = MBError::SUCCESS;;
// One root edge per possible first key byte.
1137 for(int c = 0; c < NUM_ALPHABET; c++)
1139 rval = mm.ClearRootEdge(c);
1140 if(rval != MBError::SUCCESS)
1145 mm.ResetSlidingWindow();
// Data allocation restarts at the initial data offset.
1148 header->m_data_offset = GetStartDataOffset();
1149 free_lists->Empty();
1150 header->pending_data_buff_size = 0;
1151 ResetSlidingWindow();
1153 header->eviction_bucket_index = 0;
1154 header->num_update = 0;
// Return the address of the read/write lock stored in the header (mb_rw_lock).
1158 pthread_rwlock_t* Dict::GetShmLockPtrs() const
1160 return &header->mb_rw_lock;
// Initialize the synchronization objects stored in the header: the read/write lock
// and, when __SHM_QUEUE__ is defined, the mutex and condition variable of every
// queue entry. Each result is written to status.
// Returns MBError::NOT_INITIALIZED when status is not SUCCESS on entry.
1163 int Dict::InitShmObjects()
1165 if(status != MBError::SUCCESS)
1166 return MBError::NOT_INITIALIZED;
1168 Logger::Log(LOG_LEVEL_INFO, "initializing shared memory objects");
1171 status = InitShmRWLock(&header->mb_rw_lock);
1172 if(status != MBError::SUCCESS)
1176 #ifdef __SHM_QUEUE__
// header->async_queue_size gives the number of queue entries.
1177 for(int i = 0; i < header->async_queue_size; i++)
1179 status = InitShmMutex(&queue[i].mutex);
1180 if(status != MBError::SUCCESS)
1182 status = InitShmCond(&queue[i].cond);
1183 if(status != MBError::SUCCESS)
1191 // Reserve buffer and write to it
// buff/size: value bytes and their length; size is asserted to be at most
//            CONSTS::MAX_DATA_SIZE.
// offset:    receives the data offset of the buffer that was written.
// The buffer starts with a DATA_HDR_BYTE-byte header: dsize[0] is the value length,
// dsize[1] the bucket index. A buffer from the free list is reused when one of the
// matching size is available; otherwise space is reserved at header->m_data_offset.
1192 void Dict::ReserveData(const uint8_t* buff, int size, size_t &offset)
1195 assert(size <= CONSTS::MAX_DATA_SIZE);
1198 int buf_size = free_lists->GetAlignmentSize(size + DATA_HDR_BYTE);
1199 int buf_index = free_lists->GetBufferIndex(buf_size);
1201 dsize[0] = static_cast<uint16_t>(size);
1202 // store bucket index for LRU eviction
1203 dsize[1] = (header->num_update / header->entry_per_bucket) % 0xFFFF;
// Advance the eviction bucket index when the new bucket index has caught up with
// it and more than entry_per_bucket updates have happened.
1204 if(dsize[1] == header->eviction_bucket_index &&
1205 header->num_update > header->entry_per_bucket)
1207 header->eviction_bucket_index++;
// Reuse path: take a buffer from the free list and write header and value into it.
1210 if(free_lists->GetBufferCountByIndex(buf_index) > 0)
1212 offset = free_lists->RemoveBufferByIndex(buf_index);
1213 WriteData(reinterpret_cast<const uint8_t*>(&dsize[0]), DATA_HDR_BYTE, offset);
1214 WriteData(buff, size, offset+DATA_HDR_BYTE);
1215 header->pending_data_buff_size -= buf_size;
// Allocation path: kv_file->Reserve() receives header->m_data_offset and may move it.
1219 size_t old_off = header->m_data_offset;
1222 int rval = kv_file->Reserve(header->m_data_offset, buf_size, ptr);
1223 if(rval != MBError::SUCCESS)
1226 //Checking missing buffer due to alignment
// The range between the old and the new offset goes to the free list.
1227 if(old_off < header->m_data_offset)
1229 free_lists->ReleaseAlignmentBuffer(old_off, header->m_data_offset);
1230 header->pending_data_buff_size += header->m_data_offset - old_off;
1233 offset = header->m_data_offset;
1234 header->m_data_offset += buf_size;
// Two ways of writing header and value are visible: memcpy through ptr, and
// WriteData() by offset.
1237 memcpy(ptr, &dsize[0], DATA_HDR_BYTE);
1238 memcpy(ptr+DATA_HDR_BYTE, buff, size);
1242 WriteData(reinterpret_cast<const uint8_t*>(&dsize[0]), DATA_HDR_BYTE, offset);
1243 WriteData(buff, size, offset+DATA_HDR_BYTE);
// Hand the data buffer at offset back to the free list.
// The stored data size is read first; the aligned size of data plus header is added
// to header->pending_data_buff_size and passed to free_lists->ReleaseBuffer().
// Returns MBError::READ_ERROR when the size cannot be read, otherwise the result of
// free_lists->ReleaseBuffer().
1248 int Dict::ReleaseBuffer(size_t offset)
1252 if(ReadData(reinterpret_cast<uint8_t*>(&data_size), DATA_SIZE_BYTE, offset)
1254 return MBError::READ_ERROR;
1256 int rel_size = free_lists->GetAlignmentSize(data_size + DATA_HDR_BYTE);
1257 header->pending_data_buff_size += rel_size;
1258 return free_lists->ReleaseBuffer(offset, rel_size);
// Store a new value (buff, len) for the entry reached through edge_ptrs.
// Leaf edge (EDGE_FLAG_DATA_OFF): the old buffer is released, a new one reserved
// and the edge's offset field rewritten. Otherwise the node header is updated
// with the new data offset and FLAG_NODE_MATCH.
// Before each index write the target offset and the new bytes are recorded in the
// header (excep_* fields) and excep_updating_status is set; it is set back to
// EXCEP_STATUS_NONE after the write.
// Returns MBError::IN_DICT on the paths shown below, MBError::READ_ERROR when the
// node header cannot be read, MBError::SUCCESS otherwise.
1261 int Dict::UpdateDataBuffer(EdgePtrs &edge_ptrs, bool overwrite, const uint8_t *buff,
1262 int len, bool &inc_count)
1266 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
1271 return MBError::IN_DICT;
// A failed release is only logged; the update continues.
// NOTE(review): data_off binds to ReserveData's size_t& parameter but is logged
// with %llu; if Logger::Log follows printf conventions, %zu would be the matching
// conversion - confirm.
1273 data_off = Get6BInteger(edge_ptrs.offset_ptr);
1274 if(ReleaseBuffer(data_off) != MBError::SUCCESS)
1275 Logger::Log(LOG_LEVEL_WARN, "failed to release data buffer: %llu", data_off);
1276 ReserveData(buff, len, data_off);
1277 Write6BInteger(edge_ptrs.offset_ptr, data_off);
// Record edge offset and new offset bytes in the header before touching the index.
1279 header->excep_lf_offset = edge_ptrs.offset;
1280 memcpy(header->excep_buff, edge_ptrs.offset_ptr, OFFSET_SIZE);
1281 #ifdef __LOCK_FREE__
1282 lfree.WriterLockFreeStart(edge_ptrs.offset);
1284 header->excep_updating_status = EXCEP_STATUS_ADD_DATA_OFF;
1285 mm.WriteData(edge_ptrs.offset_ptr, OFFSET_SIZE, edge_ptrs.offset+EDGE_NODE_LEADING_POS);
1286 #ifdef __LOCK_FREE__
1287 lfree.WriterLockFreeStop();
1289 header->excep_updating_status = EXCEP_STATUS_NONE;
// Node path: the node header is edited directly inside header->excep_buff.
1293 uint8_t *node_buff = header->excep_buff;
1294 size_t node_off = Get6BInteger(edge_ptrs.offset_ptr);
1296 if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1297 return MBError::READ_ERROR;
1299 if(node_buff[0] & FLAG_NODE_MATCH)
1303 return MBError::IN_DICT;
1305 data_off = Get6BInteger(node_buff+2);
1306 if(ReleaseBuffer(data_off) != MBError::SUCCESS)
1307 Logger::Log(LOG_LEVEL_WARN, "failed to release data buffer %llu", data_off);
// The byte at excep_buff[NODE_EDGE_KEY_FIRST] is 0 for a replaced value and 1 for
// a newly matched node; ExceptionRecovery() increments header->count when it is
// non-zero.
1309 node_buff[NODE_EDGE_KEY_FIRST] = 0;
1313 // set the match flag
1314 node_buff[0] |= FLAG_NODE_MATCH;
1316 node_buff[NODE_EDGE_KEY_FIRST] = 1;
1319 ReserveData(buff, len, data_off);
1320 Write6BInteger(node_buff+2, data_off);
1322 header->excep_offset = node_off;
1323 #ifdef __LOCK_FREE__
1324 header->excep_lf_offset = edge_ptrs.offset;
1325 lfree.WriterLockFreeStart(edge_ptrs.offset);
1327 header->excep_updating_status = EXCEP_STATUS_ADD_NODE;
1328 mm.WriteData(node_buff, NODE_EDGE_KEY_FIRST, node_off);
1329 #ifdef __LOCK_FREE__
1330 lfree.WriterLockFreeStop();
1332 header->excep_updating_status = EXCEP_STATUS_NONE;
1335 return MBError::SUCCESS;
1338 // delta should be either +1 or -1.
// Adjust header->num_reader by delta, clamp it at 0 and log the new value.
1339 void Dict::UpdateNumReader(int delta) const
1341 header->num_reader += delta;
1342 if(header->num_reader < 0)
1343 header->num_reader = 0;
1345 Logger::Log(LOG_LEVEL_INFO, "number of reader is set to: %d",
1346 header->num_reader);
1349 // delta should be either +1 or -1.
// Register or unregister the writer in the header.
// If header->num_writer is already positive when registering, the writer count is
// forced to 1, the reader count is reset to 0 and MBError::WRITER_EXIST is returned.
// Unregistering sets num_writer to 0 and header->lock_free.offset to MAX_6B_OFFSET.
// Returns MBError::SUCCESS otherwise.
1350 int Dict::UpdateNumWriter(int delta) const
1354 // Only one writer allowed
1355 if(header->num_writer > 0)
1357 Logger::Log(LOG_LEVEL_WARN, "writer was not shutdown cleanly previously");
1358 header->num_writer = 1;
1359 // Reset number of reader too.
1360 header->num_reader = 0;
1361 return MBError::WRITER_EXIST;
1364 header->num_writer = 1;
1368 header->num_writer = 0;
1369 header->lock_free.offset = MAX_6B_OFFSET;
1372 Logger::Log(LOG_LEVEL_INFO, "number of writer is set to: %d", header->num_writer);
1373 return MBError::SUCCESS;
// Return a pointer to the index memory manager member.
// NOTE(review): this is a const member function, so the C-style cast removes the
// const qualifier from &mm; callers get a mutable pointer.
1376 DictMem* Dict::GetMM() const
1378 return (DictMem*) &mm;
// Initial value for header->m_data_offset: DATA_HEADER_SIZE.
1381 size_t Dict::GetStartDataOffset() const
1383 return DATA_HEADER_SIZE;
// Reset the data file's sliding window and store 0 (relaxed) into
// header->shm_data_sliding_start.
1386 void Dict::ResetSlidingWindow() const
1388 kv_file->ResetSlidingWindow();
1390 header->shm_data_sliding_start.store(0, std::memory_order_relaxed);
// Accessor returning a LockFree pointer.
// NOTE(review): presumably the address of the lfree member - confirm against the body.
1393 LockFree* Dict::GetLockFreePtr()
// Flush entry point; the access mode is tested for ACCESS_MODE_WRITER first.
// NOTE(review): presumably a no-op for non-writers - confirm against the body.
1398 void Dict::Flush() const
1400 if(!(options & CONSTS::ACCESS_MODE_WRITER))
1408 // Recovery from abnormal writer terminations (segfault, kill -9 etc)
1409 // during DB updates (insertion, replacing and deletion).
// header->excep_updating_status names the index write that was in progress; the
// bytes and offsets saved in header->excep_buff, excep_offset and excep_lf_offset
// are used to carry that write out again.
// Returns MBError::NOT_INITIALIZED on the early-exit path and MBError::INVALID_ARG
// for an unknown status value.
1410 int Dict::ExceptionRecovery()
1413 return MBError::NOT_INITIALIZED;
1415 int rval = MBError::SUCCESS;
1416 if(header->excep_updating_status == EXCEP_STATUS_NONE)
1418 Logger::Log(LOG_LEVEL_INFO, "writer was shutdown successfully previously");
1422 Logger::Log(LOG_LEVEL_INFO, "writer was not shutdown gracefully with exception status %d",
1423 header->excep_updating_status);
1424 // Dump header before running recovery
// The header goes to the log stream when one is available, else to std::cout.
1425 std::ofstream *logstream = Logger::GetLogStream();
1426 if(logstream != NULL)
1428 PrintHeader(*logstream);
1432 PrintHeader(std::cout);
1435 switch(header->excep_updating_status)
// Rewrite the whole saved edge.
1437 case EXCEP_STATUS_ADD_EDGE:
1438 #ifdef __LOCK_FREE__
1439 lfree.WriterLockFreeStart(header->excep_lf_offset);
1441 mm.WriteData(header->excep_buff, EDGE_SIZE, header->excep_lf_offset);
// Rewrite only the offset field of the edge.
1444 case EXCEP_STATUS_ADD_DATA_OFF:
1445 #ifdef __LOCK_FREE__
1446 lfree.WriterLockFreeStart(header->excep_lf_offset);
1448 mm.WriteData(header->excep_buff, OFFSET_SIZE,
1449 header->excep_lf_offset+EDGE_NODE_LEADING_POS);
// Rewrite the saved node header; the byte after it tells whether the entry count
// has to be incremented.
1451 case EXCEP_STATUS_ADD_NODE:
1452 #ifdef __LOCK_FREE__
1453 lfree.WriterLockFreeStart(header->excep_lf_offset);
1455 mm.WriteData(header->excep_buff, NODE_EDGE_KEY_FIRST,
1456 header->excep_offset);
1457 if(header->excep_buff[NODE_EDGE_KEY_FIRST]) header->count++;
// Encode excep_offset as a 6-byte integer and write it into the edge's offset field.
1459 case EXCEP_STATUS_REMOVE_EDGE:
1460 #ifdef __LOCK_FREE__
1461 lfree.WriterLockFreeStart(header->excep_lf_offset);
1463 Write6BInteger(header->excep_buff, header->excep_offset);
1464 mm.WriteData(header->excep_buff, OFFSET_SIZE,
1465 header->excep_lf_offset+EDGE_NODE_LEADING_POS);
// Overwrite the edge with DictMem::empty_edge.
1467 case EXCEP_STATUS_CLEAR_EDGE:
1468 #ifdef __LOCK_FREE__
1469 lfree.WriterLockFreeStart(header->excep_lf_offset);
1471 mm.WriteData(DictMem::empty_edge, EDGE_SIZE, header->excep_lf_offset);
// RC node and RC data share the same recovery write.
1474 case EXCEP_STATUS_RC_NODE:
1475 case EXCEP_STATUS_RC_DATA:
1476 #ifdef __LOCK_FREE__
1477 lfree.WriterLockFreeStart(header->excep_lf_offset);
1479 mm.WriteData(header->excep_buff, OFFSET_SIZE, header->excep_offset);
// Writes OFFSET_SIZE-1 bytes here.
1481 case EXCEP_STATUS_RC_EDGE_STR:
1482 #ifdef __LOCK_FREE__
1483 lfree.WriterLockFreeStart(header->excep_lf_offset);
1485 mm.WriteData(header->excep_buff, OFFSET_SIZE-1, header->excep_offset);
1488 Logger::Log(LOG_LEVEL_ERROR, "unknown exception status: %d",
1489 header->excep_updating_status);
1490 rval = MBError::INVALID_ARG;
1492 #ifdef __LOCK_FREE__
1493 lfree.WriterLockFreeStop();
// The recorded status is only cleared after a successful recovery.
1496 if(rval == MBError::SUCCESS)
1498 header->excep_updating_status = EXCEP_STATUS_NONE;
1499 Logger::Log(LOG_LEVEL_INFO, "successfully recovered from abnormal termination");
1503 Logger::Log(LOG_LEVEL_ERROR, "failed to recover from abnormal termination");
// Write len bytes from buff into the data file at offset.
// Throws (int) MBError::OUT_OF_BOUND when offset + len exceeds
// header->m_data_offset, and (int) MBError::WRITE_ERROR when kv_file->RandomWrite()
// does not report len bytes written.
1509 void Dict::WriteData(const uint8_t *buff, unsigned len, size_t offset) const
1511 if(offset + len > header->m_data_offset)
1513 std::cerr << "invalid dict write: " << offset << " " << len << " "
1514 << header->m_data_offset << "\n";
1515 throw (int) MBError::OUT_OF_BOUND;
1518 if(kv_file->RandomWrite(buff, len, offset) != len)
1519 throw (int) MBError::WRITE_ERROR;