update readme
[c11concurrency-benchmarks.git] / mabain / src / async_writer.h
1 /**
2  * Copyright (C) 2017 Cisco Inc.
3  *
4  * This program is free software: you can redistribute it and/or  modify
5  * it under the terms of the GNU General Public License, version 2,
6  * as published by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15  */
16
17 // @author Changxue Deng <chadeng@cisco.com>
18
19 #ifndef __ASYNC_WRITER_H__
20 #define __ASYNC_WRITER_H__
21
22 #include <pthread.h>
23
24 #include "db.h"
25 //#include "mb_rc.h"
26 #include "dict.h"
27 #include "mb_backup.h"
28
29 namespace mabain {
30
31 #define MABAIN_ASYNC_TYPE_NONE       0
32 #define MABAIN_ASYNC_TYPE_ADD        1
33 #define MABAIN_ASYNC_TYPE_REMOVE     2
34 #define MABAIN_ASYNC_TYPE_REMOVE_ALL 3
35 #define MABAIN_ASYNC_TYPE_RC         4
36 #define MABAIN_ASYNC_TYPE_BACKUP     5
37
38 #define MB_ASYNC_SHM_KEY_SIZE      256
39 #define MB_ASYNC_SHM_DATA_SIZE     1024
40 #define MB_ASYNC_SHM_LOCK_TMOUT    5
41
42     
43 typedef struct _AsyncNode
44 {
45     std::atomic<bool> in_use;
46     pthread_mutex_t   mutex;
47     pthread_cond_t    cond;
48
49 #ifdef __SHM_QUEUE__
50     char key[MB_ASYNC_SHM_KEY_SIZE];
51     char data[MB_ASYNC_SHM_DATA_SIZE];
52 #else
53     char *key;
54     char *data;
55 #endif
56     int key_len;
57     int data_len;
58     bool overwrite;
59     char type;
60 } AsyncNode;
61
62 class AsyncWriter
63 {
64 public:
65
66     AsyncWriter(DB *db_ptr);
67     ~AsyncWriter();
68
69 #ifndef __SHM_QUEUE__
70     void UpdateNumUsers(int delta);
71     int  Add(const char *key, int key_len, const char *data, int data_len, bool overwrite);
72     int  Remove(const char *key, int len);
73     int  RemoveAll();
74     int  Backup(const char *backup_dir);
75     int  CollectResource(int64_t m_index_rc_size, int64_t m_data_rc_size, 
76                          int64_t max_dbsz, int64_t max_dbcnt);
77     bool Busy() const;
78 #endif
79
80     int  StopAsyncThread();
81     int  ProcessTask(int ntasks, bool rc_mode);
82
83 private:
84     static void *async_thread_wrapper(void *context);
85     AsyncNode* AcquireSlot();
86     int PrepareSlot(AsyncNode *node_ptr) const;
87     void* async_writer_thread();
88 #ifdef __SHM_QUEUE__
89     uint32_t NextShmSlot(uint32_t windex, uint32_t qindex);
90 #endif
91
92     // db pointer
93     DB *db;
94     Dict *dict;
95
96     // thread id
97     pthread_t tid;
98     bool stop_processing;
99
100     AsyncNode *queue;
101 #ifdef __SHM_QUEUE__
102     IndexHeader *header;
103 #else
104     std::atomic<int> num_users;
105     std::atomic<uint32_t> queue_index;
106     uint32_t writer_index;
107 #endif
108
109     bool is_rc_running;
110     char *rc_backup_dir;
111 };
112
113 }
114
115 #endif