update readme
[c11concurrency-benchmarks.git] / mabain / src / shmq_update.cpp
1 /**
2  * Copyright (C) 2018 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #include <pthread.h>
20
21 #include "dict.h"
22 #include "error.h"
23 #include "async_writer.h"
24 #include "./util/shm_mutex.h"
25
26 #ifdef __SHM_QUEUE__
27
28 namespace mabain {
29
30 int Dict::SHMQ_Add(const char *key, int key_len, const char *data, int data_len,
31                    bool overwrite)
32 {
33     if(key_len > MB_ASYNC_SHM_KEY_SIZE || data_len > MB_ASYNC_SHM_DATA_SIZE)
34     {
35         return MBError::OUT_OF_BOUND;
36     }
37
38     AsyncNode *node_ptr = SHMQ_AcquireSlot();
39     if(node_ptr == NULL)
40         return MBError::MUTEX_ERROR;
41
42     memcpy(node_ptr->key, key, key_len);
43     memcpy(node_ptr->data, data, data_len);
44     node_ptr->key_len = key_len;
45     node_ptr->data_len = data_len;
46     node_ptr->overwrite = overwrite;
47
48     node_ptr->type = MABAIN_ASYNC_TYPE_ADD;
49     return SHMQ_PrepareSlot(node_ptr);
50 }
51
52 int Dict::SHMQ_Remove(const char *key, int len)
53 {
54     if(len > MB_ASYNC_SHM_KEY_SIZE)
55         return MBError::OUT_OF_BOUND;
56
57     AsyncNode *node_ptr = SHMQ_AcquireSlot();
58     if(node_ptr == NULL)
59         return MBError::MUTEX_ERROR;
60
61     memcpy(node_ptr->key, key, len);
62     node_ptr->key_len = len;
63     node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE;
64     return SHMQ_PrepareSlot(node_ptr);
65 }
66
67 int Dict::SHMQ_RemoveAll()
68 {
69     AsyncNode *node_ptr = SHMQ_AcquireSlot();
70     if(node_ptr == NULL)
71         return MBError::MUTEX_ERROR;
72
73     node_ptr->type = MABAIN_ASYNC_TYPE_REMOVE_ALL;
74     return SHMQ_PrepareSlot(node_ptr);
75 }
76
77 int Dict::SHMQ_Backup(const char *backup_dir)
78 {
79     if(backup_dir == NULL)
80         return MBError::INVALID_ARG;
81     if(strlen(backup_dir) >= MB_ASYNC_SHM_DATA_SIZE)
82         return MBError::OUT_OF_BOUND;
83
84     AsyncNode *node_ptr = SHMQ_AcquireSlot();
85     if(node_ptr == NULL)
86         return MBError::MUTEX_ERROR;
87     snprintf(node_ptr->data, MB_ASYNC_SHM_DATA_SIZE, "%s", backup_dir);
88     node_ptr->type = MABAIN_ASYNC_TYPE_BACKUP;
89     return SHMQ_PrepareSlot(node_ptr);
90 }
91
92 int Dict::SHMQ_CollectResource(int64_t m_index_rc_size,
93                                int64_t m_data_rc_size,
94                                int64_t max_dbsz,
95                                int64_t max_dbcnt)
96 {
97     AsyncNode *node_ptr = SHMQ_AcquireSlot();
98     if(node_ptr == NULL)
99         return MBError::MUTEX_ERROR;
100
101     int64_t *data_ptr = (int64_t *) node_ptr->data;
102     node_ptr->data_len = sizeof(int64_t)*4;
103     data_ptr[0] = m_index_rc_size;
104     data_ptr[1] = m_data_rc_size;
105     data_ptr[2] = max_dbsz;
106     data_ptr[3] = max_dbcnt;
107     node_ptr->type = MABAIN_ASYNC_TYPE_RC;
108
109     return SHMQ_PrepareSlot(node_ptr);
110 }
111
112 AsyncNode* Dict::SHMQ_AcquireSlot() const
113 {
114     uint32_t index = header->queue_index.fetch_add(1, std::memory_order_release);
115     AsyncNode *node_ptr = queue + (index % header->async_queue_size);
116
117     struct timespec tm_exp;
118     tm_exp.tv_sec = time(NULL) + MB_ASYNC_SHM_LOCK_TMOUT;
119     tm_exp.tv_nsec = 0;
120     int rval = pthread_mutex_timedlock(&node_ptr->mutex, &tm_exp);
121     if(rval != 0)
122     {
123         Logger::Log(LOG_LEVEL_ERROR, "shared memory mutex lock failed: %d", rval);
124         return NULL;
125     }
126
127     while(node_ptr->in_use.load(std::memory_order_consume))
128     {
129         tm_exp.tv_sec += MB_ASYNC_SHM_LOCK_TMOUT;
130         rval =  pthread_cond_timedwait(&node_ptr->cond, &node_ptr->mutex, &tm_exp);
131         if(rval != 0)
132         {
133             Logger::Log(LOG_LEVEL_ERROR, "shared memory conditional wait failed: %d", rval);
134             if(rval == ETIMEDOUT)
135             {
136                  Logger::Log(LOG_LEVEL_INFO, "pthread_cond_timedwait timedout, "
137                              "check if async writer is running");
138             }
139             if((rval = pthread_mutex_unlock(&node_ptr->mutex)) != 0)
140                 Logger::Log(LOG_LEVEL_ERROR, "shm mutex unlock failed: %d", rval);
141             return NULL;
142         }
143     }
144
145     return node_ptr;
146 }
147
148 int Dict::SHMQ_PrepareSlot(AsyncNode *node_ptr) const
149 {
150     node_ptr->in_use.store(true, std::memory_order_release);
151     pthread_cond_signal(&node_ptr->cond);
152     if(pthread_mutex_unlock(&node_ptr->mutex) != 0)
153         return MBError::MUTEX_ERROR;
154
155     return MBError::SUCCESS;
156 }
157
158 bool Dict::SHMQ_Busy() const
159 {
160     if((header->queue_index.load(std::memory_order_consume) != header->writer_index) || header->rc_flag == 1)
161         return true;
162
163     size_t rc_off = header->rc_root_offset.load(std::memory_order_consume);
164     return rc_off != 0;
165 }
166
167 }
168
169 #endif