remove conflict
[c11concurrency-benchmarks.git] / mabain / src / dict.cpp
1 /**
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <stdlib.h>
20 #include <iostream>
21 #include <errno.h>
22
23 #include "mabain_consts.h"
24 #include "db.h"
25 #include "dict.h"
26 #include "dict_mem.h"
27 #include "error.h"
28 #include "integer_4b_5b.h"
29 #include "async_writer.h"
30 #include "util/shm_mutex.h"
31
32 #define MAX_DATA_BUFFER_RESERVE_SIZE    0xFFFF
33 #define NUM_DATA_BUFFER_RESERVE         MAX_DATA_BUFFER_RESERVE_SIZE/DATA_BUFFER_ALIGNMENT
34 #define DATA_HEADER_SIZE                32
35
36 #define READER_LOCK_FREE_START               \
37     LockFreeData snapshot;                   \
38     int lf_ret;                              \
39     lfree.ReaderLockFreeStart(snapshot);
40 #define READER_LOCK_FREE_STOP(edgeoff, data)                        \
41     lf_ret = lfree.ReaderLockFreeStop(snapshot, (edgeoff), (data)); \
42     if(lf_ret != MBError::SUCCESS)                                  \
43         return lf_ret;                                              \
44
45 namespace mabain {
46
47 Dict::Dict(const std::string &mbdir, bool init_header, int datasize,
48            int db_options, size_t memsize_index, size_t memsize_data,
49            uint32_t block_sz_idx, uint32_t block_sz_data,
50            int max_num_index_blk, int max_num_data_blk,
51            int64_t entry_per_bucket, uint32_t queue_size)
52          : options(db_options),
53 #ifdef __SHM_QUEUE__
54            mm(mbdir, init_header, memsize_index, db_options, block_sz_idx, max_num_index_blk, queue_size),
55            queue(NULL)
56 #else
57            mm(mbdir, init_header, memsize_index, db_options, block_sz_idx, max_num_index_blk, queue_size)
58 #endif
59 {
60     status = MBError::NOT_INITIALIZED;
61     reader_rc_off = 0;
62
63     header = mm.GetHeaderPtr();
64     if(header == NULL)
65     {
66         Logger::Log(LOG_LEVEL_ERROR, "header not mapped");
67         throw (int) MBError::MMAP_FAILED;
68     }
69
70     // confirm block size is the same
71     if(!init_header)
72     {
73         if(block_sz_data != 0 && header->data_block_size != block_sz_data)
74         {
75             std::cerr << "mabain data block size not match " << block_sz_data << ": "
76                       << header->data_block_size << std::endl;
77             PrintHeader(std::cout);
78             Destroy();
79             throw (int) MBError::INVALID_SIZE;
80         }
81     }
82     else
83     {
84         header->data_block_size = block_sz_data;
85     }
86
87     lfree.LockFreeInit(&header->lock_free, header, db_options);
88     mm.InitLockFreePtr(&lfree);
89
90 #ifdef __SHM_QUEUE__
91     // initialize shared memory queue
92     char *hdr_ptr = (char *) header;
93     queue = reinterpret_cast<AsyncNode *>(hdr_ptr + RollableFile::page_size);
94 #endif
95
96     // Open data file
97     kv_file = new RollableFile(mbdir + "_mabain_d",
98                                static_cast<size_t>(header->data_block_size),
99                                memsize_data, db_options, max_num_data_blk);
100
101     kv_file->InitShmSlidingAddr(&header->shm_data_sliding_start);
102     // If init_header is false, we can set the dict status to SUCCESS.
103     // Otherwise, the status will be set in the Init.
104     if(init_header)
105     {
106         // Initialize header
107         header->entry_per_bucket = entry_per_bucket;
108         header->index_block_size = block_sz_idx;
109         header->data_block_size = block_sz_data;
110         header->data_size = datasize;
111         header->count = 0;
112         header->m_data_offset = GetStartDataOffset(); // start from a non-zero offset
113         // We known that only writers will set init_header to true.
114         free_lists = new FreeList(mbdir+"_dbfl", DATA_BUFFER_ALIGNMENT,
115                                   NUM_DATA_BUFFER_RESERVE);
116     }
117     else
118     {
119         if(options & CONSTS::ACCESS_MODE_WRITER)
120         {
121             if(header->entry_per_bucket != entry_per_bucket)
122             {
123                 Destroy();
124                 std::cerr << "mabain count per bucket not match\n";
125                 throw (int) MBError::INVALID_SIZE;
126             }
127             mm.ResetSlidingWindow();
128             ResetSlidingWindow();
129             free_lists = new FreeList(mbdir+"_dbfl", DATA_BUFFER_ALIGNMENT,
130                                       NUM_DATA_BUFFER_RESERVE);
131             if(mm.IsValid())
132             {
133                 int rval = ExceptionRecovery();
134                 if(rval == MBError::SUCCESS)
135                 {
136                     header->excep_lf_offset = 0;
137                     header->excep_offset = 0;
138                     status = MBError::SUCCESS;
139                 }
140             }
141         }
142         else
143         {
144             if(mm.IsValid())
145                 status = MBError::SUCCESS;
146         }
147     }
148 }
149
150 Dict::~Dict()
151 {
152 }
153
154 // This function only needs to be called by writer.
155 int Dict::Init(uint32_t id)
156 {
157     if(!(options & CONSTS::ACCESS_MODE_WRITER))
158     {
159         Logger::Log(LOG_LEVEL_ERROR, "dict initialization not allowed for non-writer");
160         return MBError::NOT_ALLOWED;
161     }
162
163     if(status != MBError::NOT_INITIALIZED)
164     {
165         // status can be NOT_INITIALIZED or SUCCESS.
166         Logger::Log(LOG_LEVEL_WARN, "connector %u dict already initialized", id);
167         return MBError::SUCCESS;
168     }
169
170     if(header == NULL)
171     {
172         Logger::Log(LOG_LEVEL_ERROR, "connector %u header not mapped", id);
173         return MBError::ALLOCATION_ERROR;
174     }
175
176     Logger::Log(LOG_LEVEL_INFO, "connector %u initializing DictMem", id);
177     mm.InitRootNode();
178
179     if(header->data_size > CONSTS::MAX_DATA_SIZE)
180     {
181         Logger::Log(LOG_LEVEL_ERROR, "data size %d is too large", header->data_size);
182         return MBError::INVALID_SIZE;
183     }
184
185     if(mm.IsValid())
186         status = MBError::SUCCESS;
187
188     return status;
189 }
190
191 void Dict::Destroy()
192 {
193     if(options & CONSTS::ACCESS_MODE_WRITER)
194     {
195         mm.ResetSlidingWindow();
196         ResetSlidingWindow();
197     }
198
199     mm.Destroy();
200
201     if(free_lists != NULL)
202         delete free_lists;
203
204     if(kv_file != NULL)
205         delete kv_file;
206 }
207
208 int Dict::Status() const
209 {
210     return status;
211 }
212
213 // Add a key-value pair
214 // if overwrite is true and an entry with input key already exists, the old data will
215 // be overwritten. Otherwise, IN_DICT will be returned.
216 int Dict::Add(const uint8_t *key, int len, MBData &data, bool overwrite)
217 {
218     if(!(options & CONSTS::ACCESS_MODE_WRITER))
219     {
220         return MBError::NOT_ALLOWED;
221     }
222     if(len > CONSTS::MAX_KEY_LENGHTH || data.data_len > CONSTS::MAX_DATA_SIZE ||
223        len <= 0 || data.data_len <= 0)
224         return MBError::OUT_OF_BOUND;
225
226     EdgePtrs edge_ptrs;
227     size_t data_offset = 0;
228     int rval;
229
230     rval = mm.GetRootEdge_Writer(data.options & CONSTS::OPTION_RC_MODE, key[0], edge_ptrs);
231     if(rval != MBError::SUCCESS)
232         return rval;
233
234     if(edge_ptrs.len_ptr[0] == 0)
235     {
236         ReserveData(data.buff, data.data_len, data_offset);
237         // Add the first edge along this edge
238         mm.AddRootEdge(edge_ptrs, key, len, data_offset);
239         if(data.options & CONSTS::OPTION_RC_MODE)
240         {
241             header->rc_count++;
242         }
243         else
244         {
245             header->count++;
246             header->num_update++;
247         }
248
249         return MBError::SUCCESS;
250     }
251
252     bool inc_count = true;
253     int i;
254     const uint8_t *key_buff;
255     uint8_t tmp_key_buff[NUM_ALPHABET];
256     const uint8_t *p = key;
257     int edge_len = edge_ptrs.len_ptr[0];
258     if(edge_len > LOCAL_EDGE_LEN)
259     {
260         if(mm.ReadData(tmp_key_buff, edge_len-1, Get5BInteger(edge_ptrs.ptr)) != edge_len-1)
261             return MBError::READ_ERROR;
262         key_buff = tmp_key_buff;
263     }
264     else
265     {
266         key_buff = edge_ptrs.ptr;
267     }
268     if(edge_len < len)
269     {
270         for(i = 1; i < edge_len; i++)
271         {
272             if(key_buff[i-1] != key[i])
273                 break;
274         }
275         if(i >= edge_len)
276         {
277             int match_len;
278             bool next;
279             p += edge_len;
280             len -= edge_len;
281             while((next = mm.FindNext(p, len, match_len, edge_ptrs, tmp_key_buff)))
282             {
283                 if(match_len < edge_ptrs.len_ptr[0])
284                     break;
285
286                 p += match_len;
287                 len -= match_len;
288                 if(len <= 0)
289                     break;
290             }
291             if(!next)
292             {
293                 ReserveData(data.buff, data.data_len, data_offset);
294                 rval = mm.UpdateNode(edge_ptrs, p, len, data_offset);
295             }
296             else if(match_len < static_cast<int>(edge_ptrs.len_ptr[0]))
297             {
298                 if(len > match_len)
299                 {
300                     ReserveData(data.buff, data.data_len, data_offset);
301                     rval = mm.AddLink(edge_ptrs, match_len, p+match_len, len-match_len,
302                                       data_offset, data);
303                 }
304                 else if(len == match_len)
305                 {
306                     ReserveData(data.buff, data.data_len, data_offset);
307                     rval = mm.InsertNode(edge_ptrs, match_len, data_offset, data);
308                 }
309             }
310             else if(len == 0)
311             {
312                 rval = UpdateDataBuffer(edge_ptrs, overwrite, data.buff, data.data_len, inc_count);
313             }
314         }
315         else
316         {
317             ReserveData(data.buff, data.data_len, data_offset);
318             rval = mm.AddLink(edge_ptrs, i, p+i, len-i, data_offset, data);
319         }
320     }
321     else
322     {
323         for(i = 1; i < len; i++)
324         {
325             if(key_buff[i-1] != key[i])
326                 break;
327         }
328         if(i < len)
329         {
330             ReserveData(data.buff, data.data_len, data_offset);
331             rval = mm.AddLink(edge_ptrs, i, p+i, len-i, data_offset, data);
332         }
333         else
334         {
335             if(edge_ptrs.len_ptr[0] > len)
336             {
337                 ReserveData(data.buff, data.data_len, data_offset);
338                 rval = mm.InsertNode(edge_ptrs, i, data_offset, data);
339             }
340             else
341             {
342                 rval = UpdateDataBuffer(edge_ptrs, overwrite, data.buff, data.data_len, inc_count);
343             }
344         }
345     }
346
347     if(data.options & CONSTS::OPTION_RC_MODE)
348     {
349         if(rval == MBError::SUCCESS)
350             header->rc_count++;
351     }
352     else
353     {
354         if(rval == MBError::SUCCESS)
355             header->num_update++;
356         if(inc_count)
357             header->count++;
358     }
359     return rval;
360 }
361
362 int Dict::ReadDataFromEdge(MBData &data, const EdgePtrs &edge_ptrs) const
363 {
364     size_t data_off;
365     if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
366     {
367         data_off = Get6BInteger(edge_ptrs.offset_ptr);
368     }
369     else
370     {
371         uint8_t node_buff[NODE_EDGE_KEY_FIRST];
372         if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, Get6BInteger(edge_ptrs.offset_ptr))
373                       != NODE_EDGE_KEY_FIRST)
374             return MBError::READ_ERROR;
375         if(!(node_buff[0] & FLAG_NODE_MATCH))
376             return MBError::NOT_EXIST;
377         data_off = Get6BInteger(node_buff+2);
378     }
379     data.data_offset = data_off;
380
381     uint16_t data_len[2];
382     // Read data length first
383     if(ReadData(reinterpret_cast<uint8_t*>(&data_len[0]), DATA_HDR_BYTE, data_off)
384                != DATA_HDR_BYTE)
385         return MBError::READ_ERROR;
386     data_off += DATA_HDR_BYTE;
387     if(data.buff_len < data_len[0] + 1)
388     {
389         if(data.Resize(data_len[0]) != MBError::SUCCESS)
390             return MBError::NO_MEMORY;
391     }
392     if(ReadData(data.buff, data_len[0], data_off) != data_len[0])
393         return MBError::READ_ERROR;
394
395     data.data_len = data_len[0];
396     data.bucket_index = data_len[1];
397     return MBError::SUCCESS;
398 }
399
400 // Delete operations:
401 //   If this is a leaf node, need to remove the edge. Otherwise, unset the match flag.
402 //   Also need to set the delete flag in the data block so that it can be reclaimed later.
403 int Dict::DeleteDataFromEdge(MBData &data, EdgePtrs &edge_ptrs)
404 {
405     int rval = MBError::SUCCESS;
406     size_t data_off;
407     uint16_t data_len;
408     int rel_size;
409
410     // Check if this is a leaf node first by using the EDGE_FLAG_DATA_OFF bit
411     if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
412     {
413         data_off = Get6BInteger(edge_ptrs.offset_ptr);
414         if(ReadData(reinterpret_cast<uint8_t*>(&data_len), DATA_SIZE_BYTE, data_off)
415                    != DATA_SIZE_BYTE)
416             return MBError::READ_ERROR;
417
418         rel_size = free_lists->GetAlignmentSize(data_len + DATA_HDR_BYTE);
419         header->pending_data_buff_size += rel_size;
420         free_lists->ReleaseBuffer(data_off, rel_size);
421
422         rval = mm.RemoveEdgeByIndex(edge_ptrs, data);
423     }
424     else
425     {
426         // No exception handling in this case
427         header->excep_lf_offset = 0;
428         header->excep_offset = 0;
429
430         uint8_t node_buff[NODE_EDGE_KEY_FIRST];
431         size_t node_off = Get6BInteger(edge_ptrs.offset_ptr);
432
433         // Read node header
434         if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
435             return MBError::READ_ERROR;
436
437         if(node_buff[0] & FLAG_NODE_MATCH)
438         {
439             // Unset the match flag
440             node_buff[0] &= ~FLAG_NODE_MATCH;
441             mm.WriteData(&node_buff[0], 1, node_off);
442
443             // Release data buffer
444             data_off = Get6BInteger(node_buff+2);
445             if(ReadData(reinterpret_cast<uint8_t*>(&data_len), DATA_SIZE_BYTE, data_off)
446                        != DATA_SIZE_BYTE)
447                 return MBError::READ_ERROR;
448
449             rel_size = free_lists->GetAlignmentSize(data_len + DATA_HDR_BYTE);
450             header->pending_data_buff_size += rel_size;
451             free_lists->ReleaseBuffer(data_off, rel_size);
452         }
453         else
454         {
455             rval = MBError::NOT_EXIST;
456         }
457     }
458
459     return rval;
460 }
461
462 int Dict::ReadDataFromNode(MBData &data, const uint8_t *node_ptr) const
463 {
464     size_t data_off = Get6BInteger(node_ptr+2);
465     if(data_off == 0)
466         return MBError::NOT_EXIST;
467
468     data.data_offset = data_off;
469
470     // Read data length first
471     uint16_t data_len[2];
472     if(ReadData(reinterpret_cast<uint8_t *>(&data_len[0]), DATA_HDR_BYTE, data_off)
473                != DATA_HDR_BYTE)
474         return MBError::READ_ERROR;
475     data_off += DATA_HDR_BYTE;
476
477     if(data.buff_len < data_len[0] + 1)
478     {
479         if(data.Resize(data_len[0]) != MBError::SUCCESS)
480             return MBError::NO_MEMORY;
481     }
482     if(ReadData(data.buff, data_len[0], data_off) != data_len[0])
483         return MBError::READ_ERROR;
484
485     data.data_len = data_len[0];
486     data.bucket_index = data_len[1];
487     return MBError::SUCCESS;
488 }
489
490 int Dict::FindPrefix(const uint8_t *key, int len, MBData &data)
491 {
492     int rval;
493     MBData data_rc;
494     size_t rc_root_offset = header->rc_root_offset.load(MEMORY_ORDER_READER);
495     if(rc_root_offset != 0)
496     {
497         reader_rc_off = rc_root_offset;
498         rval = FindPrefix_Internal(rc_root_offset, key, len, data_rc);
499 #ifdef __LOCK_FREE__
500         while(rval == MBError::TRY_AGAIN)
501         {
502             nanosleep((const struct timespec[]){{0, 10L}}, NULL);
503             data_rc.Clear();
504             rval = FindPrefix_Internal(rc_root_offset, key, len, data_rc);
505         }
506 #endif
507         if(rval != MBError::NOT_EXIST && rval != MBError::SUCCESS)
508             return rval;
509         data.options &= ~(CONSTS::OPTION_RC_MODE | CONSTS::OPTION_READ_SAVED_EDGE);
510     }
511     else
512     {
513         if(reader_rc_off != 0)
514         {
515             reader_rc_off = 0;
516             RemoveUnused(0);
517             mm.RemoveUnused(0);
518         }
519     }
520
521     rval = FindPrefix_Internal(0, key, len, data);
522 #ifdef __LOCK_FREE__
523     while(rval == MBError::TRY_AGAIN)
524     {
525         nanosleep((const struct timespec[]){{0, 10L}}, NULL);
526         data.Clear();
527         rval = FindPrefix_Internal(0, key, len, data);
528     }
529 #endif
530
531     // The longer match wins.
532     if(data_rc.match_len > data.match_len)
533     {
534         data_rc.TransferValueTo(data.buff, data.data_len);
535         rval = MBError::SUCCESS;
536     }
537     return rval;
538
539 }
540
541 int Dict::FindPrefix_Internal(size_t root_off, const uint8_t *key, int len, MBData &data)
542 {
543     int rval;
544     data.next = false;
545     EdgePtrs &edge_ptrs = data.edge_ptrs;
546 #ifdef __LOCK_FREE__
547     READER_LOCK_FREE_START
548 #endif
549
550     if(data.match_len == 0)
551     {
552         rval = mm.GetRootEdge(data.options & CONSTS::OPTION_RC_MODE, key[0], edge_ptrs);
553         if(rval != MBError::SUCCESS)
554             return MBError::READ_ERROR;
555
556         if(edge_ptrs.len_ptr[0] == 0)
557         {
558 #ifdef __LOCK_FREE__
559             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
560 #endif
561             return MBError::NOT_EXIST;
562         }
563     }
564
565     // Compare edge string
566     const uint8_t *key_buff;
567     uint8_t *node_buff = data.node_buff;
568     const uint8_t *p = key;
569     int edge_len = edge_ptrs.len_ptr[0];
570     int edge_len_m1 = edge_len - 1;
571     if(edge_len > LOCAL_EDGE_LEN)
572     {
573         if(mm.ReadData(node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr))
574                       != edge_len_m1)
575         {
576 #ifdef __LOCK_FREE__
577             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
578 #endif
579             return MBError::READ_ERROR;
580         }
581         key_buff = node_buff;
582     }
583     else
584     {
585         key_buff = edge_ptrs.ptr;
586     }
587
588     rval = MBError::NOT_EXIST;
589     if(edge_len < len)
590     {
591         if(edge_len > 1 && memcmp(key_buff, key+1, edge_len_m1) != 0)
592         {
593 #ifdef __LOCK_FREE__
594             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
595 #endif
596             return MBError::NOT_EXIST;
597         }
598
599         len -= edge_len;
600         p += edge_len;
601
602         if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
603         {
604             // prefix match for leaf node
605 #ifdef __LOCK_FREE__
606             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
607 #endif
608             data.match_len = p - key;
609             return ReadDataFromEdge(data, edge_ptrs);
610         }
611
612         uint8_t last_node_buffer[NODE_EDGE_KEY_FIRST];
613 #ifdef __LOCK_FREE__
614         size_t edge_offset_prev = edge_ptrs.offset;
615 #endif
616         int last_prefix_rval = MBError::NOT_EXIST;
617         while(true)
618         {
619             rval = mm.NextEdge(p, edge_ptrs, node_buff, data);
620             if(rval != MBError::READ_ERROR)
621             {
622                 if(node_buff[0] & FLAG_NODE_MATCH)
623                 {
624                     data.match_len = p - key;
625                     if(data.options & CONSTS::OPTION_ALL_PREFIX)
626                     {
627                         rval = ReadDataFromNode(data, node_buff);
628                         data.next = true;
629                         rval = last_prefix_rval;
630                         break;
631                     }
632                     else
633                     {
634                         memcpy(last_node_buffer, node_buff, NODE_EDGE_KEY_FIRST);
635                         last_prefix_rval = MBError::SUCCESS;
636                     }
637                 }
638             }
639
640             if(rval != MBError::SUCCESS)
641                 break;
642
643 #ifdef __LOCK_FREE__
644             READER_LOCK_FREE_STOP(edge_offset_prev, data)
645 #endif
646             edge_len = edge_ptrs.len_ptr[0];
647             edge_len_m1 = edge_len - 1;
648             // match edge string
649             if(edge_len > LOCAL_EDGE_LEN)
650             {
651                 if(mm.ReadData(node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr))
652                               != edge_len_m1)
653                 {
654                     rval = MBError::READ_ERROR;
655                     break;
656                 }
657                 key_buff = node_buff;
658             }
659             else
660             {
661                 key_buff = edge_ptrs.ptr;
662             }
663
664             if((edge_len > 1 && memcmp(key_buff, p+1, edge_len_m1) != 0) || edge_len == 0)
665             {
666                 rval = MBError::NOT_EXIST;
667                 break;
668             }
669
670             len -= edge_len;
671             p += edge_len;
672             if(len <= 0 || (edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF))
673             {
674                 data.match_len = p - key;
675                 rval = ReadDataFromEdge(data, edge_ptrs);
676                 break;
677             }
678 #ifdef __LOCK_FREE__
679             edge_offset_prev = edge_ptrs.offset;
680 #endif
681         }
682
683         if(rval == MBError::NOT_EXIST && last_prefix_rval != rval)
684             rval = ReadDataFromNode(data, last_node_buffer);
685     }
686     else if(edge_len == len)
687     {
688         if(edge_len_m1 == 0 || memcmp(key_buff, key+1, edge_len_m1) == 0)
689         {
690             data.match_len = len;
691             rval = ReadDataFromEdge(data, edge_ptrs);
692         }
693     }
694
695 #ifdef __LOCK_FREE__
696     READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
697 #endif
698     return rval;
699 }
700
701 int Dict::Find(const uint8_t *key, int len, MBData &data)
702 {
703     int rval;
704     size_t rc_root_offset = header->rc_root_offset.load(MEMORY_ORDER_READER);
705
706     if(rc_root_offset != 0)
707     {
708         reader_rc_off = rc_root_offset;
709         rval = Find_Internal(rc_root_offset, key, len, data);
710 #ifdef __LOCK_FREE__
711         while(rval == MBError::TRY_AGAIN)
712         {
713             nanosleep((const struct timespec[]){{0, 10L}}, NULL);
714             rval = Find_Internal(rc_root_offset, key, len, data);
715         }
716 #endif
717         if(rval == MBError::SUCCESS)
718         {
719             data.match_len = len;
720             return rval;
721         }
722         else if(rval != MBError::NOT_EXIST)
723             return rval;
724         data.options &= ~(CONSTS::OPTION_RC_MODE | CONSTS::OPTION_READ_SAVED_EDGE);
725     }
726     else
727     {
728         if(reader_rc_off != 0)
729         {
730             reader_rc_off = 0;
731             RemoveUnused(0);
732             mm.RemoveUnused(0);
733         }
734     }
735
736     rval = Find_Internal(0, key, len, data);
737 #ifdef __LOCK_FREE__
738     while(rval == MBError::TRY_AGAIN)
739     {
740         nanosleep((const struct timespec[]){{0, 10L}}, NULL);
741         rval = Find_Internal(0, key, len, data);
742     }
743 #endif
744     if(rval == MBError::SUCCESS)
745         data.match_len = len;
746
747     return rval;
748 }
749
750 int Dict::Find_Internal(size_t root_off, const uint8_t *key, int len, MBData &data)
751 {
752     EdgePtrs &edge_ptrs = data.edge_ptrs;
753 #ifdef __LOCK_FREE__
754     READER_LOCK_FREE_START
755 #endif
756     int rval;
757     rval = mm.GetRootEdge(root_off, key[0], edge_ptrs);
758
759     if(rval != MBError::SUCCESS)
760         return MBError::READ_ERROR;
761     if(edge_ptrs.len_ptr[0] == 0)
762     {
763 #ifdef __LOCK_FREE__
764         READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
765 #endif
766         return MBError::NOT_EXIST;
767     }
768
769     // Compare edge string
770     const uint8_t *key_buff;
771     uint8_t *node_buff = data.node_buff;
772     const uint8_t *p = key;
773     int edge_len = edge_ptrs.len_ptr[0];
774     int edge_len_m1 = edge_len - 1;
775
776     rval = MBError::NOT_EXIST;
777     if(edge_len > LOCAL_EDGE_LEN)
778     {
779         size_t edge_str_off_lf = Get5BInteger(edge_ptrs.ptr);
780         if(mm.ReadData(node_buff, edge_len_m1, edge_str_off_lf) != edge_len_m1)
781         {
782 #ifdef __LOCK_FREE__
783             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
784 #endif
785             return MBError::READ_ERROR;
786         }
787         key_buff = node_buff;
788     }
789     else
790     {
791         key_buff = edge_ptrs.ptr;
792     }
793
794     if(edge_len < len)
795     {
796         if((edge_len > 1 && memcmp(key_buff, key+1, edge_len_m1) != 0) ||
797            (edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF))
798         {
799 #ifdef __LOCK_FREE__
800             READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
801 #endif
802             return MBError::NOT_EXIST;
803         }
804
805         len -= edge_len;
806         p += edge_len;
807
808 #ifdef __LOCK_FREE__
809         size_t edge_offset_prev = edge_ptrs.offset;
810 #endif
811         while(true)
812         {
813             rval = mm.NextEdge(p, edge_ptrs, node_buff, data);
814             if(rval != MBError::SUCCESS)
815                 break;
816
817 #ifdef __LOCK_FREE__
818             READER_LOCK_FREE_STOP(edge_offset_prev, data)
819 #endif
820             edge_len = edge_ptrs.len_ptr[0];
821             edge_len_m1 = edge_len - 1;
822             // match edge string
823             if(edge_len > LOCAL_EDGE_LEN)
824             {
825                 size_t edge_str_off_lf = Get5BInteger(edge_ptrs.ptr);
826                 if(mm.ReadData(node_buff, edge_len_m1, edge_str_off_lf) != edge_len_m1)
827                 {
828                     rval = MBError::READ_ERROR;
829                     break;
830                 }
831                 key_buff = node_buff;
832             }
833             else
834             {
835                 key_buff = edge_ptrs.ptr;
836             }
837
838             if((edge_len_m1 > 0 && memcmp(key_buff, p+1, edge_len_m1) != 0) || edge_len_m1 < 0)
839             {
840                 rval = MBError::NOT_EXIST;
841                 break;
842             }
843
844             len -= edge_len;
845             if(len <= 0)
846             {
847                 // If this is for remove operation, return IN_DICT to caller.
848                 if(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT)
849                     rval = MBError::IN_DICT;
850                 else
851                     rval = ReadDataFromEdge(data, edge_ptrs);
852                 break;
853             }
854             else
855             {
856                 if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
857                 {
858                     // Reach a leaf node and no match found
859                     rval = MBError::NOT_EXIST;
860                     break;
861                 }
862             }
863             p += edge_len;
864 #ifdef __LOCK_FREE__
865             edge_offset_prev = edge_ptrs.offset;
866 #endif
867         }
868     }
869     else if(edge_len == len)
870     {
871         if(len > 1 && memcmp(key_buff, key+1, len-1) != 0)
872         {
873             rval = MBError::NOT_EXIST;
874         }
875         else
876         {
877             // If this is for remove operation, return IN_DICT to caller.
878             if(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT)
879             {
880                 data.edge_ptrs.curr_node_offset = mm.GetRootOffset();
881                 data.edge_ptrs.curr_nt = 1;
882                 data.edge_ptrs.curr_edge_index = 0;
883                 data.edge_ptrs.parent_offset = data.edge_ptrs.offset;
884                 rval = MBError::IN_DICT;
885             }
886             else
887             {
888                 rval = ReadDataFromEdge(data, edge_ptrs);
889             }
890         }
891     }
892
893 #ifdef __LOCK_FREE__
894     READER_LOCK_FREE_STOP(edge_ptrs.offset, data)
895 #endif
896     return rval;
897 }
898
899 void Dict::PrintStats(std::ostream *out_stream) const
900 {
901     if(out_stream != NULL)
902         return PrintStats(*out_stream);
903     return PrintStats(std::cout);
904 }
905
906 void Dict::PrintStats(std::ostream &out_stream) const
907 {
908     if(status != MBError::SUCCESS)
909         return;
910
911     out_stream << "DB stats:\n";
912     out_stream << "\tNumber of DB writer: " << header->num_writer << std::endl;
913     out_stream << "\tNumber of DB reader: " << header->num_reader << std::endl;
914     out_stream << "\tEntry count in DB: "  << header->count << std::endl;
915     out_stream << "\tEntry count per bucket: "  << header->entry_per_bucket << std::endl;
916     out_stream << "\tEviction bucket index: "  << header->eviction_bucket_index << std::endl;
917     out_stream << "\tData block size: " << header->data_block_size << std::endl;
918     out_stream << "\tData size: " << header->m_data_offset << std::endl;
919     out_stream << "\tPending Buffer Size: " << header->pending_data_buff_size << std::endl;
920     if(free_lists)
921         out_stream << "\tTrackable Buffer Size: " << free_lists->GetTotSize() << std::endl;
922     mm.PrintStats(out_stream);
923
924     kv_file->PrintStats(out_stream);
925 }
926
927 int64_t Dict::Count() const
928 {
929     if(header == NULL)
930     {
931         Logger::Log(LOG_LEVEL_WARN, "db was not initialized successfully: %s",
932                     MBError::get_error_str(status));
933         return 0;
934     }
935
936     return header->count;
937 }
938
939 // For DB iterator
940 int Dict::ReadNextEdge(const uint8_t *node_buff, EdgePtrs &edge_ptrs,
941                        int &match, MBData &data, std::string &match_str,
942                        size_t &node_off, bool rd_kv) const
943 {
944     if(edge_ptrs.curr_nt > static_cast<int>(node_buff[1]))
945         return MBError::OUT_OF_BOUND;
946
947     if(mm.ReadData(edge_ptrs.edge_buff, EDGE_SIZE, edge_ptrs.offset) != EDGE_SIZE)
948         return MBError::READ_ERROR;
949
950     node_off = 0;
951     match_str = "";
952
953     int rval = MBError::SUCCESS;
954     InitTempEdgePtrs(edge_ptrs);
955     if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
956     {
957         // match of leaf node
958         match = MATCH_EDGE;
959         if(rd_kv)
960         {
961             rval = ReadDataFromEdge(data, edge_ptrs);
962             if(rval != MBError::SUCCESS)
963                 return rval;
964         }
965     }
966     else
967     {
968         match = MATCH_NONE;
969         if(edge_ptrs.len_ptr[0] > 0)
970         {
971             node_off = Get6BInteger(edge_ptrs.offset_ptr);
972             if(rd_kv)
973                 rval = ReadNodeMatch(node_off, match, data);
974         }
975     }
976
977     if(edge_ptrs.len_ptr[0] > 0 && rd_kv)
978     {
979         int edge_len_m1 = edge_ptrs.len_ptr[0] - 1;
980         match_str = std::string(1, (const char)node_buff[NODE_EDGE_KEY_FIRST+edge_ptrs.curr_nt]);
981         if(edge_len_m1 > LOCAL_EDGE_LEN_M1)
982         {
983             if(mm.ReadData(data.node_buff, edge_len_m1, Get5BInteger(edge_ptrs.ptr)) != edge_len_m1)
984                 return MBError::READ_ERROR;
985             match_str += std::string(reinterpret_cast<char*>(data.node_buff), edge_len_m1);
986         }
987         else if(edge_len_m1 > 0)
988         {
989             match_str += std::string(reinterpret_cast<char*>(edge_ptrs.ptr), edge_len_m1);
990         }
991     }
992
993     edge_ptrs.curr_nt++;
994     edge_ptrs.offset += EDGE_SIZE;
995     return rval;
996 }
997
998 // For DB iterator
999 int Dict::ReadNode(size_t node_off, uint8_t *node_buff, EdgePtrs &edge_ptrs,
1000                     int &match, MBData &data, bool rd_kv) const
1001 {
1002     if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1003         return MBError::READ_ERROR;
1004
1005     edge_ptrs.curr_nt = 0;
1006     int nt = node_buff[1] + 1;
1007     node_off += NODE_EDGE_KEY_FIRST;
1008     if(mm.ReadData(node_buff + NODE_EDGE_KEY_FIRST, nt, node_off) != nt)
1009         return MBError::READ_ERROR;
1010
1011     int rval = MBError::SUCCESS;
1012     edge_ptrs.offset = node_off + nt;
1013     if(node_buff[0] & FLAG_NODE_MATCH)
1014     {
1015         // match of non-leaf node
1016         match = MATCH_NODE;
1017         if(rd_kv)
1018             rval = ReadDataFromNode(data, node_buff);
1019     }
1020     else
1021     {
1022         // no match at the non-leaf node
1023         match = MATCH_NONE;
1024     }
1025
1026     return rval;
1027 }
1028
1029 void Dict::ReadNodeHeader(size_t node_off, int &node_size, int &match,
1030                           size_t &data_offset, size_t &data_link_offset)
1031 {
1032     uint8_t node_buff[NODE_EDGE_KEY_FIRST];
1033     if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1034         throw (int) MBError::READ_ERROR;
1035
1036     node_size = mm.GetNodeSizePtr()[ node_buff[1] ];
1037     if(node_buff[0] & FLAG_NODE_MATCH)
1038     {
1039         match = MATCH_NODE;
1040         data_offset = Get6BInteger(node_buff + 2);
1041         data_link_offset = node_off + 2;
1042     }
1043 }
1044
1045 int Dict::ReadNodeMatch(size_t node_off, int &match, MBData &data) const
1046 {
1047     uint8_t node_buff[NODE_EDGE_KEY_FIRST];
1048     if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1049         return MBError::READ_ERROR;
1050
1051     int rval = MBError::SUCCESS;
1052     if(node_buff[0] & FLAG_NODE_MATCH)
1053     {
1054         match = MATCH_NODE;
1055         rval = ReadDataFromNode(data, node_buff);
1056         if(rval != MBError::SUCCESS)
1057             return rval;
1058     }
1059
1060     return MBError::SUCCESS;
1061 }
1062
1063 size_t Dict::GetRootOffset() const
1064 {
1065     return mm.GetRootOffset();
1066 }
1067
1068 // For DB iterator
1069 int Dict::ReadRootNode(uint8_t *node_buff, EdgePtrs &edge_ptrs, int &match,
1070                        MBData &data) const
1071 {
1072     size_t root_off;
1073     if(data.options & CONSTS::OPTION_RC_MODE)
1074         root_off = header->rc_root_offset;
1075     else
1076         root_off = mm.GetRootOffset();
1077     return ReadNode(root_off, node_buff, edge_ptrs, match, data);
1078 }
1079
1080 int Dict::Remove(const uint8_t *key, int len)
1081 {
1082     MBData data(0, CONSTS::OPTION_FIND_AND_STORE_PARENT);
1083     return Remove(key, len, data);
1084 }
1085
1086 int Dict::Remove(const uint8_t *key, int len, MBData &data)
1087 {
1088     if(!(options & CONSTS::ACCESS_MODE_WRITER))
1089     {
1090         return MBError::NOT_ALLOWED;
1091     }
1092     if(data.options & CONSTS::OPTION_RC_MODE)
1093     {
1094         // FIXME Remove in RC mode not implemented yet!!!
1095         return MBError::INVALID_ARG;
1096     }
1097
1098     // The DELETE flag must be set
1099     if(!(data.options & CONSTS::OPTION_FIND_AND_STORE_PARENT))
1100         return MBError::INVALID_ARG;
1101
1102     int rval;
1103     rval = Find(key, len, data);
1104     if(rval == MBError::IN_DICT)
1105     {
1106         rval = DeleteDataFromEdge(data, data.edge_ptrs);
1107         while(rval == MBError::TRY_AGAIN)
1108         {
1109             data.Clear();
1110             len -= data.edge_ptrs.len_ptr[0];
1111 #ifdef __DEBUG__
1112             assert(len > 0);
1113 #endif
1114             rval = Find(key, len, data);
1115             if(MBError::IN_DICT == rval)
1116             {
1117                 rval = mm.RemoveEdgeByIndex(data.edge_ptrs, data);
1118             }
1119         }
1120     }
1121
1122     if(rval == MBError::SUCCESS)
1123     {
1124         header->count--;
1125         if(header->count == 0)
1126         {
1127             RemoveAll();
1128         }
1129     }
1130
1131     return rval;
1132 }
1133
1134 int Dict::RemoveAll()
1135 {
1136     int rval = MBError::SUCCESS;;
1137     for(int c = 0; c < NUM_ALPHABET; c++)
1138     {
1139         rval = mm.ClearRootEdge(c);
1140         if(rval != MBError::SUCCESS)
1141             break;
1142     }
1143
1144     mm.ClearMem();
1145     mm.ResetSlidingWindow();
1146
1147     header->count = 0;
1148     header->m_data_offset = GetStartDataOffset();
1149     free_lists->Empty();
1150     header->pending_data_buff_size = 0;
1151     ResetSlidingWindow();
1152
1153     header->eviction_bucket_index = 0;
1154     header->num_update = 0;
1155     return rval;
1156 }
1157
1158 pthread_rwlock_t* Dict::GetShmLockPtrs() const
1159 {
1160     return &header->mb_rw_lock;
1161 }
1162
1163 int Dict::InitShmObjects()
1164 {
1165     if(status != MBError::SUCCESS)
1166         return MBError::NOT_INITIALIZED;
1167
1168     Logger::Log(LOG_LEVEL_INFO, "initializing shared memory objects");
1169
1170 #ifdef __SHM_LOCK__
1171     status = InitShmRWLock(&header->mb_rw_lock);
1172     if(status != MBError::SUCCESS)
1173         return status;
1174 #endif
1175
1176 #ifdef __SHM_QUEUE__
1177     for(int i = 0; i < header->async_queue_size; i++)
1178     {
1179         status = InitShmMutex(&queue[i].mutex);
1180         if(status  != MBError::SUCCESS)
1181             break;
1182         status = InitShmCond(&queue[i].cond);
1183         if(status  != MBError::SUCCESS)
1184             break;
1185     }
1186 #endif
1187
1188     return status;
1189 }
1190
1191 // Reserve buffer and write to it
1192 void Dict::ReserveData(const uint8_t* buff, int size, size_t &offset)
1193 {
1194 #ifdef __DEBUG__
1195     assert(size <= CONSTS::MAX_DATA_SIZE);
1196 #endif
1197
1198     int buf_size  = free_lists->GetAlignmentSize(size + DATA_HDR_BYTE);
1199     int buf_index = free_lists->GetBufferIndex(buf_size);
1200     uint16_t dsize[2];
1201     dsize[0] = static_cast<uint16_t>(size);
1202     // store bucket index for LRU eviction
1203     dsize[1] = (header->num_update / header->entry_per_bucket) % 0xFFFF;
1204     if(dsize[1] == header->eviction_bucket_index &&
1205        header->num_update > header->entry_per_bucket)
1206     {
1207         header->eviction_bucket_index++;
1208     }
1209
1210     if(free_lists->GetBufferCountByIndex(buf_index) > 0)
1211     {
1212         offset = free_lists->RemoveBufferByIndex(buf_index);
1213         WriteData(reinterpret_cast<const uint8_t*>(&dsize[0]), DATA_HDR_BYTE, offset);
1214         WriteData(buff, size, offset+DATA_HDR_BYTE);
1215         header->pending_data_buff_size -= buf_size;
1216     }
1217     else
1218     {
1219         size_t old_off = header->m_data_offset;
1220         uint8_t *ptr;
1221
1222         int rval = kv_file->Reserve(header->m_data_offset, buf_size, ptr);
1223         if(rval != MBError::SUCCESS)
1224             throw rval;
1225
1226         //Checking missing buffer due to alignment
1227         if(old_off < header->m_data_offset)
1228         {
1229             free_lists->ReleaseAlignmentBuffer(old_off, header->m_data_offset);
1230             header->pending_data_buff_size += header->m_data_offset - old_off;
1231         }
1232
1233         offset = header->m_data_offset;
1234         header->m_data_offset += buf_size;
1235         if(ptr != NULL)
1236         {
1237             memcpy(ptr, &dsize[0], DATA_HDR_BYTE);
1238             memcpy(ptr+DATA_HDR_BYTE, buff, size);
1239         }
1240         else
1241         {
1242             WriteData(reinterpret_cast<const uint8_t*>(&dsize[0]), DATA_HDR_BYTE, offset);
1243             WriteData(buff, size, offset+DATA_HDR_BYTE);
1244         }
1245     }
1246 }
1247
1248 int Dict::ReleaseBuffer(size_t offset)
1249 {
1250     uint16_t data_size;
1251
1252     if(ReadData(reinterpret_cast<uint8_t*>(&data_size), DATA_SIZE_BYTE, offset)
1253                != DATA_SIZE_BYTE)
1254         return MBError::READ_ERROR;
1255
1256     int rel_size = free_lists->GetAlignmentSize(data_size + DATA_HDR_BYTE);
1257     header->pending_data_buff_size += rel_size;
1258     return free_lists->ReleaseBuffer(offset, rel_size);
1259 }
1260
1261 int Dict::UpdateDataBuffer(EdgePtrs &edge_ptrs, bool overwrite, const uint8_t *buff,
1262                            int len, bool &inc_count)
1263 {
1264     size_t data_off;
1265
1266     if(edge_ptrs.flag_ptr[0] & EDGE_FLAG_DATA_OFF)
1267     {
1268         inc_count = false;
1269         // leaf node
1270         if(!overwrite)
1271             return MBError::IN_DICT;
1272
1273         data_off = Get6BInteger(edge_ptrs.offset_ptr);
1274         if(ReleaseBuffer(data_off) != MBError::SUCCESS)
1275             Logger::Log(LOG_LEVEL_WARN, "failed to release data buffer: %llu", data_off);
1276         ReserveData(buff, len, data_off);
1277         Write6BInteger(edge_ptrs.offset_ptr, data_off);
1278
1279         header->excep_lf_offset = edge_ptrs.offset;
1280         memcpy(header->excep_buff, edge_ptrs.offset_ptr, OFFSET_SIZE);
1281 #ifdef __LOCK_FREE__
1282         lfree.WriterLockFreeStart(edge_ptrs.offset);
1283 #endif
1284         header->excep_updating_status = EXCEP_STATUS_ADD_DATA_OFF;
1285         mm.WriteData(edge_ptrs.offset_ptr, OFFSET_SIZE, edge_ptrs.offset+EDGE_NODE_LEADING_POS);
1286 #ifdef __LOCK_FREE__
1287         lfree.WriterLockFreeStop();
1288 #endif
1289         header->excep_updating_status = EXCEP_STATUS_NONE;
1290     }
1291     else
1292     {
1293         uint8_t *node_buff = header->excep_buff;
1294         size_t node_off = Get6BInteger(edge_ptrs.offset_ptr);
1295
1296         if(mm.ReadData(node_buff, NODE_EDGE_KEY_FIRST, node_off) != NODE_EDGE_KEY_FIRST)
1297             return MBError::READ_ERROR;
1298
1299         if(node_buff[0] & FLAG_NODE_MATCH)
1300         {
1301             inc_count = false;
1302             if(!overwrite)
1303                 return MBError::IN_DICT;
1304
1305             data_off = Get6BInteger(node_buff+2);
1306             if(ReleaseBuffer(data_off) != MBError::SUCCESS)
1307                 Logger::Log(LOG_LEVEL_WARN, "failed to release data buffer %llu", data_off);
1308
1309             node_buff[NODE_EDGE_KEY_FIRST] = 0;
1310         }
1311         else
1312         {
1313             // set the match flag
1314             node_buff[0] |= FLAG_NODE_MATCH;
1315
1316             node_buff[NODE_EDGE_KEY_FIRST] = 1;
1317         }
1318
1319         ReserveData(buff, len, data_off);
1320         Write6BInteger(node_buff+2, data_off);
1321
1322         header->excep_offset = node_off;
1323 #ifdef __LOCK_FREE__
1324         header->excep_lf_offset = edge_ptrs.offset;
1325         lfree.WriterLockFreeStart(edge_ptrs.offset);
1326 #endif
1327         header->excep_updating_status = EXCEP_STATUS_ADD_NODE;
1328         mm.WriteData(node_buff, NODE_EDGE_KEY_FIRST, node_off);
1329 #ifdef __LOCK_FREE__
1330         lfree.WriterLockFreeStop();
1331 #endif
1332         header->excep_updating_status = EXCEP_STATUS_NONE;
1333     }
1334
1335     return MBError::SUCCESS;
1336 }
1337
1338 // delta should be either +1 or -1.
1339 void Dict::UpdateNumReader(int delta) const
1340 {
1341     header->num_reader += delta;
1342     if(header->num_reader < 0)
1343         header->num_reader = 0;
1344
1345     Logger::Log(LOG_LEVEL_INFO, "number of reader is set to: %d",
1346                 header->num_reader);
1347 }
1348
1349 // delta should be either +1 or -1.
1350 int Dict::UpdateNumWriter(int delta) const
1351 {
1352     if(delta > 0)
1353     {
1354         // Only one writer allowed
1355         if(header->num_writer > 0)
1356         {
1357             Logger::Log(LOG_LEVEL_WARN, "writer was not shutdown cleanly previously");
1358             header->num_writer = 1;
1359             // Reset number of reader too.
1360             header->num_reader = 0;
1361             return MBError::WRITER_EXIST;
1362         }
1363
1364         header->num_writer = 1;
1365     }
1366     else if(delta < 0)
1367     {
1368         header->num_writer = 0;
1369         header->lock_free.offset = MAX_6B_OFFSET;
1370     }
1371
1372     Logger::Log(LOG_LEVEL_INFO, "number of writer is set to: %d", header->num_writer);
1373     return MBError::SUCCESS;
1374 }
1375
1376 DictMem* Dict::GetMM() const
1377 {
1378     return (DictMem*) &mm;
1379 }
1380
1381 size_t Dict::GetStartDataOffset() const
1382 {
1383     return DATA_HEADER_SIZE;
1384 }
1385
1386 void Dict::ResetSlidingWindow() const
1387 {
1388     kv_file->ResetSlidingWindow();
1389     if(header != NULL)
1390         header->shm_data_sliding_start.store(0, std::memory_order_relaxed);
1391 }
1392
1393 LockFree* Dict::GetLockFreePtr()
1394 {
1395     return &lfree;
1396 }
1397
1398 void Dict::Flush() const
1399 {
1400     if(!(options & CONSTS::ACCESS_MODE_WRITER))
1401         return;
1402
1403     if(kv_file != NULL)
1404         kv_file->Flush();
1405     mm.Flush();
1406 }
1407
1408 // Recovery from abnormal writer terminations (segfault, kill -9 etc)
1409 // during DB updates (insertion, replacing and deletion).
1410 int Dict::ExceptionRecovery()
1411 {
1412     if(header == NULL)
1413         return MBError::NOT_INITIALIZED;
1414
1415     int rval = MBError::SUCCESS;
1416     if(header->excep_updating_status == EXCEP_STATUS_NONE)
1417     {
1418         Logger::Log(LOG_LEVEL_INFO, "writer was shutdown successfully previously");
1419         return rval;
1420     }
1421
1422     Logger::Log(LOG_LEVEL_INFO, "writer was not shutdown gracefully with exception status %d",
1423                 header->excep_updating_status);
1424     // Dumper header before running recover
1425     std::ofstream *logstream = Logger::GetLogStream();
1426     if(logstream != NULL)
1427     {
1428         PrintHeader(*logstream);
1429     }
1430     else
1431     {
1432         PrintHeader(std::cout);
1433     }
1434
1435     switch(header->excep_updating_status)
1436     {
1437         case EXCEP_STATUS_ADD_EDGE:
1438 #ifdef __LOCK_FREE__
1439             lfree.WriterLockFreeStart(header->excep_lf_offset);
1440 #endif
1441             mm.WriteData(header->excep_buff, EDGE_SIZE, header->excep_lf_offset);
1442             header->count++;
1443             break;
1444         case EXCEP_STATUS_ADD_DATA_OFF:
1445 #ifdef __LOCK_FREE__
1446             lfree.WriterLockFreeStart(header->excep_lf_offset);
1447 #endif
1448             mm.WriteData(header->excep_buff, OFFSET_SIZE,
1449                          header->excep_lf_offset+EDGE_NODE_LEADING_POS);
1450             break;
1451         case EXCEP_STATUS_ADD_NODE:
1452 #ifdef __LOCK_FREE__
1453             lfree.WriterLockFreeStart(header->excep_lf_offset);
1454 #endif
1455             mm.WriteData(header->excep_buff, NODE_EDGE_KEY_FIRST,
1456                          header->excep_offset);
1457             if(header->excep_buff[NODE_EDGE_KEY_FIRST]) header->count++;
1458             break;
1459         case EXCEP_STATUS_REMOVE_EDGE:
1460 #ifdef __LOCK_FREE__
1461             lfree.WriterLockFreeStart(header->excep_lf_offset);
1462 #endif
1463             Write6BInteger(header->excep_buff, header->excep_offset);
1464             mm.WriteData(header->excep_buff, OFFSET_SIZE,
1465                          header->excep_lf_offset+EDGE_NODE_LEADING_POS);
1466             break;
1467         case EXCEP_STATUS_CLEAR_EDGE:
1468 #ifdef __LOCK_FREE__
1469             lfree.WriterLockFreeStart(header->excep_lf_offset);
1470 #endif
1471             mm.WriteData(DictMem::empty_edge, EDGE_SIZE, header->excep_lf_offset);
1472             header->count--;
1473             break;
1474         case EXCEP_STATUS_RC_NODE:
1475         case EXCEP_STATUS_RC_DATA:
1476 #ifdef __LOCK_FREE__
1477             lfree.WriterLockFreeStart(header->excep_lf_offset);
1478 #endif
1479             mm.WriteData(header->excep_buff, OFFSET_SIZE, header->excep_offset);
1480             break;
1481         case EXCEP_STATUS_RC_EDGE_STR:
1482 #ifdef __LOCK_FREE__
1483             lfree.WriterLockFreeStart(header->excep_lf_offset);
1484 #endif
1485             mm.WriteData(header->excep_buff, OFFSET_SIZE-1, header->excep_offset);
1486             break;
1487         default:
1488             Logger::Log(LOG_LEVEL_ERROR, "unknown exception status: %d",
1489                         header->excep_updating_status);
1490             rval = MBError::INVALID_ARG;
1491     }
1492 #ifdef __LOCK_FREE__
1493     lfree.WriterLockFreeStop();
1494 #endif
1495
1496     if(rval == MBError::SUCCESS)
1497     {
1498         header->excep_updating_status = EXCEP_STATUS_NONE;
1499         Logger::Log(LOG_LEVEL_INFO, "successfully recovered from abnormal termination");
1500     }
1501     else
1502     {
1503         Logger::Log(LOG_LEVEL_ERROR, "failed to recover from abnormal termination");
1504     }
1505
1506     return rval;
1507 }
1508
1509 void Dict::WriteData(const uint8_t *buff, unsigned len, size_t offset) const
1510 {
1511     if(offset + len > header->m_data_offset)
1512     {
1513         std::cerr << "invalid dict write: " << offset << " " << len << " "
1514                   << header->m_data_offset << "\n";
1515         throw (int) MBError::OUT_OF_BOUND;
1516     }
1517
1518     if(kv_file->RandomWrite(buff, len, offset) != len)
1519         throw (int) MBError::WRITE_ERROR;
1520 }
1521
1522 }