2 * Copyright (C) 2017 Cisco Inc.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2,
6 * as published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // @author Changxue Deng <chadeng@cisco.com>
23 #include <sys/types.h>
32 #include "rollable_file.h"
35 #include "resource_pool.h"
// Size of the sliding mmap window: 16 MB. LLU keeps the arithmetic in
// 64-bit even on 32-bit longs; parentheses keep the expansion safe
// under any surrounding operator (e.g. `x % SLIDING_MEM_SIZE`).
#define SLIDING_MEM_SIZE (16LLU*1024*1024) // 16M
// Upper bound on the number of block files (2K). Parenthesized so the
// multiplication cannot misbind at expansion sites.
#define MAX_NUM_BLOCK (2*1024) // 2K
// default rc offset is placed at 75% of maximum size
#define RC_OFFSET_PERCENTAGE 75
// System page size, cached once per process. The mmap/msync alignment
// arithmetic in this file depends on it; sysconf() can fail (negative
// result), which the constructor checks before enabling sliding mmap.
43 const long RollableFile::page_size = sysconf(_SC_PAGESIZE);
44 int RollableFile::ShmSync(uint8_t *addr, int size)
46 off_t page_offset = ((off_t) addr) % RollableFile::page_size;
47 return msync(addr-page_offset, size+page_offset, MS_SYNC);
// Constructor: configure one logical "rollable" file backed by a series
// of fixed-size block files under fpath (block size, memory cap, access
// mode, block-count cap, resource-collection offset percentage).
// NOTE(review): this excerpt is line-sampled; several initializer-list
// entries, braces and else-branches of the full source are not visible.
50 RollableFile::RollableFile(const std::string &fpath, size_t blocksize, size_t memcap, int access_mode,
51 long max_block, int in_rc_offset_percentage)
53 block_size(blocksize),
55 sliding_mmap(access_mode & CONSTS::USE_SLIDING_WINDOW),
57 max_num_block(max_block),
58 rc_offset_percentage(in_rc_offset_percentage),
// Sliding window defaults to SLIDING_MEM_SIZE (16M).
62 sliding_mem_size = SLIDING_MEM_SIZE;
// Shared-memory slot for the sliding start offset is set later via
// InitShmSlidingAddr().
66 shm_sliding_start_ptr = NULL;
// Writer-only setup: clamp block count and rc offset to sane ranges.
68 if(mode & CONSTS::ACCESS_MODE_WRITER)
70 if(max_num_block == 0 || max_num_block > MAX_NUM_BLOCK)
71 max_num_block = MAX_NUM_BLOCK;
73 Logger::Log(LOG_LEVEL_INFO, "maximal block number for %s is %d",
74 fpath.c_str(), max_num_block);
// rc_offset_percentage must lie in [50, 100]; otherwise use the default.
76 if(rc_offset_percentage == 0 || rc_offset_percentage > 100 || rc_offset_percentage < 50)
77 rc_offset_percentage = RC_OFFSET_PERCENTAGE;
79 Logger::Log(LOG_LEVEL_INFO, "rc_offset_percentage is set to %d", rc_offset_percentage);
82 Logger::Log(LOG_LEVEL_INFO, "opening rollable file %s for %s, mmap size: %d",
83 path.c_str(), (mode & CONSTS::ACCESS_MODE_WRITER)?"writing":"reading", mmap_mem);
86 Logger::Log(LOG_LEVEL_INFO, "sliding mmap is turned off for " + fpath);
90 // page_size is used to check page alignment when mapping file to memory.
// sysconf failure (negative page_size) disables the sliding window.
91 if(RollableFile::page_size < 0)
93 Logger::Log(LOG_LEVEL_WARN, "failed to get page size, turning off sliding memory");
98 Logger::Log(LOG_LEVEL_INFO, "sliding mmap is turned on for " + fpath);
// Pre-size the file-handle vector with a small number of empty slots.
102 files.assign(3, NULL);
103 if(mode & CONSTS::SYNC_ON_WRITE)
104 Logger::Log(LOG_LEVEL_INFO, "Sync is turned on for " + fpath);
// Record the shared-memory slot through which the writer publishes the
// sliding-window start offset to readers.
107 void RollableFile::InitShmSlidingAddr(std::atomic<size_t> *shm_sliding_addr)
109 shm_sliding_start_ptr = shm_sliding_addr;
// NOTE(review): excerpt is line-sampled; this assert is presumably
// guarded (e.g. debug-only) in the full source — confirm.
111 assert(shm_sliding_start_ptr != NULL);
// Release the sliding-window mapping, if one exists.
// NOTE(review): excerpt is line-sampled; any member resets following
// munmap in the full source are not visible here.
115 void RollableFile::Close()
117 if(sliding_addr != NULL)
119 munmap(sliding_addr, sliding_size);
// Destructor. Body not visible in this excerpt.
124 RollableFile::~RollableFile()
// Open (optionally creating) the block file with index block_order and,
// when the memory budget allows, map it into memory. Returns an MBError
// code.
// NOTE(review): excerpt is line-sampled; braces, the stringstream
// filename suffix and the OpenFile() argument list are not visible.
129 int RollableFile::OpenAndMapBlockFile(size_t block_order, bool create_file)
// Refuse to grow past the configured maximum number of blocks; only a
// writer logs this loudly.
131 if(block_order >= max_num_block)
133 int level = LOG_LEVEL_DEBUG;
134 if(mode & CONSTS::ACCESS_MODE_WRITER)
135 level = LOG_LEVEL_WARN;
// NOTE(review): typo "ovferflow" in this log message (runtime string,
// left untouched here).
136 Logger::Log(level, "block number %d ovferflow", block_order);
137 return MBError::NO_RESOURCE;
140 int rval = MBError::SUCCESS;
141 std::stringstream ss;
// Each block slot must be opened at most once.
144 assert(files[block_order] == NULL);
// Map into memory only while under the configured mmap memory cap.
148 if(mmap_mem > mem_used)
// In memory-only mode, being unable to map is a hard failure.
152 if(!map_file && (mode & CONSTS::MEMORY_ONLY_MODE))
153 return MBError::NO_MEMORY;
155 files[block_order] = ResourcePool::getInstance().OpenFile(path+ss.str(),
// Account for the mapped block against the memory budget.
161 mem_used += block_size;
162 else if(mode & CONSTS::MEMORY_ONLY_MODE)
163 rval = MBError::MMAP_FAILED;
167 // Need to make sure the required size at offset is aligned with
168 // block_size and mmap_size. We should not write the size in two
169 // different blocks or one in mmaped region and the other one on disk.
170 size_t RollableFile::CheckAlignment(size_t offset, int size)
172 size_t block_offset = offset % block_size;
174 if(block_offset + size > block_size)
176 // Start at the begining of the next block
177 offset = offset + block_size - block_offset;
183 int RollableFile::CheckAndOpenFile(size_t order, bool create_file)
185 int rval = MBError::SUCCESS;
187 if(order >= static_cast<size_t>(files.size()))
188 files.resize(order+3, NULL);
190 if(files[order] == NULL)
191 rval = OpenAndMapBlockFile(order, create_file);
196 // Get shared memory address for existing buffer
197 // No need to check alignment
// Returns a pointer covering [offset, offset+size) from either the
// block's full mapping or the sliding window.
// NOTE(review): excerpt is line-sampled; the sliding_mmap guard, early
// returns and the final fallthrough return are not visible here.
198 uint8_t* RollableFile::GetShmPtr(size_t offset, int size)
200 size_t order = offset / block_size;
201 int rval = CheckAndOpenFile(order, false);
203 if(rval != MBError::SUCCESS)
// Fast path: the whole block file is mmap'ed.
206 if(files[order]->IsMapped())
208 size_t index = offset % block_size;
209 return files[order]->GetMapAddr() + index;
// Otherwise serve from the sliding window when it covers the range.
214 if(static_cast<off_t>(offset) >= sliding_start &&
215 offset + size <= sliding_start + sliding_size)
217 if(sliding_addr != NULL)
218 return sliding_addr + (offset % block_size) - sliding_map_off;
// Reserve size bytes at offset (adjusting offset so the request does
// not straddle a block boundary) and return a writable pointer via ptr.
// NOTE(review): excerpt is line-sampled; rval's declaration, braces,
// early returns and the sliding_mmap guard are not visible here.
225 int RollableFile::Reserve(size_t &offset, int size, uint8_t* &ptr, bool map_new_sliding)
// Push offset forward if [offset, offset+size) would cross a block.
229 offset = CheckAlignment(offset, size);
231 size_t order = offset / block_size;
232 rval = CheckAndOpenFile(order, true);
233 if(rval != MBError::SUCCESS)
// Fast path: the block is fully mmap'ed.
236 if(files[order]->IsMapped())
238 size_t index = offset % block_size;
239 ptr = files[order]->GetMapAddr() + index;
// Serve from the current sliding window when it covers the range.
245 if(static_cast<off_t>(offset) >= sliding_start &&
246 offset + size <= sliding_start + sliding_size)
248 if(sliding_addr != NULL)
249 ptr = sliding_addr + (offset % block_size) - sliding_map_off;
// Otherwise advance the window, if the caller allows remapping.
251 else if(map_new_sliding && offset >= sliding_start + sliding_size)
253 ptr = NewSlidingMapAddr(order, offset, size);
256 // Load the mmap starting offset to shared memory so that readers
257 // can map the same region when reading it.
258 shm_sliding_start_ptr->store(sliding_start, std::memory_order_relaxed);
// Advance (or create) the writer's sliding mmap window so that it can
// cover offset; returns the pointer for [offset, offset+size) when the
// new window covers it, and disables sliding mmap on mapping failure.
// NOTE(review): excerpt is line-sampled; braces, else-branches and some
// returns are not visible here.
266 uint8_t* RollableFile::NewSlidingMapAddr(size_t order, size_t offset, int size)
// Drop the previous window first.
268 if(sliding_addr != NULL)
270 // No need to call msync since munmap will write all memory
272 //msync(sliding_addr, sliding_size, MS_SYNC);
273 munmap(sliding_addr, sliding_size);
// First window starts at the requested offset; later ones advance by
// one window size.
276 if(sliding_start == 0)
278 sliding_start = offset;
282 sliding_start += sliding_size;
285 // Check page alignment
// mmap file offsets must be page-aligned; round sliding_start down.
286 int page_alignment = sliding_start % RollableFile::page_size;
287 if(page_alignment != 0)
289 sliding_start -= page_alignment;
290 if(sliding_start < 0)
// Clip the window so it never spans a block-file boundary.
293 sliding_map_off = sliding_start % block_size;
294 if(sliding_map_off + sliding_mem_size > block_size)
296 sliding_size = block_size - sliding_map_off;
300 sliding_size = sliding_mem_size;
// Map the new window from the block file that contains sliding_start.
303 order = sliding_start / block_size;
304 sliding_addr = files[order]->MapFile(sliding_size, sliding_map_off, true);
305 if(sliding_addr != NULL)
307 if(static_cast<off_t>(offset) >= sliding_start &&
308 offset+size <= sliding_start+sliding_size)
309 return sliding_addr + (offset % block_size) - sliding_map_off;
// Mapping failed: permanently turn off sliding mmap for this file.
313 Logger::Log(LOG_LEVEL_WARN, "last mmap failed, disable sliding mmap");
314 sliding_mmap = false;
// Write size bytes from data at an absolute file offset: through the
// sliding window when it covers the range, otherwise via the block
// file's own RandomWrite.
// NOTE(review): excerpt is line-sampled; braces and early returns are
// not visible here.
320 size_t RollableFile::RandomWrite(const void *data, size_t size, off_t offset)
322 size_t order = offset / block_size;
323 int rval = CheckAndOpenFile(order, false);
324 if(rval != MBError::SUCCESS)
328 if(sliding_mmap && sliding_addr != NULL)
330 if(offset >= sliding_start && offset+size <= sliding_start+sliding_size)
332 uint8_t *start_addr = sliding_addr + (offset % block_size) - sliding_map_off;
333 memcpy(start_addr, data, size);
// Synchronous flush when SYNC_ON_WRITE is set; msync needs a
// page-aligned address (same rounding as ShmSync).
334 if(mode & CONSTS::SYNC_ON_WRITE)
336 off_t page_off = ((off_t) start_addr) % RollableFile::page_size;
// NOTE(review): failure is reported on std::cout rather than Logger.
337 if(msync(start_addr-page_off, size+page_off, MS_SYNC) == -1)
338 std::cout<<"msync error\n";
// Fallback: positioned write within the block file.
344 int index = offset % block_size;
345 return files[order]->RandomWrite(data, size, index);
// Reader side: re-map the local sliding window to match the start
// offset the writer published in shared memory.
// NOTE(review): excerpt is line-sampled; the early return, braces and
// else branch are not visible here.
348 void* RollableFile::NewReaderSlidingMap(size_t order)
350 off_t start_off = shm_sliding_start_ptr->load(std::memory_order_relaxed);
// Nothing to do if unset, unchanged, or pointing into another block.
351 if(start_off == 0 || start_off == sliding_start || start_off/block_size != (unsigned)order)
// Replace the previous window.
354 if(sliding_addr != NULL)
355 munmap(sliding_addr, sliding_size);
356 sliding_start = start_off;
357 sliding_map_off = sliding_start % block_size;
// Clip so the window does not cross a block-file boundary.
358 if(sliding_map_off + SLIDING_MEM_SIZE > block_size)
360 sliding_size = block_size - sliding_map_off;
364 sliding_size = SLIDING_MEM_SIZE;
367 sliding_addr = files[order]->MapFile(sliding_size, sliding_map_off, true);
// Read size bytes at an absolute file offset into buff; readers refresh
// their sliding window first, then read via the window or the block
// file.
// NOTE(review): excerpt is line-sampled; braces, early returns and the
// sliding_mmap guard are not visible here.
371 size_t RollableFile::RandomRead(void *buff, size_t size, off_t offset)
373 size_t order = offset / block_size;
375 int rval = CheckAndOpenFile(order, false);
// MMAP_FAILED is tolerated here: the block can still be read from disk.
376 if(rval != MBError::SUCCESS && rval != MBError::MMAP_FAILED)
// Readers follow the writer's published sliding window.
381 if(!(mode & CONSTS::ACCESS_MODE_WRITER))
382 NewReaderSlidingMap(order);
385 if(sliding_addr != NULL && offset >= sliding_start &&
386 offset+size <= sliding_start+sliding_size)
388 memcpy(buff, sliding_addr+(offset%block_size)-sliding_map_off, size);
// Fallback: positioned read from the block file.
393 int index = offset % block_size;
394 return files[order]->RandomRead(buff, size, index);
// Dump human-readable statistics for this rollable file to out_stream.
// NOTE(review): excerpt is line-sampled; any guard around the sliding
// stats in the full source is not visible here.
397 void RollableFile::PrintStats(std::ostream &out_stream) const
399 out_stream << "Rollable file: " << path << " stats:" << std::endl;
400 out_stream << "\tshared memory size: " << mmap_mem << std::endl;
403 out_stream << "\tsliding mmap start: " << sliding_start << std::endl;
404 out_stream << "\tsliding mmap size: " << sliding_mem_size << std::endl;
// Drop the current sliding-window mapping.
// NOTE(review): excerpt is line-sampled; the member resets that follow
// munmap in the full source are not visible here.
408 void RollableFile::ResetSlidingWindow()
410 if(sliding_addr != NULL)
412 munmap(sliding_addr, sliding_size);
// Flush every open block file to disk.
// NOTE(review): excerpt is line-sampled; the loop body is not visible
// here.
421 void RollableFile::Flush()
423 for (std::vector<std::shared_ptr<MmapFileIO>>::iterator it = files.begin();
424 it != files.end(); ++it)
433 size_t RollableFile::GetResourceCollectionOffset() const
435 return int((rc_offset_percentage / 100.0f) * max_num_block) * block_size;
// Close — and, presumably when writer_mode is set (guard not visible in
// this excerpt), delete — block files lying entirely beyond max_size.
// NOTE(review): `max_size/(block_size + 1) + 1` divides by
// block_size + 1, which looks like an off-by-one (max_size/block_size
// expected) — confirm against the full source.
// NOTE(review): excerpt is line-sampled and the function continues past
// the end of this chunk.
438 void RollableFile::RemoveUnused(size_t max_size, bool writer_mode)
440 unsigned ibeg = max_size/(block_size + 1) + 1;
441 for(auto i = ibeg; i < files.size(); i++)
// Give back the memory accounting for blocks that were mmap'ed.
445 if(files[i]->IsMapped() && mem_used > block_size)
446 mem_used -= block_size;
// Drop the handle from the resource pool and remove the backing file.
449 ResourcePool::getInstance().RemoveResourceByPath(files[i]->GetFilePath());
450 unlink(files[i]->GetFilePath().c_str());