1 __all__ = ('BlockStorageFile',)
7 from multiprocessing.pool import ThreadPool
10 from pyoram.storage.block_storage import \
11 (BlockStorageInterface,
12 BlockStorageTypeFactory)
16 from six.moves import xrange
19 from AliTimer import *
21 log = logging.getLogger("pyoram")
# NOTE(review): the body of this class is not visible in this view.
# Presumably it maps open/stat/remove onto the local filesystem (it is
# used below as ``_filesystem.open/stat/remove``) — confirm against the
# full source before relying on this description.
class default_filesystem(object):
28 class BlockStorageFile(BlockStorageInterface):
30 A class implementing the block storage interface
    # Packed index-header layout (network byte order): three unsigned
    # longs — block_size, block_count, user_header_size — plus one bool
    # "locked" flag.
    _index_struct_string = "!LLL?"
    # Size in bytes of the packed index header; user header data (if any)
    # begins at this file offset.
    _index_offset = struct.calcsize(_index_struct_string)
                 # NOTE(review): the ``def __init__(self, storage_name, ...)``
                 # header and several statements of this method are missing
                 # from this view; the remaining lines are reproduced
                 # verbatim and deliberately left unrepaired.
                 _filesystem=default_filesystem):
        # Project-local timer singleton used to accumulate raw file I/O
        # time (``Foo`` comes from ``from AliTimer import *`` —
        # TODO confirm its interface against AliTimer).
        self._timer = Foo.Instance();
        self._bytes_received = 0
        self._filesystem = _filesystem
        self._ignore_lock = ignore_lock
        self._close_pool = True
        self._async_write = None
        self._storage_name = storage_name
        # Open the existing storage file for binary read/write.
        self._f = self._filesystem.open(self.storage_name, "r+b")
        # Unpack the index header: block size, block count, user header
        # size and the "locked" flag.  NOTE(review): the ``struct.unpack(``
        # line appears to be missing between the next two lines.
        self._block_size, self._block_count, user_header_size, locked = \
            BlockStorageFile._index_struct_string,
            self._f.read(BlockStorageFile._index_offset))
        # Refuse to open a device another process has locked unless the
        # caller passed ignore_lock=True.  NOTE(review): the
        # ``raise IOError(`` line for this message appears to be missing.
        if locked and (not self._ignore_lock):
            "Can not open block storage device because it is "
            "locked by another process. To ignore this check, "
            "initialize this class with the keyword 'ignore_lock' "
        # Read any user-supplied header data stored after the index header.
        self._user_header_data = bytes()
        if user_header_size > 0:
            self._user_header_data = \
                self._f.read(user_header_size)
        # Block payload begins immediately after index header + user header.
        self._header_offset = BlockStorageFile._index_offset + \
                              len(self._user_header_data)
        # TODO: Figure out why this is required for Python3
        #       in order to prevent issues with the
        #       TopCachedEncryptedHeapStorage class. The
        #       problem has something to do with bufferedio,
        #       but it makes no sense why this fixes it (all
        #       we've done is read above these lines). As
        #       part of this, investigate whether or not we
        #       need the call to flush after write_block(s),
        #       or if its simply connected to some Python3
        if not self._ignore_lock:
            # turn on the locked flag
            # NOTE(review): the ``self._f.seek(0)`` / ``self._f.write(``
            # lines appear to be missing around this pack call.
            struct.pack(BlockStorageFile._index_struct_string,
                        len(self._user_header_data),
        # A thread pool enables asynchronous writes; ``threadpool_size == 0``
        # disables it.
        if threadpool_size != 0:
            self._pool = ThreadPool(threadpool_size)
    def _check_async(self):
        # Drain any outstanding asynchronous write.  ``AsyncResult.get()``
        # blocks until the worker finishes and re-raises any exception the
        # worker produced.
        if self._async_write is not None:
            self._async_write.get()
            self._async_write = None
        # TODO: Figure out why tests fail on Python3 without this
        # NOTE(review): the statement that followed the TODO above (likely
        # a file flush) is not visible in this view — confirm against the
        # full source.
111 def _schedule_async_write(self, args, callback=None):
112 assert self._async_write is None
113 if self._pool is not None:
114 self._async_write = \
115 self._pool.apply_async(self._writev, (args, callback))
117 self._writev(args, callback)
119 # This method is usually executed in another thread, so
120 # do not attempt to handle exceptions because it will
122 def _writev(self, chunks, callback):
123 for i, block in chunks:
125 # startTime = time.time();
126 self._timer.startTimer();
127 self._f.seek(self._header_offset + i * self.block_size)
130 self._timer.endTimer();
132 # print("Write....... " + str(time.time() - startTime))
134 if callback is not None:
137 def _prep_for_close(self):
138 print("prep file close 1")
140 print("prep file close 2")
142 if self._close_pool and (self._pool is not None):
143 print("prep file close 3")
145 print("prep file close 4")
147 print("prep file close 5")
149 print("prep file close 6")
152 if self._f is not None:
153 print("prep file close 7")
154 if not self._ignore_lock:
155 print("prep file close 8")
156 # turn off the locked flag
158 print("prep file close 9")
160 a = struct.pack(BlockStorageFile._index_struct_string,
163 len(self._user_header_data),
166 print("prep file close 9.1")
169 print("prep file close 10")
171 print("prep file close 11")
174 # Define BlockStorageInterface Methods
177 def clone_device(self):
178 f = BlockStorageFile(self.storage_name,
182 f._close_pool = False
186 def compute_storage_size(cls,
190 ignore_header=False):
191 assert (block_size > 0) and (block_size == int(block_size))
192 assert (block_count > 0) and (block_count == int(block_count))
193 if header_data is None:
194 header_data = bytes()
196 return block_size * block_count
198 return BlockStorageFile._index_offset + \
200 block_size * block_count
              # NOTE(review): the ``def setup(...)`` classmethod header and
              # a number of statements (try/raise/f.write lines, pack
              # arguments) are missing from this view; the lines below are
              # reproduced verbatim and deliberately left unrepaired.
              ignore_existing=False,
              threadpool_size=None,
              _filesystem=default_filesystem):
        # Refuse to overwrite an existing storage location unless asked to.
        if (not ignore_existing):
                _filesystem.stat(storage_name)
                if e.errno == errno.ENOENT:
                "Storage location already exists: %s"
        # Validate the requested geometry.
        if (block_size <= 0) or (block_size != int(block_size)):
                "Block size (bytes) must be a positive integer: %s"
        if (block_count <= 0) or (block_count != int(block_count)):
                "Block count must be a positive integer: %s"
        if (header_data is not None) and \
           (type(header_data) is not bytes):
                "'header_data' must be of type bytes. "
                "Invalid type: %s" % (type(header_data)))
        # Default initializer fills each block with zero bytes.
        if initialize is None:
            zeros = bytes(bytearray(block_size))
            initialize = lambda i: zeros
        with _filesystem.open(storage_name, "wb") as f:
            # Write the packed index header; the pack arguments are not
            # visible here.
            if header_data is None:
                f.write(struct.pack(BlockStorageFile._index_struct_string,
                f.write(struct.pack(BlockStorageFile._index_struct_string,
            # Initialize every block, with an optional progress bar.
            with tqdm.tqdm(total=block_count*block_size,
                           desc="Initializing File Block Storage Space",
                           disable=not pyoram.config.SHOW_PROGRESS_BAR) as progress_bar:
                for i in xrange(block_count):
                    block = initialize(i)
                    assert len(block) == block_size, \
                        ("%s != %s" % (len(block), block_size))
                    progress_bar.update(n=block_size)
        # Best-effort cleanup: remove the partially written file on error.
        except: # pragma: no cover
            _filesystem.remove(storage_name) # pragma: no cover
            raise # pragma: no cover
        return BlockStorageFile(storage_name,
                                threadpool_size=threadpool_size,
                                _filesystem=_filesystem)
277 def header_data(self):
278 return self._user_header_data
281 def block_count(self):
282 return self._block_count
285 def block_size(self):
286 return self._block_size
289 def storage_name(self):
290 return self._storage_name
292 def update_header_data(self, new_header_data):
294 if len(new_header_data) != len(self.header_data):
296 "The size of header data can not change.\n"
297 "Original bytes: %s\n"
298 "New bytes: %s" % (len(self.header_data),
299 len(new_header_data)))
300 self._user_header_data = bytes(new_header_data)
302 self._timer.startTimer();
303 self._f.seek(BlockStorageFile._index_offset)
304 self._f.write(self._user_header_data)
305 self._timer.endTimer();
308 print("file close 1")
309 self._prep_for_close()
310 print("file close 2")
311 if self._f is not None:
314 except OSError: # pragma: no cover
315 pass # pragma: no cover
318 def read_blocks(self, indices):
319 # print("Reading Blocks ......");
323 assert 0 <= i < self.block_count
324 self._bytes_received += self.block_size
326 self._timer.startTimer();
327 self._f.seek(self._header_offset + i * self.block_size)
328 a = self._f.read(self.block_size)
330 self._timer.endTimer();
335 def yield_blocks(self, indices):
336 # print("Yielding Blocks ......");
339 assert 0 <= i < self.block_count
340 self._bytes_received += self.block_size
342 self._timer.startTimer();
343 self._f.seek(self._header_offset + i * self.block_size)
344 a = self._f.read(self.block_size)
346 self._timer.endTimer();
350 def read_block(self, i):
351 # print("Reading Block ......");
353 assert 0 <= i < self.block_count
354 self._bytes_received += self.block_size
356 self._timer.startTimer();
357 self._f.seek(self._header_offset + i * self.block_size)
358 a = self._f.read(self.block_size)
360 self._timer.endTimer();
364 def write_blocks(self, indices, blocks, callback=None):
367 for i, block in zip(indices, blocks):
368 assert 0 <= i < self.block_count
369 assert len(block) == self.block_size, \
370 ("%s != %s" % (len(block), self.block_size))
371 self._bytes_sent += self.block_size
372 chunks.append((i, block))
374 self._schedule_async_write(chunks, callback=callback)
378 def write_block(self, i, block):
380 assert 0 <= i < self.block_count
381 assert len(block) == self.block_size
382 self._bytes_sent += self.block_size
384 self._schedule_async_write(((i, block),))
387 def bytes_sent(self):
388 return self._bytes_sent
391 def bytes_received(self):
392 return self._bytes_received
# Register this implementation with the block-storage factory so callers
# can construct it by the name "file" via BlockStorageTypeFactory.
BlockStorageTypeFactory.register_device("file", BlockStorageFile)