2 * Copyright (C) 2017 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
24 #include "integer_4b_5b.h"
26 #define MAX_PRUNE_COUNT 3 // maximum lru eviction attempts
27 #define NUM_ASYNC_TASK 10 // number of other tasks to be checked during eviction
28 #define PRUNE_TASK_CHECK 100 // every Xth eviction check Y number of other tasks
29 #define RC_TASK_CHECK 100 // every Xth async task try to reclaim resources
30 #define MIN_RC_OFFSET_GAP 1ULL*1024*1024 // 1M
// Construct a resource collector over an open DB.
// rct is a bitmask of RESOURCE_COLLECTION_TYPE_INDEX/RESOURCE_COLLECTION_TYPE_DATA
// selecting which resource kinds this instance may defragment.
35 ResourceCollection::ResourceCollection(const DB &db, int rct)
36 : DBTraverseBase(db), rc_type(rct)
// No async writer attached yet; ReclaimResource() installs one when called.
38 async_writer_ptr = NULL;
// Destructor — no owned resources to release (async_writer_ptr is non-owning).
41 ResourceCollection::~ResourceCollection()
// Circular distance helpers over the 16-bit bucket-index space: the bucket
// index wraps at 0xFFFF, so differences must be computed modulo 0xFFFF.
// INDEX_DIFF uses strict '>', PRUNE_DIFF uses '>=' so equal indices map to 0.
45 #define CIRCULAR_INDEX_DIFF(x, y) ((x)>(y) ? ((x)-(y)) : (0xFFFF-(y)+(x)))
46 #define CIRCULAR_PRUNE_DIFF(x, y) ((x)>=(y) ? ((x)-(y)) : (0xFFFF-(y)+(x)))
// Evict the least-recently-updated entries, bucket by bucket, starting at
// header->eviction_bucket_index. Returns SUCCESS, TRY_AGAIN when fewer than
// 75% of the targeted entries were pruned (caller should retry), or
// RC_SKIPPED when an async task pre-empted the eviction pass.
47 int ResourceCollection::LRUEviction()
51 int rval = MBError::SUCCESS;
53 Logger::Log(LOG_LEVEL_INFO, "running LRU eviction for bucket %u", header->eviction_bucket_index);
// Distance (in buckets) between the eviction cursor and the bucket currently
// being filled by updates; the complement is the candidate range to prune.
55 uint16_t index_diff = CIRCULAR_INDEX_DIFF(header->eviction_bucket_index,
56 (header->num_update/header->entry_per_bucket) % 0xFFFF);
57 uint16_t prune_diff = 0xFFFF - index_diff;
// Tiered pruning: the smaller the gap (i.e. the fuller the DB), the larger
// the fraction of the stale range that is evicted in one pass (35% .. 15%).
59 prune_diff = uint16_t(prune_diff * 0.35); // prune 35%
60 else if(index_diff < 3276)
61 prune_diff = uint16_t(prune_diff * 0.30); // prune 30%
62 else if(index_diff < 6554)
63 prune_diff = uint16_t(prune_diff * 0.25); // prune 25%
64 else if(index_diff < 9830)
65 prune_diff = uint16_t(prune_diff * 0.20); // prune 20%
67 prune_diff = uint16_t(prune_diff * 0.15); // prune 15%
// Walk every entry; remove those whose bucket falls inside the prune window
// measured circularly from the eviction cursor.
71 for(DB::iterator iter = db_ref.begin(false); iter != db_ref.end(); ++iter)
73 if(CIRCULAR_PRUNE_DIFF(iter.value.bucket_index, header->eviction_bucket_index) < prune_diff)
75 rval = dict->Remove((const uint8_t *)iter.key.data(), iter.key.size());
76 if(rval != MBError::SUCCESS)
77 Logger::Log(LOG_LEVEL_DEBUG, "failed to run eviction %s", MBError::get_error_str(rval));
// Periodically yield to the async writer so queued tasks are not starved
// during a long eviction scan; RC_SKIPPED aborts this pass.
82 if(async_writer_ptr != NULL)
84 if(count++ > PRUNE_TASK_CHECK)
87 rval = async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, false);
88 if(rval == MBError::RC_SKIPPED)
94 if(rval != MBError::RC_SKIPPED)
96 // It is expected that eviction_bucket_index can overflow since we are only
97 // interested in circular difference.
98 header->eviction_bucket_index += prune_diff;
99 // If not enough pruned, need to retry.
100 if(pruned < int64_t(prune_diff * header->entry_per_bucket * 0.75))
101 rval = MBError::TRY_AGAIN;
102 Logger::Log(LOG_LEVEL_INFO, "LRU eviction done %d pruned, current bucket index %u",
103 pruned, header->eviction_bucket_index);
107 Logger::Log(LOG_LEVEL_INFO, "LRU eviction skipped %d pruned", pruned);
// Entry point for resource reclamation: runs LRU eviction when the DB exceeds
// its size/count limits, then (optionally) a defragmentation pass when the
// minimum garbage thresholds are configured. Throws the DB status when the DB
// is not open, and propagates MBError codes thrown by Prepare().
// NOTE: further parameters (max_dbsz, max_dbcnt, awr) are declared on lines
// not visible in this excerpt.
113 void ResourceCollection::ReclaimResource(int64_t min_index_size,
114 int64_t min_data_size,
119 if(!db_ref.is_open())
120 throw db_ref.Status();
// Non-owning pointer; cleared again before this function returns.
122 async_writer_ptr = awr;
126 // Check LRU eviction first
127 if(header->m_data_offset + header->m_index_offset > (size_t) max_dbsz ||
128 header->count > max_dbcnt)
131 gettimeofday(&start,NULL);
// Retry eviction up to MAX_PRUNE_COUNT times while it reports TRY_AGAIN
// (i.e. fewer entries were pruned than targeted).
132 while(cnt < MAX_PRUNE_COUNT) {
133 if(LRUEviction() != MBError::TRY_AGAIN)
137 gettimeofday(&stop,NULL);
138 timediff = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
139 if(timediff > 1000000)
141 Logger::Log(LOG_LEVEL_INFO, "LRU eviction finished in %lf seconds",
146 Logger::Log(LOG_LEVEL_INFO, "LRU eviction finished in %lf milliseconds",
// Defragmentation is attempted only when at least one minimum-garbage
// threshold is set; Prepare() throws RC_SKIPPED if thresholds are not met.
151 if(min_index_size > 0 || min_data_size > 0)
153 Prepare(min_index_size, min_data_size);
154 Logger::Log(LOG_LEVEL_INFO, "defragmentation started for [index - %s] [data - %s]",
155 rc_type & RESOURCE_COLLECTION_TYPE_INDEX ? "yes":"no",
// NOTE(review): " yes" below carries a stray leading space, unlike "yes"
// on the index branch above — cosmetic log inconsistency; confirm and fix
// at the string literal when a code change is safe.
156 rc_type & RESOURCE_COLLECTION_TYPE_DATA ? " yes":"no");
157 gettimeofday(&start, NULL);
163 gettimeofday(&stop, NULL);
// Detach the async writer before reporting timing.
164 async_writer_ptr = NULL;
165 timediff = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
166 if(timediff > 1000000)
168 Logger::Log(LOG_LEVEL_INFO, "defragmentation finished in %lf seconds",
173 Logger::Log(LOG_LEVEL_INFO, "defragmentation finished in %lf milliseconds",
179 /////////////////////////////////////////////////////////
180 ////////////////// Private Methods //////////////////////
181 /////////////////////////////////////////////////////////
// Validate that enough garbage has accumulated to justify collection, reset
// per-run counters/status fields, and relocate the index/data write offsets
// to the dedicated rc region. Throws (int)MBError::RC_SKIPPED when thresholds
// are not met and (int)MBError::OUT_OF_BOUND when the rc region would overlap
// the live region.
183 void ResourceCollection::Prepare(int64_t min_index_size, int64_t min_data_size)
185 // make sure there are enough garbage index buffers before initiating collection
186 if(min_index_size == 0 || header->pending_index_buff_size < min_index_size)
188 rc_type &= ~RESOURCE_COLLECTION_TYPE_INDEX;
191 // make sure there are enough garbage data buffers before initiating collection
192 if(min_data_size == 0 || header->pending_data_buff_size < min_data_size)
194 rc_type &= ~RESOURCE_COLLECTION_TYPE_DATA;
197 // minimum defragmentation threshold is not reached, skip garbage collection
// NOTE(review): min_index_size/min_data_size are int64_t but formatted with
// %llu; should be %lld (or cast) — confirm before touching the format string.
200 Logger::Log(LOG_LEVEL_DEBUG, "pending_index_buff_size (%llu) min_index_size (%llu)",
201 header->pending_index_buff_size, min_index_size);
203 Logger::Log(LOG_LEVEL_DEBUG, "pending_data_buff_size (%llu) min_data_size (%llu)",
204 header->pending_data_buff_size, min_data_size);
// NOTE(review): adjacent string literals concatenate to "...pendingsizes..."
// (missing space between "pending" and "sizes") — fix in the literal when a
// code change is safe.
206 Logger::Log(LOG_LEVEL_DEBUG, "garbage collection skipped since pending"
207 "sizes smaller than required");
208 throw (int) MBError::RC_SKIPPED;
// Drop cached free buffers so stale offsets are not reused during collection.
211 index_free_lists->Empty();
212 data_free_lists->Empty();
215 index_reorder_cnt = 0;
216 data_reorder_cnt = 0;
217 index_rc_status = MBError::NOT_INITIALIZED;
218 data_rc_status = MBError::NOT_INITIALIZED;
219 index_reorder_status = MBError::NOT_INITIALIZED;
220 data_reorder_status = MBError::NOT_INITIALIZED;
// Remember pre-collection offsets so Finish()/ExceptionRecovery() can roll
// back if collection does not complete.
221 header->rc_m_index_off_pre = header->m_index_offset;
222 header->rc_m_data_off_pre = header->m_data_offset;
226 // set rc root to at the end of max size
227 rc_index_offset = dmm->GetResourceCollectionOffset();
228 rc_data_offset = dict->GetResourceCollectionOffset();
230 // make sure there is some space left at the end of current index
231 // [start|....|current_index_offset|...MIN_RC_OFFSET_GAP/more...|rc_index_offset|.....|end]
232 if(rc_index_offset < header->m_index_offset + MIN_RC_OFFSET_GAP ||
233 rc_data_offset < header->m_data_offset + MIN_RC_OFFSET_GAP)
// NOTE(review): MIN_RC_OFFSET_GAP expands to 1ULL*1024*1024 (unsigned long
// long) but is formatted with %d — undefined varargs behavior; confirm and
// switch to %llu when a code change is safe.
235 Logger::Log(LOG_LEVEL_WARN, "not enough space for rc, index: "
236 "%llu %d, %llu, data: %llu %d, %llu",
237 header->m_index_offset, MIN_RC_OFFSET_GAP, rc_index_offset,
238 header->m_data_offset, MIN_RC_OFFSET_GAP, rc_data_offset);
239 throw (int) MBError::OUT_OF_BOUND;
242 // update current indexes to rc offsets
243 header->m_index_offset = rc_index_offset;
244 header->m_data_offset = rc_data_offset;
246 // create rc root node
247 size_t rc_off = dmm->InitRootNode_RC();
248 header->rc_root_offset.store(rc_off, MEMORY_ORDER_WRITER);
251 Logger::Log(LOG_LEVEL_DEBUG, "setting rc index off start to: %llu", header->m_index_offset);
252 Logger::Log(LOG_LEVEL_DEBUG, "setting rc data off start to: %llu", header->m_data_offset);
// Collection phase: traverse the DB and compact live buffers. Skips a
// resource kind if its reorder phase did not complete successfully; marks
// per-kind rc status SUCCESS on completion.
255 void ResourceCollection::CollectBuffers()
257 if((rc_type & RESOURCE_COLLECTION_TYPE_INDEX) &&
258 (index_reorder_status != MBError::SUCCESS))
260 if((rc_type & RESOURCE_COLLECTION_TYPE_DATA) &&
261 (data_reorder_status != MBError::SUCCESS))
264 TraverseDB(RESOURCE_COLLECTION_PHASE_COLLECT);
266 if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
267 index_rc_status = MBError::SUCCESS;
268 if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
269 data_rc_status = MBError::SUCCESS;
// Finalize collection: on success commit the compacted sizes and clear the
// pending-garbage counters; on failure roll the offsets back to the values
// saved in Prepare(). Throws (int)MBError::INVALID_ARG if a rollback is
// required but no pre-collection offset was recorded.
272 void ResourceCollection::Finish()
274 if(index_rc_status == MBError::SUCCESS)
276 Logger::Log(LOG_LEVEL_INFO, "index buffer size reclaimed: %lld",
277 (header->rc_m_index_off_pre > index_size) ?
278 (header->rc_m_index_off_pre - index_size) : 0);
// Commit: live index now ends at the compacted size.
279 header->m_index_offset = index_size;
280 header->pending_index_buff_size = 0;
// Rollback path: restore the offset captured before collection started.
284 if(header->rc_m_index_off_pre == 0)
285 throw (int) MBError::INVALID_ARG;
286 header->m_index_offset = header->rc_m_index_off_pre;
288 if(data_rc_status == MBError::SUCCESS)
290 Logger::Log(LOG_LEVEL_INFO, "data buffer size reclaimed: %lld",
291 (header->rc_m_data_off_pre > data_size) ?
292 (header->rc_m_data_off_pre - data_size) : 0);
293 header->m_data_offset = data_size;
294 header->pending_data_buff_size = 0;
298 if(header->rc_m_data_off_pre == 0)
299 throw (int) MBError::INVALID_ARG;
300 header->m_data_offset = header->rc_m_data_off_pre;
// Free-list caches are stale after compaction; empty them (async mode).
303 if(async_writer_ptr != NULL)
305 index_free_lists->Empty();
306 data_free_lists->Empty();
// Clear the saved offsets so ExceptionRecovery() won't see a pending rc.
310 header->rc_m_index_off_pre = 0;
311 header->rc_m_data_off_pre = 0;
// Release (truncate/unmap) storage beyond the new end offsets.
313 dict->RemoveUnused(header->m_data_offset, true);
314 dmm->RemoveUnused(header->m_index_offset, true);
// Relocate one index buffer of 'size' bytes. In the REORDER phase the buffer
// is moved to freshly reserved space at the current rc index offset; in the
// COLLECT phase it is compacted down to index_size. Updates offset_src to the
// new location and returns true when a move occurred (false when the buffer
// is already in place).
317 bool ResourceCollection::MoveIndexBuffer(int phase, size_t &offset_src, int size)
319 index_size = dmm->CheckAlignment(index_size, size);
321 if(index_size == offset_src)
329 if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
// Only reorder buffers that would overlap the compaction target range.
331 if(index_size + size <= offset_src)
334 offset_dst = header->m_index_offset;
335 rval = dmm->Reserve(offset_dst, size, ptr_dst);
336 if(rval != MBError::SUCCESS)
338 header->m_index_offset = offset_dst + size;
// COLLECT phase: destination is the compaction frontier; source must not
// overlap it.
344 assert(index_size + size <= offset_src);
346 offset_dst = index_size;
347 ptr_dst = dmm->GetShmPtr(offset_dst, size);
350 ptr_src = dmm->GetShmPtr(offset_src, size);
351 BufferCopy(offset_dst, ptr_dst, offset_src, ptr_src, size, dmm);
// Report the new location back to the caller.
353 offset_src = offset_dst;
// Relocate one data buffer of 'size' bytes — the data-side counterpart of
// MoveIndexBuffer(): REORDER moves into freshly reserved rc space, COLLECT
// compacts down to data_size. Updates offset_src and returns true when a
// move occurred.
357 bool ResourceCollection::MoveDataBuffer(int phase, size_t &offset_src, int size)
359 data_size = dict->CheckAlignment(data_size, size);
360 if(data_size == offset_src)
368 if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
// NOTE(review): this check uses '<' while the parallel MoveIndexBuffer uses
// '<=' (and the COLLECT-phase assert below uses '<='). The asymmetry looks
// like an off-by-one inconsistency — confirm against the index version
// before changing.
370 if(data_size + size < offset_src)
373 offset_dst = header->m_data_offset;
374 rval = dict->Reserve(offset_dst, size, ptr_dst);
375 if(rval != MBError::SUCCESS)
377 header->m_data_offset = offset_dst + size;
// COLLECT phase: compact toward data_size; source must not overlap target.
383 assert(data_size + size <= offset_src);
385 offset_dst = data_size;
386 ptr_dst = dict->GetShmPtr(offset_dst, size);
389 ptr_src = dict->GetShmPtr(offset_src, size);
390 BufferCopy(offset_dst, ptr_dst, offset_src, ptr_src, size, dict);
// Report the new location back to the caller.
392 offset_src = offset_dst;
// Per-node callback invoked by TraverseDB() for both phases. Moves the
// node/edge-string/data buffers attached to dbt_node (as permitted by
// rc_type), recording each in-flight write in the header exception fields
// (excep_*) so a crash mid-write can be recovered. Buffer writes are fenced
// by the lock-free writer protocol around the owning edge.
396 void ResourceCollection::DoTask(int phase, DBTraverseNode &dbt_node)
398 if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
400 // collect stats for adjusting values in header
401 if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
403 if(dbt_node.buffer_type & BUFFER_TYPE_EDGE_STR)
404 edge_str_size += dbt_node.edgestr_size;
405 if(dbt_node.buffer_type & BUFFER_TYPE_NODE)
// Record the edge being updated for crash recovery.
409 header->excep_lf_offset = dbt_node.edge_offset;
410 if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
// --- node buffer ---
412 if(dbt_node.buffer_type & BUFFER_TYPE_NODE)
414 if(MoveIndexBuffer(phase, dbt_node.node_offset, dbt_node.node_size))
// Node moved: rewrite the 6-byte link that points at it, bracketed by
// the exception-status markers for recovery.
416 Write6BInteger(header->excep_buff, dbt_node.node_offset);
418 lfree->WriterLockFreeStart(dbt_node.edge_offset);
420 header->excep_offset = dbt_node.node_link_offset;
421 header->excep_updating_status = EXCEP_STATUS_RC_NODE;
422 dmm->WriteData(header->excep_buff, OFFSET_SIZE, dbt_node.node_link_offset);
423 header->excep_updating_status = 0;
425 lfree->WriterLockFreeStop();
427 // Update data_link_offset since node may have been moved.
428 if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
429 dbt_node.data_link_offset = dbt_node.node_offset + 2;
431 index_size += dbt_node.node_size;
// --- edge-string buffer (5-byte link) ---
434 if(dbt_node.buffer_type & BUFFER_TYPE_EDGE_STR)
436 if(MoveIndexBuffer(phase, dbt_node.edgestr_offset, dbt_node.edgestr_size))
438 Write5BInteger(header->excep_buff, dbt_node.edgestr_offset);
440 lfree->WriterLockFreeStart(dbt_node.edge_offset);
442 header->excep_offset = dbt_node.edgestr_link_offset;
443 header->excep_updating_status = EXCEP_STATUS_RC_EDGE_STR;
444 dmm->WriteData(header->excep_buff, OFFSET_SIZE-1, dbt_node.edgestr_link_offset);
445 header->excep_updating_status = 0;
447 lfree->WriterLockFreeStop();
450 index_size += dbt_node.edgestr_size;
// --- data buffer ---
454 if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
456 if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
458 if(MoveDataBuffer(phase, dbt_node.data_offset, dbt_node.data_size))
460 Write6BInteger(header->excep_buff, dbt_node.data_offset);
462 lfree->WriterLockFreeStart(dbt_node.edge_offset);
// NOTE(review): stray double semicolon below — harmless empty statement.
464 header->excep_offset = dbt_node.data_link_offset;;
465 header->excep_updating_status = EXCEP_STATUS_RC_DATA;
466 dmm->WriteData(header->excep_buff, OFFSET_SIZE, dbt_node.data_link_offset);
467 header->excep_updating_status = 0;
469 lfree->WriterLockFreeStop();
472 data_size += dbt_node.data_size;
// All updates for this node are complete.
476 header->excep_updating_status = 0;
// Periodically let the async writer drain queued tasks during traversal.
478 if(async_writer_ptr != NULL)
480 if(rc_loop_counter++ > RC_TASK_CHECK)
483 async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, true);
// Reorder phase: traverse the DB moving buffers that overlap the compaction
// range into the rc region, then record per-kind reorder status and adjust
// header statistics (count, edge string size, node count) from the traversal.
489 void ResourceCollection::ReorderBuffers()
491 if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
492 Logger::Log(LOG_LEVEL_INFO, "index size before reorder: %llu", header->m_index_offset);
493 if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
494 Logger::Log(LOG_LEVEL_INFO, "data size before reorder: %llu", header->m_data_offset);
499 TraverseDB(RESOURCE_COLLECTION_PHASE_REORDER);
501 if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
502 Logger::Log(LOG_LEVEL_INFO, "index size after reorder: %llu", header->m_index_offset);
503 if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
504 Logger::Log(LOG_LEVEL_INFO, "data size after reorder: %llu", header->m_data_offset);
// Mark reorder complete per resource kind; CollectBuffers() checks these.
506 if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
508 index_reorder_status = MBError::SUCCESS;
509 Logger::Log(LOG_LEVEL_INFO, "number of index buffer reordered: %lld", index_reorder_cnt);
511 if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
513 data_reorder_status = MBError::SUCCESS;
514 Logger::Log(LOG_LEVEL_INFO, "number of data buffer reordered: %lld", data_reorder_cnt);
// Traversal produced authoritative counts; reconcile the header with them.
517 if(db_cnt != header->count)
519 Logger::Log(LOG_LEVEL_INFO, "adjusting db count to %lld from %lld", db_cnt, header->count);
520 header->count = db_cnt;
522 header->edge_str_size = edge_str_size;
523 header->n_states = node_cnt;
// Merge entries inserted into the temporary rc tree (writes that arrived
// during collection) back into the main trie, then reset the rc tree state.
526 void ResourceCollection::ProcessRCTree()
528 Logger::Log(LOG_LEVEL_INFO, "resource collection done, traversing the rc tree %llu entries", header->rc_count);
// Iterate the rc tree (second begin() argument selects the rc root).
532 for(DB::iterator iter = db_ref.begin(false, true); iter != db_ref.end(); ++iter)
534 iter.value.options = 0;
535 rval = dict->Add((const uint8_t *)iter.key.data(), iter.key.size(), iter.value, true);
536 if(rval != MBError::SUCCESS)
537 Logger::Log(LOG_LEVEL_WARN, "failed to add: %s", MBError::get_error_str(rval));
// NOTE(review): async_writer_ptr is dereferenced below with no NULL guard
// visible in this excerpt — confirm a guard exists on the elided lines.
538 if(count++ > RC_TASK_CHECK)
541 async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, false);
// Re-inserting rc entries must not have grown past the rc region start.
544 if(header->m_index_offset > rc_index_offset ||
545 header->m_data_offset > rc_data_offset)
547 Logger::Log(LOG_LEVEL_ERROR, "not enough space for insertion: %llu, %llu",
548 header->m_index_offset, header->m_data_offset);
// Reset rc tree bookkeeping and detach its root.
553 header->rc_count = 0;
554 header->rc_root_offset.store(0, MEMORY_ORDER_WRITER);
557 dmm->ClearRootEdges_RC();
// Writer-startup recovery: if a previous resource collection was interrupted
// (pre-collection offsets still recorded in the header), rerun a blocking
// ReclaimResource pass; on unrecoverable failure the DB is cleared. Returns
// an MBError code.
560 int ResourceCollection::ExceptionRecovery()
562 if(!db_ref.is_open())
563 return db_ref.Status();
565 int rval = MBError::SUCCESS;
// NOTE(review): '&&' requires BOTH pre-offsets to be nonzero to trigger
// recovery; an rc interrupted with only one offset recorded would be skipped
// — confirm whether '||' was intended.
566 if(header->rc_m_index_off_pre != 0 && header->rc_m_data_off_pre != 0)
568 Logger::Log(LOG_LEVEL_WARN, "previous rc was not completed successfully, retrying...");
570 // This is a blocking call and should be called when writer starts up.
// Thresholds of 1 byte force the defragmentation pass to run; NULL means no
// async writer is attached during recovery.
571 ReclaimResource(1, 1, MAX_6B_OFFSET, MAX_6B_OFFSET, NULL);
573 if(err != MBError::RC_SKIPPED)
577 if(rval != MBError::SUCCESS)
579 Logger::Log(LOG_LEVEL_ERROR, "failed to run rc recovery: %s, clear db!!!", MBError::get_error_str(rval));
// Reset rc state after recovery (or after clearing the DB).
584 header->rc_root_offset = 0;
585 header->rc_count = 0;