// mabain/src/rollable_file.cpp
/**
 * Copyright (C) 2017 Cisco Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License, version 2,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

// @author Changxue Deng <chadeng@cisco.com>

#include <sstream>
#include <cstdlib>
#include <stdint.h>
#include <string.h>
#include <unistd.h>     // for sysconf() and unlink()
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <assert.h>
#include <errno.h>
#include <climits>

#include "db.h"
#include "rollable_file.h"
#include "logger.h"
#include "error.h"
#include "resource_pool.h"

namespace mabain {

#define SLIDING_MEM_SIZE     16LLU*1024*1024    // 16M
#define MAX_NUM_BLOCK        2*1024             // 2K
#define RC_OFFSET_PERCENTAGE 75                 // default rc offset is placed at 75% of maximum size

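// Note on ShmSync below: POSIX requires the address passed to msync()
// to be a multiple of the page size, so the sync range is widened back
// to the enclosing page boundary. For example, with 4096-byte pages,
// syncing 512 bytes starting at page_start + 100 becomes
// msync(page_start, 612, MS_SYNC).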
const long RollableFile::page_size = sysconf(_SC_PAGESIZE);

int RollableFile::ShmSync(uint8_t *addr, int size)
{
    off_t page_offset = ((off_t) addr) % RollableFile::page_size;
    return msync(addr - page_offset, size + page_offset, MS_SYNC);
}

RollableFile::RollableFile(const std::string &fpath, size_t blocksize, size_t memcap, int access_mode,
                           long max_block, int in_rc_offset_percentage)
          : path(fpath),
            block_size(blocksize),
            mmap_mem(memcap),
            sliding_mmap(access_mode & CONSTS::USE_SLIDING_WINDOW),
            mode(access_mode),
            max_num_block(max_block),
            rc_offset_percentage(in_rc_offset_percentage),
            mem_used(0)
{
    sliding_addr = NULL;
    sliding_mem_size = SLIDING_MEM_SIZE;
    sliding_size = 0;
    sliding_start = 0;
    sliding_map_off = 0;
    shm_sliding_start_ptr = NULL;

    if(mode & CONSTS::ACCESS_MODE_WRITER)
    {
        if(max_num_block == 0 || max_num_block > MAX_NUM_BLOCK)
            max_num_block = MAX_NUM_BLOCK;

        Logger::Log(LOG_LEVEL_INFO, "maximal block number for %s is %ld",
                    fpath.c_str(), max_num_block);

        if(rc_offset_percentage == 0 || rc_offset_percentage > 100 || rc_offset_percentage < 50)
            rc_offset_percentage = RC_OFFSET_PERCENTAGE;

        Logger::Log(LOG_LEVEL_INFO, "rc_offset_percentage is set to %d", rc_offset_percentage);
    }

    Logger::Log(LOG_LEVEL_INFO, "opening rollable file %s for %s, mmap size: %zu",
            path.c_str(), (mode & CONSTS::ACCESS_MODE_WRITER) ? "writing" : "reading", mmap_mem);
    if(!sliding_mmap)
    {
        Logger::Log(LOG_LEVEL_INFO, "sliding mmap is turned off for " + fpath);
    }
    else
    {
        // page_size is used to check page alignment when mapping file to memory.
        if(RollableFile::page_size < 0)
        {
            Logger::Log(LOG_LEVEL_WARN, "failed to get page size, turning off sliding memory");
            sliding_mmap = false;
        }
        else
        {
            Logger::Log(LOG_LEVEL_INFO, "sliding mmap is turned on for " + fpath);
        }
    }

    files.assign(3, NULL);
    if(mode & CONSTS::SYNC_ON_WRITE)
        Logger::Log(LOG_LEVEL_INFO, "Sync is turned on for " + fpath);
}
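
// A minimal writer-side usage sketch (values and the path are
// illustrative, not the mabain defaults):
//
//   RollableFile rf("/var/tmp/mabain/data",
//                   64LLU*1024*1024,   // block_size: one 64M file per block
//                   128LLU*1024*1024,  // memcap: mmap budget across blocks
//                   CONSTS::ACCESS_MODE_WRITER | CONSTS::USE_SLIDING_WINDOW,
//                   10,                // max_block
//                   0);                // rc offset: 0 selects the 75% default
//
// Block files are then created lazily as "<path><order>" (data0, data1,
// ...) by OpenAndMapBlockFile below.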

void RollableFile::InitShmSlidingAddr(std::atomic<size_t> *shm_sliding_addr)
{
    shm_sliding_start_ptr = shm_sliding_addr;
#ifdef __DEBUG__
    assert(shm_sliding_start_ptr != NULL);
#endif
}

void RollableFile::Close()
{
    if(sliding_addr != NULL)
    {
        munmap(sliding_addr, sliding_size);
        sliding_addr = NULL;
    }
}

RollableFile::~RollableFile()
{
    Close();
}

int RollableFile::OpenAndMapBlockFile(size_t block_order, bool create_file)
{
    if(block_order >= (size_t) max_num_block)
    {
        int level = LOG_LEVEL_DEBUG;
        if(mode & CONSTS::ACCESS_MODE_WRITER)
            level = LOG_LEVEL_WARN;
        Logger::Log(level, "block number %zu overflow", block_order);
        return MBError::NO_RESOURCE;
    }

    int rval = MBError::SUCCESS;
    std::stringstream ss;
    ss << block_order;
#ifdef __DEBUG__
    assert(files[block_order] == NULL);
#endif

    // Map the new block into memory only while within the mmap budget.
    bool map_file = mmap_mem > mem_used;
    if(!map_file && (mode & CONSTS::MEMORY_ONLY_MODE))
        return MBError::NO_MEMORY;

    files[block_order] = ResourcePool::getInstance().OpenFile(path + ss.str(),
                                                              mode,
                                                              block_size,
                                                              map_file,
                                                              create_file);
    if(map_file)
        mem_used += block_size;
    else if(mode & CONSTS::MEMORY_ONLY_MODE)
        rval = MBError::MMAP_FAILED;
    return rval;
}


// Make sure the requested write of `size` bytes at `offset` does not
// straddle a boundary: it must not span two blocks, nor fall partly in
// the mmapped region and partly on disk.
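// For example, with block_size = 1000, a 16-byte buffer requested at
// offset 995 would straddle blocks 0 and 1; the offset is therefore
// bumped to 1000, the start of the next block.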
size_t RollableFile::CheckAlignment(size_t offset, int size)
{
    size_t block_offset = offset % block_size;

    if(block_offset + size > block_size)
    {
        // Start at the beginning of the next block
        offset = offset + block_size - block_offset;
    }

    return offset;
}

int RollableFile::CheckAndOpenFile(size_t order, bool create_file)
{
    int rval = MBError::SUCCESS;

    if(order >= files.size())
        files.resize(order + 3, NULL);

    if(files[order] == NULL)
        rval = OpenAndMapBlockFile(order, create_file);

    return rval;
}

// Get the shared-memory address for an existing buffer. No alignment
// check is needed here; alignment was ensured when the buffer was reserved.
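// The lookup tries the block's own mmap first, then the sliding window;
// it returns NULL when neither covers [offset, offset + size), in which
// case the caller must fall back to file-based I/O (e.g. RandomWrite
// below).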
uint8_t* RollableFile::GetShmPtr(size_t offset, int size)
{
    size_t order = offset / block_size;
    int rval = CheckAndOpenFile(order, false);

    if(rval != MBError::SUCCESS)
        return NULL;

    if(files[order]->IsMapped())
    {
        size_t index = offset % block_size;
        return files[order]->GetMapAddr() + index;
    }

    if(sliding_mmap)
    {
        if(static_cast<off_t>(offset) >= sliding_start &&
           offset + size <= sliding_start + sliding_size)
        {
            if(sliding_addr != NULL)
                return sliding_addr + (offset % block_size) - sliding_map_off;
        }
    }

    return NULL;
}

int RollableFile::Reserve(size_t &offset, int size, uint8_t* &ptr, bool map_new_sliding)
{
    int rval;
    ptr = NULL;
    offset = CheckAlignment(offset, size);

    size_t order = offset / block_size;
    rval = CheckAndOpenFile(order, true);
    if(rval != MBError::SUCCESS)
        return rval;

    if(files[order]->IsMapped())
    {
        size_t index = offset % block_size;
        ptr = files[order]->GetMapAddr() + index;
        return rval;
    }

    if(sliding_mmap)
    {
        if(static_cast<off_t>(offset) >= sliding_start &&
           offset + size <= sliding_start + sliding_size)
        {
            if(sliding_addr != NULL)
                ptr = sliding_addr + (offset % block_size) - sliding_map_off;
        }
        else if(map_new_sliding && offset >= sliding_start + sliding_size)
        {
            ptr = NewSlidingMapAddr(order, offset, size);
            if(ptr != NULL)
            {
                // Publish the mmap start offset to shared memory so that
                // readers can map the same region when reading it.
                shm_sliding_start_ptr->store(sliding_start, std::memory_order_relaxed);
            }
        }
    }

    return rval;
}
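
// A typical writer sequence (illustrative): call Reserve() first; when it
// returns a non-NULL ptr, the buffer can be written through the pointer
// directly, otherwise fall back to RandomWrite() at the reserved offset.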

uint8_t* RollableFile::NewSlidingMapAddr(size_t order, size_t offset, int size)
{
    if(sliding_addr != NULL)
    {
        // No need to call msync here; once the region is unmapped, the
        // kernel writes any modified pages back to disk on its own.
        munmap(sliding_addr, sliding_size);
    }

    if(sliding_start == 0)
    {
        sliding_start = offset;
    }
    else
    {
        sliding_start += sliding_size;
    }

    // Check page alignment
    int page_alignment = sliding_start % RollableFile::page_size;
    if(page_alignment != 0)
    {
        sliding_start -= page_alignment;
        if(sliding_start < 0)
            sliding_start = 0;
    }

    sliding_map_off = sliding_start % block_size;
    if(sliding_map_off + sliding_mem_size > block_size)
    {
        sliding_size = block_size - sliding_map_off;
    }
    else
    {
        sliding_size = sliding_mem_size;
    }

    order = sliding_start / block_size;
    sliding_addr = files[order]->MapFile(sliding_size, sliding_map_off, true);
    if(sliding_addr != NULL)
    {
        if(static_cast<off_t>(offset) >= sliding_start &&
           offset + size <= sliding_start + sliding_size)
            return sliding_addr + (offset % block_size) - sliding_map_off;
    }
    else
    {
        Logger::Log(LOG_LEVEL_WARN, "mmap failed, disabling sliding mmap");
        sliding_mmap = false;
    }

    return NULL;
}
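
// Example of the alignment arithmetic above: with 4096-byte pages, a new
// window requested at offset 10000 is pulled back to sliding_start = 8192
// (page-aligned); sliding_map_off = 8192 % block_size, and the window then
// covers min(sliding_mem_size, block_size - sliding_map_off) bytes so it
// never crosses a block boundary.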

size_t RollableFile::RandomWrite(const void *data, size_t size, off_t offset)
{
    size_t order = offset / block_size;
    int rval = CheckAndOpenFile(order, false);
    if(rval != MBError::SUCCESS)
        return 0;

    // Check sliding map
    if(sliding_mmap && sliding_addr != NULL)
    {
        if(offset >= sliding_start && offset + size <= sliding_start + sliding_size)
        {
            uint8_t *start_addr = sliding_addr + (offset % block_size) - sliding_map_off;
            memcpy(start_addr, data, size);
            if(mode & CONSTS::SYNC_ON_WRITE)
            {
                off_t page_off = ((off_t) start_addr) % RollableFile::page_size;
                if(msync(start_addr - page_off, size + page_off, MS_SYNC) == -1)
                    Logger::Log(LOG_LEVEL_WARN, "msync error: %s", strerror(errno));
            }
            return size;
        }
    }

    int index = offset % block_size;
    return files[order]->RandomWrite(data, size, index);
}

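// Readers follow the writer's sliding window: the writer publishes the
// current window start through shm_sliding_start_ptr (see Reserve above),
// and a reader remaps its own window whenever the published offset has
// moved into the block it is about to read.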
void* RollableFile::NewReaderSlidingMap(size_t order)
{
    off_t start_off = shm_sliding_start_ptr->load(std::memory_order_relaxed);
    if(start_off == 0 || start_off == sliding_start || start_off/block_size != (unsigned)order)
        return NULL;

    if(sliding_addr != NULL)
        munmap(sliding_addr, sliding_size);

    sliding_start = start_off;
    sliding_map_off = sliding_start % block_size;
    if(sliding_map_off + SLIDING_MEM_SIZE > block_size)
    {
        sliding_size = block_size - sliding_map_off;
    }
    else
    {
        sliding_size = SLIDING_MEM_SIZE;
    }

    sliding_addr = files[order]->MapFile(sliding_size, sliding_map_off, true);
    return sliding_addr;
}

size_t RollableFile::RandomRead(void *buff, size_t size, off_t offset)
{
    size_t order = offset / block_size;

    int rval = CheckAndOpenFile(order, false);
    if(rval != MBError::SUCCESS && rval != MBError::MMAP_FAILED)
        return 0;

    if(sliding_mmap)
    {
        if(!(mode & CONSTS::ACCESS_MODE_WRITER))
            NewReaderSlidingMap(order);

        // Check sliding map
        if(sliding_addr != NULL && offset >= sliding_start &&
           offset + size <= sliding_start + sliding_size)
        {
            memcpy(buff, sliding_addr + (offset % block_size) - sliding_map_off, size);
            return size;
        }
    }

    int index = offset % block_size;
    return files[order]->RandomRead(buff, size, index);
}

void RollableFile::PrintStats(std::ostream &out_stream) const
{
    out_stream << "Rollable file: " << path << " stats:" << std::endl;
    out_stream << "\tshared memory size: " << mmap_mem << std::endl;
    if(sliding_mmap)
    {
        out_stream << "\tsliding mmap start: " << sliding_start << std::endl;
        out_stream << "\tsliding mmap size: " << sliding_mem_size << std::endl;
    }
}

void RollableFile::ResetSlidingWindow()
{
    if(sliding_addr != NULL)
    {
        munmap(sliding_addr, sliding_size);
        sliding_addr = NULL;
    }

    sliding_size = 0;
    sliding_start = 0;
    sliding_map_off = 0;
}

void RollableFile::Flush()
{
    for(auto &file : files)
    {
        if(file != NULL)
            file->Flush();
    }
}

size_t RollableFile::GetResourceCollectionOffset() const
{
    return int((rc_offset_percentage / 100.0f) * max_num_block) * block_size;
}
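
// For instance, with the defaults above (rc_offset_percentage = 75,
// max_num_block = 2048) and an illustrative 64M block size, the offset is
// int(0.75 * 2048) * 64M = 1536 blocks, i.e. 96G into the file.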

void RollableFile::RemoveUnused(size_t max_size, bool writer_mode)
{
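    // Release every block from index ibeg on; for writers, the backing
    // files are also removed from the resource pool and unlinked.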
    unsigned ibeg = max_size/(block_size + 1) + 1;
    for(auto i = ibeg; i < files.size(); i++)
    {
        if(files[i] != NULL)
        {
            if(files[i]->IsMapped() && mem_used > block_size)
                mem_used -= block_size;
            if(writer_mode)
            {
                ResourcePool::getInstance().RemoveResourceByPath(files[i]->GetFilePath());
                unlink(files[i]->GetFilePath().c_str());
            }
            files[i] = NULL;
        }
    }
}

} // namespace mabain