2 * Copyright (C) 2018 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
23 #include "async_writer.h"
24 #include "./util/shm_mutex.h"
// Queue an asynchronous ADD request.
// Copies key_len bytes of key and data_len bytes of data into the slot returned
// by SHMQ_AcquireSlot(), records both lengths and the overwrite flag, tags the
// slot MABAIN_ASYNC_TYPE_ADD and hands it to SHMQ_PrepareSlot().
// Returns MBError::OUT_OF_BOUND when key_len > MB_ASYNC_SHM_KEY_SIZE or
// data_len > MB_ASYNC_SHM_DATA_SIZE, MBError::MUTEX_ERROR on the guarded
// failure path below, otherwise the result of SHMQ_PrepareSlot().
// NOTE(review): only upper bounds on the lengths are checked in the lines shown
// here; a negative length would be passed straight to memcpy -- confirm that
// callers never supply one.
30 int Dict::SHMQ_Add(const char *key, int key_len, const char *data, int data_len,
33 if(key_len > MB_ASYNC_SHM_KEY_SIZE || data_len > MB_ASYNC_SHM_DATA_SIZE)
35 return MBError::OUT_OF_BOUND;
38 AsyncNode *node_ptr = SHMQ_AcquireSlot();
// NOTE(review): the condition guarding this return is not visible in this
// listing; presumably it fires when no slot could be acquired -- confirm.
40 return MBError::MUTEX_ERROR;
// Fill in the request payload before the slot is handed over.
42 memcpy(node_ptr->key, key, key_len);
43 memcpy(node_ptr->data, data, data_len);
44 node_ptr->key_len = key_len;
45 node_ptr->data_len = data_len;
46 node_ptr->overwrite = overwrite;
48 node_ptr->type = MABAIN_ASYNC_TYPE_ADD;
49 return SHMQ_PrepareSlot(node_ptr);
// Queue an asynchronous REMOVE request for a single key.
// Copies len bytes of key into the slot returned by SHMQ_AcquireSlot(), records
// the key length, tags the slot MABAIN_ASYNC_TYPE_REMOVE and hands it to
// SHMQ_PrepareSlot().
// Returns MBError::OUT_OF_BOUND when len > MB_ASYNC_SHM_KEY_SIZE,
// MBError::MUTEX_ERROR on the guarded failure path below, otherwise the result
// of SHMQ_PrepareSlot().
// NOTE(review): no lower bound on len is checked in the lines shown here --
// confirm that callers never pass a negative length.
52 int Dict::SHMQ_Remove(const char *key, int len)
54 if(len > MB_ASYNC_SHM_KEY_SIZE)
55 return MBError::OUT_OF_BOUND;
57 AsyncNode *node_ptr = SHMQ_AcquireSlot();
// NOTE(review): the condition guarding this return is not visible in this
// listing; presumably it fires when no slot could be acquired -- confirm.
59 return MBError::MUTEX_ERROR;
61 memcpy(node_ptr->key, key, len);
62 node_ptr->key_len = len;
63 node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE;
64 return SHMQ_PrepareSlot(node_ptr);
// Queue an asynchronous REMOVE_ALL request.
// The request carries no key or data: the slot returned by SHMQ_AcquireSlot()
// is only tagged MABAIN_ASYNC_TYPE_REMOVE_ALL and passed to SHMQ_PrepareSlot().
// Returns MBError::MUTEX_ERROR on the guarded failure path below, otherwise the
// result of SHMQ_PrepareSlot().
67 int Dict::SHMQ_RemoveAll()
69 AsyncNode *node_ptr = SHMQ_AcquireSlot();
// NOTE(review): the condition guarding this return is not visible in this
// listing; presumably it fires when no slot could be acquired -- confirm.
71 return MBError::MUTEX_ERROR;
73 node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE_ALL;
74 return SHMQ_PrepareSlot(node_ptr);
// Queue an asynchronous BACKUP request.
// Writes backup_dir as a NUL-terminated string into the data buffer of the slot
// returned by SHMQ_AcquireSlot(), tags the slot MABAIN_ASYNC_TYPE_BACKUP and
// hands it to SHMQ_PrepareSlot().
// Returns MBError::INVALID_ARG when backup_dir is NULL, MBError::OUT_OF_BOUND
// when strlen(backup_dir) >= MB_ASYNC_SHM_DATA_SIZE, MBError::MUTEX_ERROR on the
// guarded failure path below, otherwise the result of SHMQ_PrepareSlot().
// NOTE(review): node_ptr->data_len is not assigned in the lines shown here --
// confirm the consumer of BACKUP requests does not rely on it.
77 int Dict::SHMQ_Backup(const char *backup_dir)
79 if(backup_dir == NULL)
80 return MBError::INVALID_ARG;
// ">=" leaves room for the terminating NUL within MB_ASYNC_SHM_DATA_SIZE bytes,
// which is the size passed to snprintf below.
81 if(strlen(backup_dir) >= MB_ASYNC_SHM_DATA_SIZE)
82 return MBError::OUT_OF_BOUND;
84 AsyncNode *node_ptr = SHMQ_AcquireSlot();
// NOTE(review): the condition guarding this return is not visible in this
// listing; presumably it fires when no slot could be acquired -- confirm.
86 return MBError::MUTEX_ERROR;
87 snprintf(node_ptr->data, MB_ASYNC_SHM_DATA_SIZE, "%s", backup_dir);
88 node_ptr->type = MABAIN_ASYNC_TYPE_BACKUP;
89 return SHMQ_PrepareSlot(node_ptr);
// Queue an asynchronous resource-collection (MABAIN_ASYNC_TYPE_RC) request.
// Packs four int64_t values into the start of the slot's data buffer, in this
// order: m_index_rc_size, m_data_rc_size, max_dbsz, max_dbcnt, and sets
// data_len to 4 * sizeof(int64_t) before handing the slot to SHMQ_PrepareSlot().
// Returns MBError::MUTEX_ERROR on the guarded failure path below, otherwise the
// result of SHMQ_PrepareSlot().
// NOTE(review): max_dbsz and max_dbcnt are presumably the remaining parameters;
// the end of the signature is not visible in this listing -- confirm.
92 int Dict::SHMQ_CollectResource(int64_t m_index_rc_size,
93 int64_t m_data_rc_size,
97 AsyncNode *node_ptr = SHMQ_AcquireSlot();
// NOTE(review): the condition guarding this return is not visible in this
// listing; presumably it fires when no slot could be acquired -- confirm.
99 return MBError::MUTEX_ERROR;
// The data buffer is reinterpreted as an array of int64_t.
// NOTE(review): assumes node_ptr->data is suitably aligned for int64_t and that
// the buffer holds at least 4 * sizeof(int64_t) bytes -- no check is visible
// here; confirm against the AsyncNode definition.
101 int64_t *data_ptr = (int64_t *) node_ptr->data;
102 node_ptr->data_len = sizeof(int64_t)*4;
103 data_ptr[0] = m_index_rc_size;
104 data_ptr[1] = m_data_rc_size;
105 data_ptr[2] = max_dbsz;
106 data_ptr[3] = max_dbcnt;
107 node_ptr->type = MABAIN_ASYNC_TYPE_RC;
109 return SHMQ_PrepareSlot(node_ptr);
// Reserve the next queue slot and lock it for the caller to fill in.
// Steps visible in this listing:
//   1. atomically post-increment header->queue_index and map the previous value
//      onto a node of `queue` modulo header->async_queue_size;
//   2. lock that node's mutex with an absolute deadline of
//      time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT seconds;
//   3. while the node's in_use flag is still set, wait on the node's condition
//      variable, pushing the deadline out by MB_ASYNC_SHM_LOCK_TMOUT each pass.
// NOTE(review): the failure conditions and every return statement are not
// visible in this listing. Presumably the node is returned with its mutex still
// held (SHMQ_PrepareSlot() unlocks node_ptr->mutex) and a null pointer is
// returned on failure -- confirm.
112 AsyncNode* Dict::SHMQ_AcquireSlot() const
114 uint32_t index = header->queue_index.fetch_add(1, std::memory_order_release);
115 AsyncNode *node_ptr = queue + (index % header->async_queue_size);
// NOTE(review): only tv_sec is assigned in the lines shown here; the timed
// pthread calls read tv_nsec as well -- confirm it is initialised on a line not
// visible in this listing.
117 struct timespec tm_exp;
118 tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
120 int rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
123 Logger::Log(LOG_LEVEL_ERROR, "shared memory mutex lock failed: %d", rval);
// Wait while the slot's in_use flag is still set.
127 while(node_ptr->in_use.load(std::memory_order_consume))
129 tm_exp.tv_sec += MB_ASYNC_SHM_LOCK_TMOUT;
130 rval = pthread_cond_timedwait(&node_ptr->cond, &node_ptr->mutex, &tm_exp);
// Wait-failure handling: log the error code, add a hint when the wait timed
// out, then release the mutex. (Presumably all of this sits inside a
// "wait failed" branch whose condition is not visible here -- confirm.)
133 Logger::Log(LOG_LEVEL_ERROR, "shared memory conditional wait failed: %d", rval);
134 if(rval == ETIMEDOUT)
136 Logger::Log(LOG_LEVEL_INFO, "pthread_cond_timedwait timedout, "
137 "check if async writer is running");
139 if((rval = pthread_mutex_unlock(&node_ptr->mutex)) != 0)
140 Logger::Log(LOG_LEVEL_ERROR, "shm mutex unlock failed: %d", rval);
// Publish a filled-in slot and release it.
// Sets the node's in_use flag to true (release ordering), signals the node's
// condition variable and unlocks the node's mutex.
// Returns MBError::MUTEX_ERROR when pthread_mutex_unlock() reports a failure,
// MBError::SUCCESS otherwise. The result of pthread_cond_signal() is ignored.
// NOTE(review): node_ptr is presumably a slot obtained from SHMQ_AcquireSlot()
// whose mutex is held by the calling thread -- confirm against the callers.
148 int Dict::SHMQ_PrepareSlot(AsyncNode *node_ptr) const
150 node_ptr->in_use.store(true, std::memory_order_release);
151 pthread_cond_signal(&node_ptr->cond);
152 if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
153 return MBError::MUTEX_ERROR;
155 return MBError::SUCCESS;
158 bool Dict::SHMQ_Busy() const
160 if((header->queue_index.load(std::memory_order_consume) != header->writer_index) || header->rc_flag == 1)
163 size_t rc_off = header->rc_root_offset.load(std::memory_order_consume);