/**
 * Copyright (C) 2017 Cisco Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

// @author Changxue Deng <chadeng@cisco.com>

#include <sys/time.h>

#include "mb_rc.h"
#include "dict.h"
#include "dict_mem.h"
#include "integer_4b_5b.h"

#define MAX_PRUNE_COUNT      3                 // maximum number of LRU eviction attempts
#define NUM_ASYNC_TASK       10                // number of queued async tasks to process at each check
#define PRUNE_TASK_CHECK     100               // process other async tasks every Xth eviction iteration
#define RC_TASK_CHECK        100               // process other async tasks every Xth rc iteration
#define MIN_RC_OFFSET_GAP    1ULL*1024*1024    // 1M


namespace mabain {

ResourceCollection::ResourceCollection(const DB &db, int rct)
                   : DBTraverseBase(db), rc_type(rct)
{
    async_writer_ptr = NULL;
}

ResourceCollection::~ResourceCollection()
{
}

#define CIRCULAR_INDEX_DIFF(x, y) ((x)>(y) ? ((x)-(y)) : (0xFFFF-(y)+(x)))
#define CIRCULAR_PRUNE_DIFF(x, y) ((x)>=(y) ? ((x)-(y)) : (0xFFFF-(y)+(x)))
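// Bucket indices live in a circular space that wraps at 0xFFFF, so the
// distance between two indices is computed modulo 0xFFFF. For example,
// CIRCULAR_INDEX_DIFF(2, 0xFFFE) = 0xFFFF - 0xFFFE + 2 = 3: index 2 is three
// buckets ahead of 0xFFFE after the wrap.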
int ResourceCollection::LRUEviction()
{
    int64_t pruned = 0;
    int64_t count = 0;
    int rval = MBError::SUCCESS;

    Logger::Log(LOG_LEVEL_INFO, "running LRU eviction for bucket %u", header->eviction_bucket_index);

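    // index_diff is the circular distance from the current write bucket
    // (num_update / entry_per_bucket) forward to the eviction pointer, so
    // prune_diff (0xFFFF - index_diff) is the backlog of buckets waiting to be
    // evicted. The larger that backlog, the larger the fraction of it pruned
    // in one pass (35% down to 15%).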
    uint16_t index_diff = CIRCULAR_INDEX_DIFF(header->eviction_bucket_index,
                              (header->num_update/header->entry_per_bucket) % 0xFFFF);
    uint16_t prune_diff = 0xFFFF - index_diff;
    if(index_diff < 655)
        prune_diff = uint16_t(prune_diff * 0.35); // prune 35%
    else if(index_diff < 3276)
        prune_diff = uint16_t(prune_diff * 0.30); // prune 30%
    else if(index_diff < 6554)
        prune_diff = uint16_t(prune_diff * 0.25); // prune 25%
    else if(index_diff < 9830)
        prune_diff = uint16_t(prune_diff * 0.20); // prune 20%
    else
        prune_diff = uint16_t(prune_diff * 0.15); // prune 15%
    if(prune_diff == 0)
        prune_diff = 1;

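    // Walk the whole DB and remove every entry whose bucket falls within
    // prune_diff of the eviction pointer; when an async writer is attached,
    // yield periodically so queued tasks are not starved.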
    for(DB::iterator iter = db_ref.begin(false); iter != db_ref.end(); ++iter)
    {
        if(CIRCULAR_PRUNE_DIFF(iter.value.bucket_index, header->eviction_bucket_index) < prune_diff)
        {
            rval = dict->Remove((const uint8_t *)iter.key.data(), iter.key.size());
            if(rval != MBError::SUCCESS)
                Logger::Log(LOG_LEVEL_DEBUG, "failed to run eviction %s", MBError::get_error_str(rval));
            else
                pruned++;
        }

        if(async_writer_ptr != NULL)
        {
            if(count++ > PRUNE_TASK_CHECK)
            {
                count = 0;
                rval = async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, false);
                if(rval == MBError::RC_SKIPPED)
                    break;
            }
        }
    }

    if(rval != MBError::RC_SKIPPED)
    {
        // It is expected that eviction_bucket_index can overflow since we are only
        // interested in circular difference.
        header->eviction_bucket_index += prune_diff;
        // If not enough pruned, need to retry.
        if(pruned < int64_t(prune_diff * header->entry_per_bucket * 0.75))
            rval = MBError::TRY_AGAIN;
        Logger::Log(LOG_LEVEL_INFO, "LRU eviction done %lld pruned, current bucket index %u",
                    pruned, header->eviction_bucket_index);
    }
    else
    {
        Logger::Log(LOG_LEVEL_INFO, "LRU eviction skipped %lld pruned", pruned);
    }

    return rval;
}

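// ReclaimResource runs in two stages: LRU eviction first, when the DB exceeds
// the caller-supplied size/count caps, followed by defragmentation of the
// index and data files when the pending garbage sizes exceed the requested
// minimums. A rough usage sketch (hypothetical caller; the handle name db_w
// and the threshold values are illustrative only):
//
//     ResourceCollection rc(db_w, RESOURCE_COLLECTION_TYPE_INDEX |
//                                 RESOURCE_COLLECTION_TYPE_DATA);
//     rc.ReclaimResource(64LL*1024*1024,   // min_index_size
//                        64LL*1024*1024,   // min_data_size
//                        max_db_size, max_db_count, async_writer_or_null);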
void ResourceCollection::ReclaimResource(int64_t min_index_size,
                                         int64_t min_data_size,
                                         int64_t max_dbsz,
                                         int64_t max_dbcnt,
                                         AsyncWriter *awr)
{
    if(!db_ref.is_open())
        throw db_ref.Status();

    async_writer_ptr = awr;
    timeval start, stop;
    uint64_t timediff;

    // Check LRU eviction first
    if(header->m_data_offset + header->m_index_offset > (size_t) max_dbsz ||
            header->count > max_dbcnt)
    {
        int cnt = 0;
        gettimeofday(&start, NULL);
        while(cnt < MAX_PRUNE_COUNT) {
            if(LRUEviction() != MBError::TRY_AGAIN)
                break;
            cnt++;
        }
        gettimeofday(&stop, NULL);
        timediff = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
        if(timediff > 1000000)
        {
            Logger::Log(LOG_LEVEL_INFO, "LRU eviction finished in %lf seconds",
                    timediff/1000000.);
        }
        else
        {
            Logger::Log(LOG_LEVEL_INFO, "LRU eviction finished in %lf milliseconds",
                    timediff/1000.);
        }
    }

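    // Defragmentation phase: only attempted when the caller asked for it by
    // passing non-zero minimum sizes; Prepare() may still skip it if not
    // enough garbage has accumulated.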
    if(min_index_size > 0 || min_data_size > 0)
    {
        Prepare(min_index_size, min_data_size);
        Logger::Log(LOG_LEVEL_INFO, "defragmentation started for [index - %s] [data - %s]",
                rc_type & RESOURCE_COLLECTION_TYPE_INDEX ? "yes":"no",
                rc_type & RESOURCE_COLLECTION_TYPE_DATA ? "yes":"no");
        gettimeofday(&start, NULL);

        ReorderBuffers();
        CollectBuffers();
        Finish();

        gettimeofday(&stop, NULL);
        async_writer_ptr = NULL;
        timediff = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
        if(timediff > 1000000)
        {
            Logger::Log(LOG_LEVEL_INFO, "defragmentation finished in %lf seconds",
                    timediff/1000000.);
        }
        else
        {
            Logger::Log(LOG_LEVEL_INFO, "defragmentation finished in %lf milliseconds",
                    timediff/1000.);
        }
    }
}

/////////////////////////////////////////////////////////
////////////////// Private Methods //////////////////////
/////////////////////////////////////////////////////////

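// The defragmentation pipeline is: Prepare() decides which resource types to
// collect and, when running alongside an async writer, redirects new writes
// into a reserved rc region near the end of each file; ReorderBuffers() moves
// live buffers that would otherwise be overwritten out of the way;
// CollectBuffers() compacts the remaining live buffers toward the front of the
// files; Finish() commits the compacted offsets on success (or restores the
// previous ones on failure) and calls RemoveUnused() on both files.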
void ResourceCollection::Prepare(int64_t min_index_size, int64_t min_data_size)
{
    // make sure enough garbage index-buffer space has accumulated before initiating collection
    if(min_index_size == 0 || header->pending_index_buff_size < min_index_size)
    {
        rc_type &= ~RESOURCE_COLLECTION_TYPE_INDEX;
    }

    // make sure enough garbage data-buffer space has accumulated before initiating collection
    if(min_data_size == 0 || header->pending_data_buff_size < min_data_size)
    {
        rc_type &= ~RESOURCE_COLLECTION_TYPE_DATA;
    }

    // minimum defragmentation threshold not reached; skip garbage collection
    if(rc_type == 0)
    {
        Logger::Log(LOG_LEVEL_DEBUG, "pending_index_buff_size (%llu) min_index_size (%llu)",
                header->pending_index_buff_size, min_index_size);

        Logger::Log(LOG_LEVEL_DEBUG, "pending_data_buff_size (%llu) min_data_size (%llu)",
                header->pending_data_buff_size, min_data_size);

        Logger::Log(LOG_LEVEL_DEBUG, "garbage collection skipped since pending "
                "sizes are smaller than required");
        throw (int) MBError::RC_SKIPPED;
    }

    index_free_lists->Empty();
    data_free_lists->Empty();

    rc_loop_counter = 0;
    index_reorder_cnt = 0;
    data_reorder_cnt = 0;
    index_rc_status = MBError::NOT_INITIALIZED;
    data_rc_status = MBError::NOT_INITIALIZED;
    index_reorder_status = MBError::NOT_INITIALIZED;
    data_reorder_status = MBError::NOT_INITIALIZED;
    header->rc_m_index_off_pre = header->m_index_offset;
    header->rc_m_data_off_pre = header->m_data_offset;

    if(async_writer_ptr)
    {
        // set the rc root at the end of the max size region
        rc_index_offset = dmm->GetResourceCollectionOffset();
        rc_data_offset = dict->GetResourceCollectionOffset();

        // make sure there is some space left between the current index offset
        // and the rc region:
        // [start|....|current_index_offset|...MIN_RC_OFFSET_GAP/more...|rc_index_offset|.....|end]
        if(rc_index_offset < header->m_index_offset + MIN_RC_OFFSET_GAP ||
                rc_data_offset < header->m_data_offset + MIN_RC_OFFSET_GAP)
        {
            Logger::Log(LOG_LEVEL_WARN, "not enough space for rc, index: "
                        "%llu %llu, %llu, data: %llu %llu, %llu",
                        header->m_index_offset, MIN_RC_OFFSET_GAP, rc_index_offset,
                        header->m_data_offset, MIN_RC_OFFSET_GAP, rc_data_offset);
            throw (int) MBError::OUT_OF_BOUND;
        }

        // switch the current offsets to the rc offsets
        header->m_index_offset = rc_index_offset;
        header->m_data_offset  = rc_data_offset;

        // create rc root node
        size_t rc_off = dmm->InitRootNode_RC();
        header->rc_root_offset.store(rc_off, MEMORY_ORDER_WRITER);
    }

    Logger::Log(LOG_LEVEL_DEBUG, "setting rc index off start to: %llu", header->m_index_offset);
    Logger::Log(LOG_LEVEL_DEBUG, "setting rc data off start to: %llu", header->m_data_offset);
}

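// Collection only proceeds if the reorder phase completed successfully for
// every resource type selected in rc_type.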
void ResourceCollection::CollectBuffers()
{
    if((rc_type & RESOURCE_COLLECTION_TYPE_INDEX) &&
       (index_reorder_status != MBError::SUCCESS))
        return;
    if((rc_type & RESOURCE_COLLECTION_TYPE_DATA) &&
       (data_reorder_status != MBError::SUCCESS))
        return;

    TraverseDB(RESOURCE_COLLECTION_PHASE_COLLECT);

    if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
        index_rc_status = MBError::SUCCESS;
    if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
        data_rc_status = MBError::SUCCESS;
}

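// Finish() commits the compacted sizes when collection succeeded, or restores
// the pre-rc offsets when it did not, and, when running under an async writer,
// replays the entries that were written into the temporary rc tree while
// collection was in progress.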
void ResourceCollection::Finish()
{
    if(index_rc_status == MBError::SUCCESS)
    {
        Logger::Log(LOG_LEVEL_INFO, "index buffer size reclaimed: %lld",
                    (header->rc_m_index_off_pre > index_size) ?
                    (header->rc_m_index_off_pre - index_size) : 0);
        header->m_index_offset = index_size;
        header->pending_index_buff_size = 0;
    }
    else
    {
        if(header->rc_m_index_off_pre == 0)
            throw (int) MBError::INVALID_ARG;
        header->m_index_offset = header->rc_m_index_off_pre;
    }
    if(data_rc_status == MBError::SUCCESS)
    {
        Logger::Log(LOG_LEVEL_INFO, "data buffer size reclaimed: %lld",
                    (header->rc_m_data_off_pre > data_size) ?
                    (header->rc_m_data_off_pre - data_size) : 0);
        header->m_data_offset = data_size;
        header->pending_data_buff_size = 0;
    }
    else
    {
        if(header->rc_m_data_off_pre == 0)
            throw (int) MBError::INVALID_ARG;
        header->m_data_offset = header->rc_m_data_off_pre;
    }

    if(async_writer_ptr != NULL)
    {
        index_free_lists->Empty();
        data_free_lists->Empty();
        ProcessRCTree();
    }

    header->rc_m_index_off_pre = 0;
    header->rc_m_data_off_pre = 0;

    dict->RemoveUnused(header->m_data_offset, true);
    dmm->RemoveUnused(header->m_index_offset, true);
}

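// MoveIndexBuffer relocates one index buffer, depending on the phase: in the
// REORDER phase, a buffer that overlaps the compaction frontier (index_size)
// is copied into the rc region at header->m_index_offset so it will not be
// clobbered; in the COLLECT phase, the buffer is copied down to index_size,
// compacting the index file. Returns true if the buffer was moved.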
bool ResourceCollection::MoveIndexBuffer(int phase, size_t &offset_src, int size)
{
    index_size = dmm->CheckAlignment(index_size, size);

    if(index_size == offset_src)
        return false;

    uint8_t *ptr_src;
    uint8_t *ptr_dst;
    size_t offset_dst;
    int rval;

    if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
    {
        if(index_size + size <= offset_src)
            return false;

        offset_dst = header->m_index_offset;
        rval = dmm->Reserve(offset_dst, size, ptr_dst);
        if(rval != MBError::SUCCESS)
            throw rval;
        header->m_index_offset = offset_dst + size;
        index_reorder_cnt++;
    }
    else
    {
#ifdef __DEBUG__
        assert(index_size + size <= offset_src);
#endif
        offset_dst = index_size;
        ptr_dst = dmm->GetShmPtr(offset_dst, size);
    }

    ptr_src = dmm->GetShmPtr(offset_src, size);
    BufferCopy(offset_dst, ptr_dst, offset_src, ptr_src, size, dmm);

    offset_src = offset_dst;
    return true;
}

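// MoveDataBuffer applies the same two-phase move to data buffers, using dict
// instead of dmm for reservation and copying.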
bool ResourceCollection::MoveDataBuffer(int phase, size_t &offset_src, int size)
{
    data_size = dict->CheckAlignment(data_size, size);
    if(data_size == offset_src)
        return false;

    uint8_t *ptr_src;
    uint8_t *ptr_dst;
    size_t offset_dst;
    int rval;

    if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
    {
        if(data_size + size < offset_src)
            return false;

        offset_dst = header->m_data_offset;
        rval = dict->Reserve(offset_dst, size, ptr_dst);
        if(rval != MBError::SUCCESS)
            throw rval;
        header->m_data_offset = offset_dst + size;
        data_reorder_cnt++;
    }
    else
    {
#ifdef __DEBUG__
        assert(data_size + size <= offset_src);
#endif
        offset_dst = data_size;
        ptr_dst = dict->GetShmPtr(offset_dst, size);
    }

    ptr_src = dict->GetShmPtr(offset_src, size);
    BufferCopy(offset_dst, ptr_dst, offset_src, ptr_src, size, dict);

    offset_src = offset_dst;
    return true;
}

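// DoTask is the per-node callback used by TraverseDB() for both phases: it
// moves the node, edge-string, and data buffers referenced by the traversed
// edge and patches the link fields that point to them. The excep_* header
// fields and the lock-free guards record each in-flight link update for
// exception recovery.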
void ResourceCollection::DoTask(int phase, DBTraverseNode &dbt_node)
{
    if(phase == RESOURCE_COLLECTION_PHASE_REORDER)
    {
        // collect stats for adjusting values in header
        if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
            db_cnt++;
        if(dbt_node.buffer_type & BUFFER_TYPE_EDGE_STR)
            edge_str_size += dbt_node.edgestr_size;
        if(dbt_node.buffer_type & BUFFER_TYPE_NODE)
            node_cnt++;
    }

    header->excep_lf_offset = dbt_node.edge_offset;
    if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
    {
        if(dbt_node.buffer_type & BUFFER_TYPE_NODE)
        {
            if(MoveIndexBuffer(phase, dbt_node.node_offset, dbt_node.node_size))
            {
                Write6BInteger(header->excep_buff, dbt_node.node_offset);
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStart(dbt_node.edge_offset);
#endif
                header->excep_offset = dbt_node.node_link_offset;
                header->excep_updating_status = EXCEP_STATUS_RC_NODE;
                dmm->WriteData(header->excep_buff, OFFSET_SIZE, dbt_node.node_link_offset);
                header->excep_updating_status = 0;
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStop();
#endif
                // Update data_link_offset since node may have been moved.
                if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
                    dbt_node.data_link_offset = dbt_node.node_offset + 2;
            }
            index_size += dbt_node.node_size;
        }

        if(dbt_node.buffer_type & BUFFER_TYPE_EDGE_STR)
        {
            if(MoveIndexBuffer(phase, dbt_node.edgestr_offset, dbt_node.edgestr_size))
            {
                Write5BInteger(header->excep_buff, dbt_node.edgestr_offset);
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStart(dbt_node.edge_offset);
#endif
                header->excep_offset = dbt_node.edgestr_link_offset;
                header->excep_updating_status = EXCEP_STATUS_RC_EDGE_STR;
                dmm->WriteData(header->excep_buff, OFFSET_SIZE-1, dbt_node.edgestr_link_offset);
                header->excep_updating_status = 0;
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStop();
#endif
            }
            index_size += dbt_node.edgestr_size;
        }
    }

    if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
    {
        if(dbt_node.buffer_type & BUFFER_TYPE_DATA)
        {
            if(MoveDataBuffer(phase, dbt_node.data_offset, dbt_node.data_size))
            {
                Write6BInteger(header->excep_buff, dbt_node.data_offset);
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStart(dbt_node.edge_offset);
#endif
                header->excep_offset = dbt_node.data_link_offset;
                header->excep_updating_status = EXCEP_STATUS_RC_DATA;
                dmm->WriteData(header->excep_buff, OFFSET_SIZE, dbt_node.data_link_offset);
                header->excep_updating_status = 0;
#ifdef __LOCK_FREE__
                lfree->WriterLockFreeStop();
#endif
            }
            data_size += dbt_node.data_size;
        }
    }

    header->excep_updating_status = 0;

    if(async_writer_ptr != NULL)
    {
        if(rc_loop_counter++ > RC_TASK_CHECK)
        {
            rc_loop_counter = 0;
            async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, true);
        }
    }
}


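// The reorder pass also recounts entries, nodes, and edge-string bytes so the
// corresponding header statistics (count, n_states, edge_str_size) can be
// corrected after traversal.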
void ResourceCollection::ReorderBuffers()
{
    if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
        Logger::Log(LOG_LEVEL_INFO, "index size before reorder: %llu", header->m_index_offset);
    if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
        Logger::Log(LOG_LEVEL_INFO, "data size before reorder: %llu", header->m_data_offset);

    db_cnt = 0;
    edge_str_size = 0;
    node_cnt = 0;
    TraverseDB(RESOURCE_COLLECTION_PHASE_REORDER);

    if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
        Logger::Log(LOG_LEVEL_INFO, "index size after reorder: %llu", header->m_index_offset);
    if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
        Logger::Log(LOG_LEVEL_INFO, "data size after reorder: %llu", header->m_data_offset);

    if(rc_type & RESOURCE_COLLECTION_TYPE_INDEX)
    {
        index_reorder_status = MBError::SUCCESS;
        Logger::Log(LOG_LEVEL_INFO, "number of index buffers reordered: %lld", index_reorder_cnt);
    }
    if(rc_type & RESOURCE_COLLECTION_TYPE_DATA)
    {
        data_reorder_status = MBError::SUCCESS;
        Logger::Log(LOG_LEVEL_INFO, "number of data buffers reordered: %lld", data_reorder_cnt);
    }

    if(db_cnt != header->count)
    {
        Logger::Log(LOG_LEVEL_INFO, "adjusting db count to %lld from %lld", db_cnt, header->count);
        header->count = db_cnt;
    }
    header->edge_str_size = edge_str_size;
    header->n_states = node_cnt;
}

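// Entries inserted while collection was in progress were diverted into the
// temporary rc tree created in Prepare(); replay them into the main trie now
// that the buffers have been compacted, then clear the rc tree.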
void ResourceCollection::ProcessRCTree()
{
    Logger::Log(LOG_LEVEL_INFO, "resource collection done, traversing the rc tree %llu entries", header->rc_count);

    int count = 0;
    int rval;
    for(DB::iterator iter = db_ref.begin(false, true); iter != db_ref.end(); ++iter)
    {
        iter.value.options = 0;
        rval = dict->Add((const uint8_t *)iter.key.data(), iter.key.size(), iter.value, true);
        if(rval != MBError::SUCCESS)
            Logger::Log(LOG_LEVEL_WARN, "failed to add: %s", MBError::get_error_str(rval));
        if(count++ > RC_TASK_CHECK)
        {
            count = 0;
            async_writer_ptr->ProcessTask(NUM_ASYNC_TASK, false);
        }

        if(header->m_index_offset > rc_index_offset ||
           header->m_data_offset > rc_data_offset)
        {
            Logger::Log(LOG_LEVEL_ERROR, "not enough space for insertion: %llu, %llu",
                        header->m_index_offset, header->m_data_offset);
            break;
        }
    }

    header->rc_count = 0;
    header->rc_root_offset.store(0, MEMORY_ORDER_WRITER);

    // Clear the rc tree
    dmm->ClearRootEdges_RC();
}

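// Intended to run at writer startup: if both rc_m_*_off_pre fields are
// non-zero, a previous resource collection did not complete, so a full
// blocking reclaim is retried; if that still fails with anything other than
// RC_SKIPPED, the database is cleared.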
int ResourceCollection::ExceptionRecovery()
{
    if(!db_ref.is_open())
        return db_ref.Status();

    int rval = MBError::SUCCESS;
    if(header->rc_m_index_off_pre != 0 && header->rc_m_data_off_pre != 0)
    {
        Logger::Log(LOG_LEVEL_WARN, "previous rc was not completed successfully, retrying...");
        try {
            // This is a blocking call and should be called when writer starts up.
            ReclaimResource(1, 1, MAX_6B_OFFSET, MAX_6B_OFFSET, NULL);
        } catch (int err) {
            if(err != MBError::RC_SKIPPED)
                rval = err;
        }

        if(rval != MBError::SUCCESS)
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to run rc recovery: %s, clear db!!!", MBError::get_error_str(rval));
            dict->RemoveAll();
        }
    }

    header->rc_root_offset = 0;
    header->rc_count = 0;

    return rval;
}

}