mabain/src/async_writer.cpp
/**
 * Copyright (C) 2018 Cisco Inc.
 *
 * This program is free software: you can redistribute it and/or  modify
 * it under the terms of the GNU General Public License, version 2,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

// @author Changxue Deng <chadeng@cisco.com>

#include <unistd.h>
#include <string.h>

#include "async_writer.h"
#include "error.h"
#include "logger.h"
#include "mb_data.h"
#include "mb_rc.h"
#include "integer_4b_5b.h"
#include "dict.h"
#include "./util/shm_mutex.h"

namespace mabain {

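// Release the heap buffers attached to a queue node (in-process queue build only)
// and reset its type so the slot can be reused.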
static void free_async_node(AsyncNode *node_ptr)
{
#ifndef __SHM_QUEUE__
    if(node_ptr->key != NULL)
    {
        free(node_ptr->key);
        node_ptr->key = NULL;
        node_ptr->key_len = 0;
    }

    if(node_ptr->data != NULL)
    {
        free(node_ptr->data);
        node_ptr->data = NULL;
        node_ptr->data_len = 0;
    }
#endif

    node_ptr->type = MABAIN_ASYNC_TYPE_NONE;
}

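// The async writer can only be constructed for a DB handle opened in writer mode.
// With __SHM_QUEUE__ the queue lives in shared memory, one page past the DB header;
// otherwise an in-process ring of MB_MAX_NUM_SHM_QUEUE_NODE nodes is allocated and
// each node gets its own mutex and condition variable. A background thread is then
// started to consume the queue. Failures are reported by throwing MBError codes.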
AsyncWriter::AsyncWriter(DB *db_ptr)
                       : db(db_ptr),
                         tid(0),
                         stop_processing(false),
                         queue(NULL),
#ifdef __SHM_QUEUE__
                         header(NULL)
#else
                         num_users(0),
                         queue_index(0),
                         writer_index(0)
#endif
{
    dict = NULL;
    if(!(db_ptr->GetDBOptions() & CONSTS::ACCESS_MODE_WRITER))
        throw (int) MBError::NOT_ALLOWED;
    if(db == NULL)
        throw (int) MBError::INVALID_ARG;
    dict = db->GetDictPtr();
    if(dict == NULL)
        throw (int) MBError::NOT_INITIALIZED;

#ifdef __SHM_QUEUE__
    // initialize shared memory queue pointer
    header = dict->GetHeaderPtr();
    if(header == NULL)
        throw (int) MBError::NOT_INITIALIZED;
    char *hdr_ptr = (char *) header;
    queue = reinterpret_cast<AsyncNode *>(hdr_ptr + RollableFile::page_size);
    header->rc_flag.store(0, std::memory_order_release);
#else
    queue = new AsyncNode[MB_MAX_NUM_SHM_QUEUE_NODE];
    memset(queue, 0, MB_MAX_NUM_SHM_QUEUE_NODE * sizeof(AsyncNode));
    for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
    {
        queue[i].in_use.store(false, std::memory_order_release);
        if(pthread_mutex_init(&queue[i].mutex, NULL) != 0)
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to init mutex");
            throw (int) MBError::MUTEX_ERROR;
        }
        if(pthread_cond_init(&queue[i].cond, NULL) != 0)
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to init condition variable");
            throw (int) MBError::MUTEX_ERROR;
        }
    }
    is_rc_running = false;
#endif

    rc_backup_dir = NULL;
    // start the thread
    if(pthread_create(&tid, NULL, async_thread_wrapper, this) != 0)
    {
        Logger::Log(LOG_LEVEL_ERROR, "failed to create async thread");
        tid = 0;
        throw (int) MBError::THREAD_FAILED;
    }
}

AsyncWriter::~AsyncWriter()
{
}

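// Track how many handles are using this writer so StopAsyncThread can warn
// if a shutdown is requested while the DB is still in use.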
#ifndef __SHM_QUEUE__
void AsyncWriter::UpdateNumUsers(int delta)
{
    if(delta > 0)
        num_users.fetch_add(1, std::memory_order_release);
    else if(delta < 0)
        num_users.fetch_sub(1, std::memory_order_release);
}
#endif

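// Shut down the async writer: set the stop flag, wake every queue slot,
// join the background thread, then tear down the per-slot mutexes,
// condition variables, and the queue itself (in-process queue build only).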
int AsyncWriter::StopAsyncThread()
{
#ifndef __SHM_QUEUE__
    if(num_users.load(std::memory_order_consume) > 0)
    {
        Logger::Log(LOG_LEVEL_ERROR, "still being used, cannot shutdown async thread");
    }
#endif

    stop_processing = true;

#ifndef __SHM_QUEUE__
    for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
    {
        pthread_cond_signal(&queue[i].cond);
    }
#endif

    if(tid != 0)
    {
        Logger::Log(LOG_LEVEL_INFO, "joining async writer thread");
        pthread_join(tid, NULL);
    }

#ifndef __SHM_QUEUE__
    for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
    {
        pthread_mutex_destroy(&queue[i].mutex);
        pthread_cond_destroy(&queue[i].cond);
    }

    if(queue != NULL)
        delete [] queue;
#endif

    return MBError::SUCCESS;
}

#ifndef __SHM_QUEUE__
// Check if async tasks are still pending or resource collection is running.
bool AsyncWriter::Busy() const
{
    uint32_t index = queue_index.load(std::memory_order_consume);
    return index != writer_index || is_rc_running;
}
#endif

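// Claim the next slot in the ring buffer (queue_index is a monotonically
// increasing counter, mapped onto the ring with a modulo) and block until the
// writer thread has drained that slot. Returns with the slot mutex held, or
// NULL if the mutex could not be locked.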
#ifndef __SHM_QUEUE__
AsyncNode* AsyncWriter::AcquireSlot()
{
    uint32_t index = queue_index.fetch_add(1, std::memory_order_release);
    AsyncNode *node_ptr = queue + (index % MB_MAX_NUM_SHM_QUEUE_NODE);

    if(pthread_mutex_lock(&node_ptr->mutex) != 0)
    {
        Logger::Log(LOG_LEVEL_ERROR, "failed to lock mutex");
        return NULL;
    }

    while(node_ptr->in_use.load(std::memory_order_consume))
    {
        pthread_cond_wait(&node_ptr->cond, &node_ptr->mutex);
    }

    return node_ptr;
}
#endif

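// Publish a filled slot: mark it in use, wake the consumer (the async writer
// thread), and release the slot mutex acquired in AcquireSlot.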
#ifndef __SHM_QUEUE__
int AsyncWriter::PrepareSlot(AsyncNode *node_ptr) const
{
    node_ptr->in_use.store(true, std::memory_order_release);
    pthread_cond_signal(&node_ptr->cond);
    if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
        return MBError::MUTEX_ERROR;
    return MBError::SUCCESS;
}
#endif

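// Queue an asynchronous insertion. The key and value are copied into
// heap buffers owned by the slot until the writer thread consumes them.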
#ifndef __SHM_QUEUE__
int AsyncWriter::Add(const char *key, int key_len, const char *data,
                     int data_len, bool overwrite)
{
    if(stop_processing)
        return MBError::DB_CLOSED;

    AsyncNode *node_ptr = AcquireSlot();
    if(node_ptr == NULL)
        return MBError::MUTEX_ERROR;

    node_ptr->key = (char *) malloc(key_len);
    node_ptr->data = (char *) malloc(data_len);
    if(node_ptr->key == NULL || node_ptr->data == NULL)
    {
        pthread_mutex_unlock(&node_ptr->mutex);
        free_async_node(node_ptr);
        return MBError::NO_MEMORY;
    }
    memcpy(node_ptr->key, key, key_len);
    memcpy(node_ptr->data, data, data_len);
    node_ptr->key_len = key_len;
    node_ptr->data_len = data_len;
    node_ptr->overwrite = overwrite;

    node_ptr->type = MABAIN_ASYNC_TYPE_ADD;

    return PrepareSlot(node_ptr);
}
#endif

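// Queue an asynchronous removal of a single key.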
#ifndef __SHM_QUEUE__
int AsyncWriter::Remove(const char *key, int len)
{
    if(stop_processing)
        return MBError::DB_CLOSED;

    AsyncNode *node_ptr = AcquireSlot();
    if(node_ptr == NULL)
        return MBError::MUTEX_ERROR;

    node_ptr->key = (char *) malloc(len);
    if(node_ptr->key == NULL)
    {
        pthread_mutex_unlock(&node_ptr->mutex);
        free_async_node(node_ptr);
        return MBError::NO_MEMORY;
    }
    memcpy(node_ptr->key, key, len);
    node_ptr->key_len = len;
    node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE;

    return PrepareSlot(node_ptr);
}
#endif

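// Queue an asynchronous backup; the target directory path is duplicated
// into the slot's data buffer.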
#ifndef __SHM_QUEUE__
int AsyncWriter::Backup(const char *backup_dir)
{
    if(backup_dir == NULL)
        return MBError::INVALID_ARG;

    if(stop_processing)
        return MBError::DB_CLOSED;

    AsyncNode *node_ptr = AcquireSlot();
    if(node_ptr == NULL)
        return MBError::MUTEX_ERROR;

    node_ptr->data = (char *) strdup(backup_dir);
    if(node_ptr->data == NULL)
    {
        pthread_mutex_unlock(&node_ptr->mutex);
        free_async_node(node_ptr);
        return MBError::NO_MEMORY;
    }
    node_ptr->type = MABAIN_ASYNC_TYPE_BACKUP;
    return PrepareSlot(node_ptr);
}
#endif

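// Queue a request to remove all entries from the DB.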
#ifndef __SHM_QUEUE__
int AsyncWriter::RemoveAll()
{
    if(stop_processing)
        return MBError::DB_CLOSED;

    AsyncNode *node_ptr = AcquireSlot();
    if(node_ptr == NULL)
        return MBError::MUTEX_ERROR;

    node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE_ALL;

    return PrepareSlot(node_ptr);
}
#endif

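// Queue a resource-collection (rc) request. The four thresholds are packed
// into a heap-allocated int64_t[4] carried in the slot's data buffer and
// unpacked by the writer thread when it starts the rc task.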
#ifndef __SHM_QUEUE__
int AsyncWriter::CollectResource(int64_t m_index_rc_size, int64_t m_data_rc_size,
                                 int64_t max_dbsz, int64_t max_dbcnt)
{
    if(stop_processing)
        return MBError::DB_CLOSED;

    AsyncNode *node_ptr = AcquireSlot();
    if(node_ptr == NULL)
        return MBError::MUTEX_ERROR;

    int64_t *data_ptr = (int64_t *) calloc(4, sizeof(int64_t));
    if (data_ptr == NULL)
    {
        // release the slot as the other enqueue paths do on allocation failure
        pthread_mutex_unlock(&node_ptr->mutex);
        free_async_node(node_ptr);
        return MBError::NO_MEMORY;
    }

    node_ptr->data = (char *) data_ptr;
    node_ptr->data_len = sizeof(int64_t)*4;
    data_ptr[0] = m_index_rc_size;
    data_ptr[1] = m_data_rc_size;
    data_ptr[2] = max_dbsz;
    data_ptr[3] = max_dbcnt;
    node_ptr->type = MABAIN_ASYNC_TYPE_RC;

    return PrepareSlot(node_ptr);
}
#endif

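// Tasks are consumed at writer_index, one slot at a time, while the slot mutex
// is held. In rc mode, removals and nested rc requests are skipped (see the
// FIXME below); the loop stops early once it reaches a slot that is not in use.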
// Run a given number of tasks if they are available.
// This function should only be called by rc or pruner.
int AsyncWriter::ProcessTask(int ntasks, bool rc_mode)
{
    AsyncNode *node_ptr;
    MBData mbd;
    int rval = MBError::SUCCESS;
    int count = 0;
#ifdef __SHM_QUEUE__
    struct timespec tm_exp;
#endif

    while(count < ntasks)
    {
        // the task type is captured before free_async_node() clears it, for logging
        int task_type = MABAIN_ASYNC_TYPE_NONE;
#ifdef __SHM_QUEUE__
        node_ptr = &queue[header->writer_index % header->async_queue_size];
        tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
        tm_exp.tv_nsec = 0;
        rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
        if(rval == ETIMEDOUT)
        {
            Logger::Log(LOG_LEVEL_WARN, "mutex lock timeout, need to re-initialize");
            InitShmMutex(&node_ptr->mutex);
            count = ntasks;
            continue;
        }
        else if(rval != 0)
#else
        node_ptr = &queue[writer_index % MB_MAX_NUM_SHM_QUEUE_NODE];
        if(pthread_mutex_lock(&node_ptr->mutex) != 0)
#endif
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to lock mutex");
            throw (int) MBError::MUTEX_ERROR;
        }

        if(node_ptr->in_use.load(std::memory_order_consume))
        {
            task_type = node_ptr->type;
            switch(node_ptr->type)
            {
                case MABAIN_ASYNC_TYPE_ADD:
                    if(rc_mode)
                        mbd.options = CONSTS::OPTION_RC_MODE;
                    mbd.buff = (uint8_t *) node_ptr->data;
                    mbd.data_len = node_ptr->data_len;
                    try {
                        rval = dict->Add((uint8_t *)node_ptr->key, node_ptr->key_len, mbd, node_ptr->overwrite);
                    } catch (int err) {
                        rval = err;
                        Logger::Log(LOG_LEVEL_ERROR, "dict->Add throws error %s",
                                    MBError::get_error_str(err));
                    }
                    break;
                case MABAIN_ASYNC_TYPE_REMOVE:
                    // FIXME
                    // Removing entries during rc is currently not supported.
                    // The index or data index could have been reset in function ResourceCollection::Finish.
                    // However, the deletion may be run for an entry that still exists in the rc root tree.
                    // This requires modifying a buffer in the high end, where the offset for writing is
                    // greater than header->m_index_offset. That causes an exception to be thrown from
                    // DictMem::WriteData.
                    // Note this is not a problem for Dict::Add since Add does not modify buffers in the high end.
                    // This problem will be fixed when resolving issue: https://github.com/chxdeng/mabain/issues/21
                    rval = MBError::SUCCESS;
                    break;
                case MABAIN_ASYNC_TYPE_REMOVE_ALL:
                    if(!rc_mode)
                    {
                        try {
                            rval = dict->RemoveAll();
                        } catch (int err) {
                            Logger::Log(LOG_LEVEL_ERROR, "dict->RemoveAll throws error %s",
                                        MBError::get_error_str(err));
                            rval = err;
                        }
                    }
                    else
                    {
                        rval = MBError::SUCCESS;
                    }
                    break;
                case MABAIN_ASYNC_TYPE_RC:
                    // ignore rc task since it is running already.
                    rval = MBError::RC_SKIPPED;
                    break;
                case MABAIN_ASYNC_TYPE_NONE:
                    rval = MBError::SUCCESS;
                    break;
                case MABAIN_ASYNC_TYPE_BACKUP:
                    // clean up the existing backup dir buffer.
                    if (rc_backup_dir != NULL)
                        free(rc_backup_dir);
#ifdef __SHM_QUEUE__
                    rc_backup_dir = (char *) malloc(node_ptr->data_len+1);
                    memcpy(rc_backup_dir, node_ptr->data, node_ptr->data_len);
                    rc_backup_dir[node_ptr->data_len] = '\0';
#else
                    rc_backup_dir = (char *) node_ptr->data;
                    node_ptr->data = NULL;
#endif
                    rval = MBError::SUCCESS;
                    break;
                default:
                    rval = MBError::INVALID_ARG;
                    break;
            }

#ifdef __SHM_QUEUE__
            header->writer_index++;
#else
            writer_index++;
#endif
            free_async_node(node_ptr);
            node_ptr->in_use.store(false, std::memory_order_release);
            pthread_cond_signal(&node_ptr->cond);
            mbd.Clear();
            count++;
        }
        else
        {
            // done processing
            count = ntasks;
        }

        if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to unlock mutex");
            throw (int) MBError::MUTEX_ERROR;
        }

        if(rval != MBError::SUCCESS)
        {
            Logger::Log(LOG_LEVEL_DEBUG, "failed to run update %d: %s",
                        task_type, MBError::get_error_str(rval));
        }
    }

    if(stop_processing)
        return MBError::RC_SKIPPED;
    return MBError::SUCCESS;
}

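// Advance windex toward qindex to the next slot that is still marked in use.
// Used to resynchronize the shared-memory queue when the indices drift apart,
// e.g. after a reader process exits unexpectedly. Gives up and jumps to qindex
// after scanning a full queue's worth of slots.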
#ifdef __SHM_QUEUE__
uint32_t AsyncWriter::NextShmSlot(uint32_t windex, uint32_t qindex)
{
    int cnt = 0;
    while(windex != qindex)
    {
        if(queue[windex % header->async_queue_size].in_use.load(std::memory_order_consume))
            break;
        if(++cnt > header->async_queue_size)
        {
            windex = qindex;
            break;
        }

        windex++;
    }

    return windex;
}
#endif

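// Main loop of the background writer thread: lock the slot at writer_index,
// wait until it is marked in use (or a shutdown/index-recovery condition is
// detected), execute the queued operation against the dictionary, release the
// slot, and then run resource collection if one was requested.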
void* AsyncWriter::async_writer_thread()
{
    AsyncNode *node_ptr;
    MBData mbd;
    int rval;
    int64_t min_index_size = 0;
    int64_t min_data_size = 0;
    int64_t max_dbsize = MAX_6B_OFFSET;
    int64_t max_dbcount = MAX_6B_OFFSET;
#ifdef __SHM_QUEUE__
    struct timespec tm_exp;
    bool skip;
#endif

    Logger::Log(LOG_LEVEL_INFO, "async writer started");
    while(true)
    {
#ifdef __SHM_QUEUE__
        node_ptr = &queue[header->writer_index % header->async_queue_size];
        tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
        tm_exp.tv_nsec = 0;
        rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
        if(rval == ETIMEDOUT)
        {
            Logger::Log(LOG_LEVEL_WARN, "async writer shared memory mutex lock timeout, need to re-initialize");
            InitShmMutex(&node_ptr->mutex);
            header->writer_index++;
            continue;
        }
        else if(rval != 0)
#else
        node_ptr = &queue[writer_index % MB_MAX_NUM_SHM_QUEUE_NODE];
        if(pthread_mutex_lock(&node_ptr->mutex) != 0)
#endif
        {
            Logger::Log(LOG_LEVEL_ERROR, "async writer failed to lock shared memory mutex");
            throw (int) MBError::MUTEX_ERROR;
        }

#ifdef __SHM_QUEUE__
        skip = false;
#endif
        while(!node_ptr->in_use.load(std::memory_order_consume))
        {
            if(stop_processing)
                break;
#ifdef __SHM_QUEUE__
            tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
            tm_exp.tv_nsec = 0;
            pthread_cond_timedwait(&node_ptr->cond, &node_ptr->mutex, &tm_exp);

            uint32_t windex = header->writer_index;
            uint32_t qindex = header->queue_index.load(std::memory_order_consume);
            if(windex != qindex)
            {
                // Reader process may have exited unexpectedly. Recover index.
                skip = true;
                header->writer_index = NextShmSlot(windex, qindex);
                break;
            }
            continue;
#else
            pthread_cond_wait(&node_ptr->cond, &node_ptr->mutex);
#endif
        }

#ifdef __SHM_QUEUE__
        if(skip || stop_processing)
        {
            if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
            {
                Logger::Log(LOG_LEVEL_ERROR, "async writer failed to unlock shared memory mutex");
                throw (int) MBError::MUTEX_ERROR;
            }
            if(stop_processing)
                break;
            continue;
        }
#else
        if(stop_processing && !node_ptr->in_use.load(std::memory_order_consume))
        {
            pthread_mutex_unlock(&node_ptr->mutex);
            break;
        }
#endif
        // process the node; capture the type before free_async_node() clears it
        int task_type = node_ptr->type;
        switch(node_ptr->type)
        {
            case MABAIN_ASYNC_TYPE_ADD:
                mbd.buff = (uint8_t *) node_ptr->data;
                mbd.data_len = node_ptr->data_len;
                try {
                    rval = dict->Add((uint8_t *)node_ptr->key, node_ptr->key_len, mbd,
                                     node_ptr->overwrite);
                } catch (int err) {
                    Logger::Log(LOG_LEVEL_ERROR, "dict->Add throws error %s",
                                MBError::get_error_str(err));
                    rval = err;
                }
                break;
            case MABAIN_ASYNC_TYPE_REMOVE:
                mbd.options |= CONSTS::OPTION_FIND_AND_STORE_PARENT;
                try {
                    rval = dict->Remove((uint8_t *)node_ptr->key, node_ptr->key_len, mbd);
                } catch (int err) {
                    Logger::Log(LOG_LEVEL_ERROR, "dict->Remove throws error %s",
                                MBError::get_error_str(err));
                    rval = err;
                }
                mbd.options &= ~CONSTS::OPTION_FIND_AND_STORE_PARENT;
                break;
            case MABAIN_ASYNC_TYPE_REMOVE_ALL:
                try {
                    rval = dict->RemoveAll();
                } catch (int err) {
                    Logger::Log(LOG_LEVEL_ERROR, "dict->RemoveAll throws error %s",
                                MBError::get_error_str(err));
                    rval = err;
                }
                break;
            case MABAIN_ASYNC_TYPE_RC:
                rval = MBError::SUCCESS;
#ifdef __SHM_QUEUE__
                header->rc_flag.store(1, std::memory_order_release);
#else
                is_rc_running = true;
#endif
                {
                    int64_t *data_ptr = reinterpret_cast<int64_t *>(node_ptr->data);
                    min_index_size = data_ptr[0];
                    min_data_size  = data_ptr[1];
                    max_dbsize = data_ptr[2];
                    max_dbcount = data_ptr[3];
                }
                break;
            case MABAIN_ASYNC_TYPE_NONE:
                rval = MBError::SUCCESS;
                break;
            case MABAIN_ASYNC_TYPE_BACKUP:
                try {
                    DBBackup mbbk(*db);
                    rval = mbbk.Backup((const char*) node_ptr->data);
                } catch (int error) {
                    rval = error;
                }
                break;
            default:
                rval = MBError::INVALID_ARG;
                break;
        }

#ifdef __SHM_QUEUE__
        header->writer_index++;
#else
        writer_index++;
#endif
        free_async_node(node_ptr);
        node_ptr->in_use.store(false, std::memory_order_release);
        pthread_cond_signal(&node_ptr->cond);
        if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
        {
            Logger::Log(LOG_LEVEL_ERROR, "failed to unlock mutex");
            throw (int) MBError::MUTEX_ERROR;
        }

        if(rval != MBError::SUCCESS)
        {
            Logger::Log(LOG_LEVEL_DEBUG, "failed to run update %d: %s",
                        task_type, MBError::get_error_str(rval));
        }

        mbd.Clear();

#ifdef __SHM_QUEUE__
        if (header->rc_flag.load(std::memory_order_consume) == 1)
#else
        if(is_rc_running)
#endif
        {
            rval = MBError::SUCCESS;
            try {
                ResourceCollection rc = ResourceCollection(*db);
                rc.ReclaimResource(min_index_size, min_data_size, max_dbsize, max_dbcount, this);
            } catch (int error) {
                if(error != MBError::RC_SKIPPED)
                    Logger::Log(LOG_LEVEL_WARN, "rc failed :%s", MBError::get_error_str(error));
                else
                    rval = error;
            }

#ifdef __SHM_QUEUE__
            header->rc_flag.store(0, std::memory_order_release);
#else
            is_rc_running = false;
#endif
            if(rc_backup_dir != NULL)
            {
                if(rval == MBError::SUCCESS)
                {
#ifdef __SHM_QUEUE__
                    dict->SHMQ_Backup(rc_backup_dir);
#else
                    Backup(rc_backup_dir);
#endif
                }
                free(rc_backup_dir);
                rc_backup_dir = NULL;
            }
        }
    }

    mbd.buff = NULL;
    Logger::Log(LOG_LEVEL_INFO, "async writer exiting");
    return NULL;
}

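// pthread entry point; forwards to the member function on the AsyncWriter instance.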
void* AsyncWriter::async_thread_wrapper(void *context)
{
    AsyncWriter *instance_ptr = static_cast<AsyncWriter *>(context);
    return instance_ptr->async_writer_thread();
}

}