2 * Copyright (C) 2018 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
22 #include "async_writer.h"
27 #include "integer_4b_5b.h"
29 #include "./util/shm_mutex.h"
// Release the heap buffers held by an async queue node and reset it to an
// empty (MABAIN_ASYNC_TYPE_NONE) state so the slot can be reused.
// NOTE(review): this fragment elides several lines; the actual
// free(node_ptr->key)/free(node_ptr->data) calls and enclosing braces are
// not visible here — confirm against the full source.
33 static void free_async_node(AsyncNode *node_ptr)
36 if(node_ptr->key != NULL)
// (elided: free the key buffer and null the pointer)
40 node_ptr->key_len = 0;
43 if(node_ptr->data != NULL)
// (elided: free the data buffer)
46 node_ptr->data = NULL;
47 node_ptr->data_len = 0;
51 node_ptr->type = MABAIN_ASYNC_TYPE_NONE;
// Constructor: validates that the DB handle was opened in writer mode,
// resolves the Dict and header pointers, locates/initializes the async
// queue (shared-memory queue when __SHM_QUEUE__ is defined, otherwise a
// process-local heap array with per-slot mutex/condvar), and finally
// spawns the background writer thread.
// Throws (int)MBError codes on any failure. Several guard and brace
// lines are elided in this fragment.
54 AsyncWriter::AsyncWriter(DB *db_ptr)
57 stop_processing(false),
// Writer access is mandatory: the async thread performs DB updates.
68 if(!(db_ptr->GetDBOptions() & CONSTS::ACCESS_MODE_WRITER))
69 throw (int) MBError::NOT_ALLOWED;
// (elided guard) invalid-argument check
71 throw (int) MBError::INVALID_ARG;
72 dict = db->GetDictPtr();
// (elided guard) dict must be initialized
74 throw (int) MBError::NOT_INITIALIZED;
77 // initialize shared memory queue pointer
78 header = dict->GetHeaderPtr();
80 throw (int) MBError::NOT_INITIALIZED;
// Shared-memory layout: the queue lives one page past the header.
81 char *hdr_ptr = (char *) header;
82 queue = reinterpret_cast<AsyncNode *>(hdr_ptr + RollableFile::page_size);
83 header->rc_flag.store(0, std::memory_order_release);
// Non-shm build: allocate the queue on the heap and set up per-slot
// pthread mutex/condition variables.
85 queue = new AsyncNode[MB_MAX_NUM_SHM_QUEUE_NODE];
86 memset(queue, 0, MB_MAX_NUM_SHM_QUEUE_NODE * sizeof(AsyncNode));
87 for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
89 queue[i].in_use.store(false, std::memory_order_release);
90 if(pthread_mutex_init(&queue[i].mutex, NULL) != 0)
92 Logger::Log(LOG_LEVEL_ERROR, "failed to init mutex");
93 throw (int) MBError::MUTEX_ERROR;
95 if(pthread_cond_init(&queue[i].cond, NULL) != 0)
97 Logger::Log(LOG_LEVEL_ERROR, "failed to init conditional variable");
98 throw (int) MBError::MUTEX_ERROR;
101 is_rc_running = false;
104 rc_backup_dir = NULL;
// Launch the consumer thread; async_thread_wrapper forwards `this` to
// async_writer_thread().
106 if(pthread_create(&tid, NULL, async_thread_wrapper, this) != 0)
108 Logger::Log(LOG_LEVEL_ERROR, "failed to create async thread");
110 throw (int) MBError::THREAD_FAILED;
// Destructor — body not visible in this fragment; confirm cleanup
// (queue deletion / rc_backup_dir release) against the full source.
115 AsyncWriter::~AsyncWriter()
119 #ifndef __SHM_QUEUE__
// Track how many handles are currently using this writer's queue;
// StopAsyncThread() refuses to shut down while the count is non-zero.
// NOTE(review): the branch selecting add vs. sub based on `delta`'s sign
// lives on elided lines — confirm against the full source.
120 void AsyncWriter::UpdateNumUsers(int delta)
123 num_users.fetch_add(1, std::memory_order_release);
125 num_users.fetch_sub(1, std::memory_order_release);
// Signal the background thread to stop, wake any slot waiters, join the
// thread, and (non-shm build) destroy the per-slot mutex/condvars.
// Returns MBError::SUCCESS; refuses to stop while users are attached.
129 int AsyncWriter::StopAsyncThread()
131 #ifndef __SHM_QUEUE__
132 if(num_users.load(std::memory_order_consume) > 0)
134 Logger::Log(LOG_LEVEL_ERROR, "still being used, cannot shutdown async thread");
// (elided: early return with an error code)
138 stop_processing = true;
140 #ifndef __SHM_QUEUE__
// Wake every slot's condvar so the consumer notices stop_processing.
141 for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
143 pthread_cond_signal(&queue[i].cond);
149 Logger::Log(LOG_LEVEL_INFO, "joining async writer thread");
150 pthread_join(tid, NULL);
153 #ifndef __SHM_QUEUE__
// Thread is gone; now safe to tear down the per-slot sync primitives.
154 for(int i = 0; i < MB_MAX_NUM_SHM_QUEUE_NODE; i++)
156 pthread_mutex_destroy(&queue[i].mutex);
157 pthread_cond_destroy(&queue[i].cond);
164 return MBError::SUCCESS;
167 #ifndef __SHM_QUEUE__
168 // Check if async tasks are completed.
// Busy when producers have claimed slots the writer thread has not yet
// consumed (queue_index ahead of writer_index), or while a resource
// collection is in flight.
169 bool AsyncWriter::Busy() const
171 uint32_t index = queue_index.load(std::memory_order_consume);
172 return index != writer_index || is_rc_running;
176 #ifndef __SHM_QUEUE__
// Reserve the next queue slot for a producer: atomically claim an index,
// lock the slot's mutex, then wait until the consumer has drained any
// previous entry (in_use == false). Returns the slot with its mutex held;
// the (elided) lock-failure path presumably returns NULL — confirm.
177 AsyncNode* AsyncWriter::AcquireSlot()
179 uint32_t index = queue_index.fetch_add(1, std::memory_order_release);
180 AsyncNode *node_ptr = queue + (index % MB_MAX_NUM_SHM_QUEUE_NODE);
182 if(pthread_mutex_lock(&node_ptr->mutex) != 0)
184 Logger::Log(LOG_LEVEL_ERROR, "failed to lock mutex");
// (elided: error return on lock failure)
// Slot still holds an unconsumed task; wait for the writer thread to
// finish it and signal the slot's condvar.
188 while(node_ptr->in_use.load(std::memory_order_consume))
190 pthread_cond_wait(&node_ptr->cond, &node_ptr->mutex);
197 #ifndef __SHM_QUEUE__
// Publish a filled slot to the consumer: mark it in_use, signal the
// slot's condvar, and release the mutex acquired by AcquireSlot().
198 int AsyncWriter::PrepareSlot(AsyncNode *node_ptr) const
200 node_ptr->in_use.store(true, std::memory_order_release);
201 pthread_cond_signal(&node_ptr->cond);
202 if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
203 return MBError::MUTEX_ERROR;
204 return MBError::SUCCESS;
208 #ifndef __SHM_QUEUE__
// Enqueue an asynchronous add. Key and data are deep-copied into the
// slot so the caller's buffers may be freed immediately after return.
// Returns SUCCESS, DB_CLOSED, MUTEX_ERROR (slot acquisition failed) or
// NO_MEMORY. Guard/brace lines are elided in this fragment.
209 int AsyncWriter::Add(const char *key, int key_len, const char *data,
210 int data_len, bool overwrite)
// (elided guard: db closed)
213 return MBError::DB_CLOSED;
215 AsyncNode *node_ptr = AcquireSlot();
// (elided guard: node_ptr == NULL)
217 return MBError::MUTEX_ERROR;
219 node_ptr->key = (char *) malloc(key_len);
220 node_ptr->data = (char *) malloc(data_len);
221 if(node_ptr->key == NULL || node_ptr->data == NULL)
// On failure: drop the slot lock, then release whichever buffer did get
// allocated before reporting NO_MEMORY.
223 pthread_mutex_unlock(&node_ptr->mutex);
224 free_async_node(node_ptr);
225 return MBError::NO_MEMORY;
227 memcpy(node_ptr->key, key, key_len);
228 memcpy(node_ptr->data, data, data_len);
229 node_ptr->key_len = key_len;
230 node_ptr->data_len = data_len;
231 node_ptr->overwrite = overwrite;
233 node_ptr->type = MABAIN_ASYNC_TYPE_ADD;
236 return PrepareSlot(node_ptr);
240 #ifndef __SHM_QUEUE__
// Enqueue an asynchronous remove. The key is deep-copied so the caller's
// buffer may be freed immediately. Returns SUCCESS, DB_CLOSED,
// MUTEX_ERROR or NO_MEMORY. Guard/brace lines are elided here.
241 int AsyncWriter::Remove(const char *key, int len)
// (elided guard: db closed)
244 return MBError::DB_CLOSED;
246 AsyncNode *node_ptr = AcquireSlot();
// (elided guard: node_ptr == NULL)
248 return MBError::MUTEX_ERROR;
250 node_ptr->key = (char *) malloc(len);
251 if(node_ptr->key == NULL)
// Drop the slot lock and reset the node before reporting NO_MEMORY.
253 pthread_mutex_unlock(&node_ptr->mutex);
254 free_async_node(node_ptr);
255 return MBError::NO_MEMORY;
257 memcpy(node_ptr->key, key, len);
258 node_ptr->key_len = len;
259 node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE;
261 return PrepareSlot(node_ptr);
265 #ifndef __SHM_QUEUE__
// Enqueue an asynchronous backup to backup_dir; the path is duplicated
// into the slot's data buffer. Returns SUCCESS, INVALID_ARG, DB_CLOSED,
// MUTEX_ERROR or NO_MEMORY. Guard/brace lines are elided here.
266 int AsyncWriter::Backup(const char *backup_dir)
268 if(backup_dir == NULL)
269 return MBError::INVALID_ARG;
// (elided guard: db closed)
272 return MBError::DB_CLOSED;
274 AsyncNode *node_ptr = AcquireSlot();
// (elided guard: node_ptr == NULL)
276 return MBError::MUTEX_ERROR;
// strdup() already returns char*; the cast is redundant but harmless.
278 node_ptr->data = (char *) strdup(backup_dir);
279 if(node_ptr->data == NULL)
281 pthread_mutex_unlock(&node_ptr->mutex);
282 free_async_node(node_ptr);
283 return MBError::NO_MEMORY;
285 node_ptr->type = MABAIN_ASYNC_TYPE_BACKUP;
286 return PrepareSlot(node_ptr);
290 #ifndef __SHM_QUEUE__
// Enqueue an asynchronous "delete everything" task; carries no payload.
// Returns SUCCESS, DB_CLOSED or MUTEX_ERROR. Guard lines are elided.
291 int AsyncWriter::RemoveAll()
// (elided guard: db closed)
294 return MBError::DB_CLOSED;
296 AsyncNode *node_ptr = AcquireSlot();
// (elided guard: node_ptr == NULL)
298 return MBError::MUTEX_ERROR;
300 node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE_ALL;
302 return PrepareSlot(node_ptr);
306 #ifndef __SHM_QUEUE__
// Enqueue an asynchronous resource-collection (rc/GC) task. The four
// int64 thresholds are packed into a calloc'ed buffer carried in
// node_ptr->data and unpacked by the consumer thread.
307 int AsyncWriter::CollectResource(int64_t m_index_rc_size, int64_t m_data_rc_size,
308 int64_t max_dbsz, int64_t max_dbcnt)
// (elided guard: db closed)
311 return MBError::DB_CLOSED;
313 AsyncNode *node_ptr = AcquireSlot();
// (elided guard: node_ptr == NULL)
315 return MBError::MUTEX_ERROR;
317 int64_t *data_ptr = (int64_t *) calloc(4, sizeof(int64_t));
318 if (data_ptr == NULL)
// BUG(review): unlike Add/Remove/Backup, this failure path returns
// without pthread_mutex_unlock(&node_ptr->mutex), leaving the slot
// locked — the next producer landing on this slot would block forever.
319 return MBError::NO_MEMORY;
321 node_ptr->data = (char *) data_ptr;
322 node_ptr->data_len = sizeof(int64_t)*4;
323 data_ptr[0] = m_index_rc_size;
324 data_ptr[1] = m_data_rc_size;
325 data_ptr[2] = max_dbsz;
326 data_ptr[3] = max_dbcnt;
327 node_ptr->type = MABAIN_ASYNC_TYPE_RC;
329 return PrepareSlot(node_ptr);
333 // Run a given number of tasks if they are available.
334 // This function should only be called by rc or pruner.
// Drains up to `ntasks` queued updates inline on the calling thread so
// updates keep flowing while a long-running resource collection owns the
// writer. In rc_mode, Add operations are tagged OPTION_RC_MODE and RC
// requests are skipped (rc is already running). Many brace / #else /
// #endif lines are elided in this fragment.
335 int AsyncWriter::ProcessTask(int ntasks, bool rc_mode)
339 int rval = MBError::SUCCESS;
342 struct timespec tm_exp;
345 while(count < ntasks)
// Shared-memory queue path: consume at header->writer_index, using a
// timed lock so a crashed peer holding the shm mutex cannot wedge us.
348 node_ptr = &queue[header->writer_index % header->async_queue_size];
349 tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
351 rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
352 if(rval == ETIMEDOUT)
354 Logger::Log(LOG_LEVEL_WARN, "mutex lock timeout, need to re-intialize");
// Recovery: re-create the shared-memory mutex in place.
355 InitShmMutex(&node_ptr->mutex);
// Process-local queue path (non-shm build).
361 node_ptr = &queue[writer_index % MB_MAX_NUM_SHM_QUEUE_NODE];
362 if(pthread_mutex_lock(&node_ptr->mutex) != 0)
365 Logger::Log(LOG_LEVEL_ERROR, "failed to lock mutex");
366 throw (int) MBError::MUTEX_ERROR;
// Only consume the slot if a producer has published it.
369 if(node_ptr->in_use.load(std::memory_order_consume))
371 switch(node_ptr->type)
373 case MABAIN_ASYNC_TYPE_ADD:
// Tag the add so Dict::Add knows rc is in progress.
375 mbd.options = CONSTS::OPTION_RC_MODE;
376 mbd.buff = (uint8_t *) node_ptr->data;
377 mbd.data_len = node_ptr->data_len;
379 rval = dict->Add((uint8_t *)node_ptr->key, node_ptr->key_len, mbd, node_ptr->overwrite);
382 Logger::Log(LOG_LEVEL_ERROR, "dict->Add throws error %s",
383 MBError::get_error_str(err));
386 case MABAIN_ASYNC_TYPE_REMOVE:
388 // Removing entries during rc is currently not supported.
389 // The index or data index could have been reset in function ResourceCollection::Finish.
390 // However, the deletion may be run for some entry which still exists in the rc root tree.
391 // This requires modifying buffer in high end, where the offset for writing is greater than
392 // header->m_index_offset. This causes exception thrown from DictMem::WriteData.
393 // Note this is not a problem for Dict::Add since Add does not modify buffers in high end.
394 // This problem will be fixed when resolving issue: https://github.com/chxdeng/mabain/issues/21
395 rval = MBError::SUCCESS;
397 case MABAIN_ASYNC_TYPE_REMOVE_ALL:
401 rval = dict->RemoveAll();
// NOTE(review): log text says "dict->Add" but this is RemoveAll —
// runtime string left unchanged here; fix in the full source.
403 Logger::Log(LOG_LEVEL_ERROR, "dict->Add throws error %s",
404 MBError::get_error_str(err));
410 rval = MBError::SUCCESS;
413 case MABAIN_ASYNC_TYPE_RC:
414 // ignore rc task since it is running already.
415 rval = MBError::RC_SKIPPED;
417 case MABAIN_ASYNC_TYPE_NONE:
418 rval = MBError::SUCCESS;
420 case MABAIN_ASYNC_TYPE_BACKUP:
421 // clean up existing backup dir variable buffer.
422 if (rc_backup_dir != NULL)
// Defer the backup until rc finishes: stash a NUL-terminated copy of
// the directory path in rc_backup_dir.
425 rc_backup_dir = (char *) malloc(node_ptr->data_len+1);
426 memcpy(rc_backup_dir, node_ptr->data, node_ptr->data_len);
427 rc_backup_dir[node_ptr->data_len] = '\0';
429 rc_backup_dir = (char *) node_ptr->data;
// Ownership moved to rc_backup_dir; null out to prevent double free
// in free_async_node below.
430 node_ptr->data = NULL;
432 rval = MBError::SUCCESS;
// default: unknown task type
435 rval = MBError::INVALID_ARG;
440 header->writer_index++;
// Reset the slot and wake any producer blocked in AcquireSlot().
444 free_async_node(node_ptr);
445 node_ptr->in_use.store(false, std::memory_order_release);
446 pthread_cond_signal(&node_ptr->cond);
456 if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
458 Logger::Log(LOG_LEVEL_ERROR, "failed to unlock mutex");
459 throw (int) MBError::MUTEX_ERROR;
462 if(rval != MBError::SUCCESS)
464 Logger::Log(LOG_LEVEL_DEBUG, "failed to run update %d: %s",
465 (int)node_ptr->type, MBError::get_error_str(rval));
// (elided condition) signals the caller that an rc request was skipped
470 return MBError::RC_SKIPPED;
471 return MBError::SUCCESS;
// Recovery scan used when the writer suspects a producer process died:
// walk forward from windex toward qindex looking for the next slot whose
// in_use flag is set, giving up after one full queue length.
// (Loop body, increments and return statements are elided in this
// fragment — confirm against the full source.)
475 uint32_t AsyncWriter::NextShmSlot(uint32_t windex, uint32_t qindex)
478 while(windex != qindex)
480 if(queue[windex % header->async_queue_size].in_use.load(std::memory_order_consume))
482 if(++cnt > header->async_queue_size)
// Main loop of the background writer thread. Repeatedly takes the slot
// at writer_index, waits for a producer to publish it, dispatches the
// task (add / remove / remove-all / rc / backup), then frees the slot
// and advances the index. After each task it runs a pending resource
// collection (rc_flag) and/or deferred backup. Exits when
// stop_processing is set and the queue is drained. Many brace / #else /
// #endif / guard lines are elided in this fragment.
495 void* AsyncWriter::async_writer_thread()
// rc thresholds, overwritten by the payload of an RC task below.
500 int64_t min_index_size = 0;
501 int64_t min_data_size = 0;
502 int64_t max_dbsize = MAX_6B_OFFSET;
503 int64_t max_dbcount = MAX_6B_OFFSET;
505 struct timespec tm_exp;
509 Logger::Log(LOG_LEVEL_INFO, "async writer started");
// Shared-memory queue path: timed lock so a dead peer cannot wedge us.
513 node_ptr = &queue[header->writer_index % header->async_queue_size];
514 tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
516 rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
517 if(rval == ETIMEDOUT)
519 Logger::Log(LOG_LEVEL_WARN, "async writer shared memory mutex lock timeout, need to re-intialize");
// Recovery: re-create the shm mutex and skip past the stuck slot.
520 InitShmMutex(&node_ptr->mutex);
521 header->writer_index++;
// Process-local queue path (non-shm build).
526 node_ptr = &queue[writer_index % MB_MAX_NUM_SHM_QUEUE_NODE];
527 if(pthread_mutex_lock(&node_ptr->mutex) != 0)
530 Logger::Log(LOG_LEVEL_ERROR, "async writer failed to lock shared memory mutex");
531 throw (int) MBError::MUTEX_ERROR;
// Wait for a producer to publish this slot.
537 while(!node_ptr->in_use.load(std::memory_order_consume))
// Timed wait so we can periodically check for stop/recovery conditions.
542 tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
544 pthread_cond_timedwait(&node_ptr->cond, &node_ptr->mutex, &tm_exp);
546 uint32_t windex = header->writer_index;
547 uint32_t qindex = header->queue_index.load(std::memory_order_consume);
550 // Reader process may have exited unexpectedly. Recover index.
552 header->writer_index = NextShmSlot(windex, qindex);
// Non-shm build: plain untimed wait on the slot condvar.
557 pthread_cond_wait(&node_ptr->cond, &node_ptr->mutex);
// Index was recovered or shutdown requested: release the slot and
// restart the outer loop.
562 if(skip || stop_processing)
564 if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
566 Logger::Log(LOG_LEVEL_ERROR, "async writer failed to unlock shared memory mutex");
567 throw (int) MBError::MUTEX_ERROR;
// Shutdown with an empty slot: nothing left to consume.
574 if(stop_processing && !node_ptr->in_use.load(std::memory_order_consume))
576 pthread_mutex_unlock(&node_ptr->mutex);
// Dispatch the published task.
581 switch(node_ptr->type)
583 case MABAIN_ASYNC_TYPE_ADD:
584 mbd.buff = (uint8_t *) node_ptr->data;
585 mbd.data_len = node_ptr->data_len;
587 rval = dict->Add((uint8_t *)node_ptr->key, node_ptr->key_len, mbd,
588 node_ptr->overwrite);
590 Logger::Log(LOG_LEVEL_ERROR, "dict->Add throws error %s",
591 MBError::get_error_str(err));
595 case MABAIN_ASYNC_TYPE_REMOVE:
// Remove needs the parent node stored in mbd; restore options after.
596 mbd.options |= CONSTS::OPTION_FIND_AND_STORE_PARENT;
598 rval = dict->Remove((uint8_t *)node_ptr->key, node_ptr->key_len, mbd);
600 Logger::Log(LOG_LEVEL_ERROR, "dict->Remmove throws error %s",
601 MBError::get_error_str(err));
604 mbd.options &= ~CONSTS::OPTION_FIND_AND_STORE_PARENT;
606 case MABAIN_ASYNC_TYPE_REMOVE_ALL:
608 rval = dict->RemoveAll();
610 Logger::Log(LOG_LEVEL_ERROR, "dict->RemoveAll throws error %s",
611 MBError::get_error_str(err));
615 case MABAIN_ASYNC_TYPE_RC:
616 rval = MBError::SUCCESS;
// Flag rc as pending; the actual collection runs after the slot is
// released (see rc_flag check below).
618 header->rc_flag.store(1, std::memory_order_release);
620 is_rc_running = true;
// Unpack the four thresholds queued by CollectResource().
623 int64_t *data_ptr = reinterpret_cast<int64_t *>(node_ptr->data);
624 min_index_size = data_ptr[0];
625 min_data_size = data_ptr[1];
626 max_dbsize = data_ptr[2];
627 max_dbcount = data_ptr[3];
630 case MABAIN_ASYNC_TYPE_NONE:
631 rval = MBError::SUCCESS;
633 case MABAIN_ASYNC_TYPE_BACKUP:
636 rval = mbbk.Backup((const char*) node_ptr->data);
637 } catch (int error) {
// default: unknown task type
642 rval = MBError::INVALID_ARG;
647 header->writer_index++;
// Reset the slot and wake any producer blocked in AcquireSlot().
651 free_async_node(node_ptr);
652 node_ptr->in_use.store(false, std::memory_order_release);
653 pthread_cond_signal(&node_ptr->cond);
654 if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
656 Logger::Log(LOG_LEVEL_ERROR, "failed to unlock mutex");
657 throw (int) MBError::MUTEX_ERROR;
660 if(rval != MBError::SUCCESS)
662 Logger::Log(LOG_LEVEL_DEBUG, "failed to run update %d: %s",
663 (int)node_ptr->type, MBError::get_error_str(rval));
// Run a pending resource collection outside the slot lock.
669 if (header->rc_flag.load(std::memory_order_consume) == 1)
674 rval = MBError::SUCCESS;
676 ResourceCollection rc = ResourceCollection(*db);
677 rc.ReclaimResource(min_index_size, min_data_size, max_dbsize, max_dbcount, this);
678 } catch (int error) {
679 if(error != MBError::RC_SKIPPED)
680 Logger::Log(LOG_LEVEL_WARN, "rc failed :%s", MBError::get_error_str(error));
686 header->rc_flag.store(0, std::memory_order_release);
688 is_rc_running = false;
// Perform a backup that was deferred while rc was running.
690 if(rc_backup_dir != NULL)
692 if(rval == MBError::SUCCESS)
695 dict->SHMQ_Backup(rc_backup_dir);
697 Backup(rc_backup_dir);
// (elided: free the deferred path buffer before clearing the pointer)
701 rc_backup_dir = NULL;
707 Logger::Log(LOG_LEVEL_INFO, "async writer exiting");
711 void* AsyncWriter::async_thread_wrapper(void *context)
713 AsyncWriter *instance_ptr = static_cast<AsyncWriter *>(context);
714 return instance_ptr->async_writer_thread();