PyORAm
[iotcloud.git] / PyORAM / src / pyoram / storage / block_storage_file.py
# Public API of this module: only the file-backed storage class is exported.
__all__ = ('BlockStorageFile',)
2
3 import os
4 import struct
5 import logging
6 import errno
7 from multiprocessing.pool import ThreadPool
8
9 import pyoram
10 from pyoram.storage.block_storage import \
11     (BlockStorageInterface,
12      BlockStorageTypeFactory)
13
14 import tqdm
15 import six
16 from six.moves import xrange
17
18 import time 
19 from AliTimer import *
20
# Module-level logger, obtained under the package-wide "pyoram" logger name.
log = logging.getLogger("pyoram")
22
class default_filesystem(object):
    """
    Default filesystem backend: plain aliases for the builtin
    ``open`` and for ``os.remove`` / ``os.stat``. It is the default
    value of the ``_filesystem`` argument accepted by
    BlockStorageFile, which only calls these three attributes.
    """
    stat = os.stat
    remove = os.remove
    open = open
27
class BlockStorageFile(BlockStorageInterface):
    """
    A class implementing the block storage interface
    using a local file.

    File layout, in order:

      1. An index record packed with ``_index_struct_string``
         (network byte order): block size, block count, user
         header size, and a boolean "locked" flag.
      2. The user header bytes (possibly empty).
      3. ``block_count`` blocks of ``block_size`` bytes each.
    """

    _index_struct_string = "!LLL?"
    _index_offset = struct.calcsize(_index_struct_string)

    def __init__(self,
                 storage_name,
                 threadpool_size=None,
                 ignore_lock=False,
                 _filesystem=default_filesystem):
        """
        Open an existing block storage file.

        Args:
            storage_name: Path handed to ``_filesystem.open``.
            threadpool_size: Size passed to ``ThreadPool``. ``0``
                disables the pool so that writes run synchronously;
                ``None`` is forwarded to ``ThreadPool`` unchanged.
            ignore_lock: When True, neither check nor set the
                locked flag stored in the index record.
            _filesystem: Object providing ``open`` (and, for
                ``setup``, ``stat`` and ``remove``).

        Raises:
            IOError: If the file is flagged as locked and
                ``ignore_lock`` is False.
        """
        # NOTE(review): ``Foo`` is presumably supplied by the
        # ``from AliTimer import *`` star-import -- confirm.
        self._timer = Foo.Instance()

        self._bytes_sent = 0
        self._bytes_received = 0
        self._filesystem = _filesystem
        self._ignore_lock = ignore_lock
        self._f = None
        self._pool = None
        self._close_pool = True
        self._async_write = None
        self._storage_name = storage_name
        self._f = self._filesystem.open(self.storage_name, "r+b")
        try:
            self._f.seek(0)
            self._block_size, self._block_count, user_header_size, locked = \
                struct.unpack(
                    BlockStorageFile._index_struct_string,
                    self._f.read(BlockStorageFile._index_offset))
        except Exception:
            # Do not leak the file handle when the index record is
            # missing or malformed (e.g. a truncated file).
            self._f.close()
            self._f = None
            raise

        if locked and (not self._ignore_lock):
            self._f.close()
            self._f = None
            raise IOError(
                "Can not open block storage device because it is "
                "locked by another process. To ignore this check, "
                "initialize this class with the keyword 'ignore_lock' "
                "set to True.")
        self._user_header_data = bytes()
        if user_header_size > 0:
            self._user_header_data = \
                self._f.read(user_header_size)
        # Offset of block 0: index record plus the user header
        # bytes that were actually read.
        self._header_offset = BlockStorageFile._index_offset + \
                              len(self._user_header_data)

        # TODO: Figure out why this is required for Python3
        #       in order to prevent issues with the
        #       TopCachedEncryptedHeapStorage class. The
        #       problem has something to do with bufferedio,
        #       but it makes no sense why this fixes it (all
        #       we've done is read above these lines). As
        #       part of this, investigate whether or not we
        #       need the call to flush after write_block(s),
        #       or if it's simply connected to some Python3
        #       bug in bufferedio.
        self._f.flush()

        if not self._ignore_lock:
            # turn on the locked flag
            self._f.seek(0)
            self._f.write(
                struct.pack(BlockStorageFile._index_struct_string,
                            self.block_size,
                            self.block_count,
                            len(self._user_header_data),
                            True))
            self._f.flush()

        if threadpool_size != 0:
            self._pool = ThreadPool(threadpool_size)

    def _check_async(self):
        """Wait for the pending asynchronous write, if any.

        ``get()`` re-raises any exception raised by the write.
        """
        if self._async_write is not None:
            self._async_write.get()
            self._async_write = None
        # TODO: Figure out why tests fail on Python3 without this
        if six.PY3:
            if self._f is None:
                return
            self._f.flush()

    def _schedule_async_write(self, args, callback=None):
        """Run ``_writev`` on the pool, or inline when there is no pool.

        Callers must have drained any previous write with
        ``_check_async`` first; at most one write is in flight.
        """
        assert self._async_write is None
        if self._pool is not None:
            self._async_write = \
                self._pool.apply_async(self._writev, (args, callback))
        else:
            self._writev(args, callback)

    # This method is usually executed in another thread, so
    # do not attempt to handle exceptions because it will
    # not work.
    def _writev(self, chunks, callback):
        """Write each ``(index, block)`` pair in ``chunks`` to the file.

        ``callback(index)`` is invoked after each block is written
        when a callback is given.
        """
        for i, block in chunks:
            self._timer.startTimer()
            self._f.seek(self._header_offset + i * self.block_size)
            self._f.write(block)
            self._timer.endTimer()

            if callback is not None:
                callback(i)

    def _read_block_at(self, i):
        """Validate index ``i``, then read and return that block.

        Updates the received-bytes counter and brackets the file
        access with the timer. Callers are responsible for calling
        ``_check_async`` beforehand.
        """
        assert 0 <= i < self.block_count
        self._bytes_received += self.block_size

        self._timer.startTimer()
        self._f.seek(self._header_offset + i * self.block_size)
        block = self._f.read(self.block_size)
        self._timer.endTimer()

        return block

    def _prep_for_close(self):
        """Drain pending writes, shut down an owned pool, and clear
        the locked flag in the index record (unless locks are ignored).
        """
        self._check_async()

        # A cloned device shares its parent's pool and must not
        # close it (see clone_device).
        if self._close_pool and (self._pool is not None):
            self._pool.close()
            self._pool.join()
            self._pool = None

        if self._f is not None:
            if not self._ignore_lock:
                # turn off the locked flag
                self._f.seek(0)
                self._f.write(
                    struct.pack(BlockStorageFile._index_struct_string,
                                self.block_size,
                                self.block_count,
                                len(self._user_header_data),
                                False))
                self._f.flush()

    #
    # Define BlockStorageInterface Methods
    #

    def clone_device(self):
        """Return a second handle on the same file.

        The clone ignores the lock flag, shares this object's
        thread pool, and will not close that pool itself.
        """
        f = BlockStorageFile(self.storage_name,
                             threadpool_size=0,
                             ignore_lock=True)
        f._pool = self._pool
        f._close_pool = False
        return f

    @classmethod
    def compute_storage_size(cls,
                             block_size,
                             block_count,
                             header_data=None,
                             ignore_header=False):
        """Return the file size in bytes for the given geometry.

        With ``ignore_header`` set, only the block data is counted;
        otherwise the index record and ``header_data`` are included.
        """
        assert (block_size > 0) and (block_size == int(block_size))
        assert (block_count > 0) and (block_count == int(block_count))
        if header_data is None:
            header_data = bytes()
        if ignore_header:
            return block_size * block_count
        else:
            return BlockStorageFile._index_offset + \
                   len(header_data) + \
                   block_size * block_count

    @classmethod
    def setup(cls,
              storage_name,
              block_size,
              block_count,
              initialize=None,
              header_data=None,
              ignore_existing=False,
              threadpool_size=None,
              _filesystem=default_filesystem):
        """Create a new storage file and return it opened.

        Args:
            storage_name: Path of the file to create.
            block_size: Size of each block in bytes (positive integer).
            block_count: Number of blocks (positive integer).
            initialize: Optional callable mapping a block index to
                that block's initial bytes; defaults to all zeros.
            header_data: Optional user header (must be ``bytes``).
            ignore_existing: When True, skip the existence check
                and overwrite any existing file.
            threadpool_size: Forwarded to the constructor.
            _filesystem: Object providing ``open``, ``stat``
                and ``remove``.

        Raises:
            IOError: If the location exists and ``ignore_existing``
                is False.
            ValueError: If ``block_size`` or ``block_count`` is not
                a positive integer.
            TypeError: If ``header_data`` is not ``bytes``.
        """

        if (not ignore_existing):
            _exists = True
            try:
                _filesystem.stat(storage_name)
            except OSError as e:
                # Any stat failure other than "no such file" is
                # still treated as an existing location.
                if e.errno == errno.ENOENT:
                    _exists = False
            if _exists:
                raise IOError(
                    "Storage location already exists: %s"
                    % (storage_name))
        if (block_size <= 0) or (block_size != int(block_size)):
            raise ValueError(
                "Block size (bytes) must be a positive integer: %s"
                % (block_size))
        if (block_count <= 0) or (block_count != int(block_count)):
            raise ValueError(
                "Block count must be a positive integer: %s"
                % (block_count))
        if (header_data is not None) and \
           (type(header_data) is not bytes):
            raise TypeError(
                "'header_data' must be of type bytes. "
                "Invalid type: %s" % (type(header_data)))

        if initialize is None:
            zeros = bytes(bytearray(block_size))
            initialize = lambda i: zeros
        try:
            with _filesystem.open(storage_name, "wb") as f:
                # create_index
                if header_data is None:
                    f.write(struct.pack(BlockStorageFile._index_struct_string,
                                        block_size,
                                        block_count,
                                        0,
                                        False))
                else:
                    f.write(struct.pack(BlockStorageFile._index_struct_string,
                                        block_size,
                                        block_count,
                                        len(header_data),
                                        False))
                    f.write(header_data)
                with tqdm.tqdm(total=block_count*block_size,
                               desc="Initializing File Block Storage Space",
                               unit="B",
                               unit_scale=True,
                               disable=not pyoram.config.SHOW_PROGRESS_BAR) as progress_bar:
                    for i in xrange(block_count):
                        block = initialize(i)
                        assert len(block) == block_size, \
                            ("%s != %s" % (len(block), block_size))
                        f.write(block)
                        progress_bar.update(n=block_size)
        # Bare except is deliberate: remove the partial file on any
        # failure (including interrupts), then re-raise.
        except:                                        # pragma: no cover
            _filesystem.remove(storage_name)           # pragma: no cover
            raise                                      # pragma: no cover

        return BlockStorageFile(storage_name,
                                threadpool_size=threadpool_size,
                                _filesystem=_filesystem)

    @property
    def header_data(self):
        """The user header bytes."""
        return self._user_header_data

    @property
    def block_count(self):
        """Number of blocks, as read from the index record."""
        return self._block_count

    @property
    def block_size(self):
        """Block size in bytes, as read from the index record."""
        return self._block_size

    @property
    def storage_name(self):
        """The storage name this object was opened with."""
        return self._storage_name

    def update_header_data(self, new_header_data):
        """Overwrite the user header in place.

        Raises:
            ValueError: If the new header differs in length from
                the current one.
        """
        self._check_async()
        if len(new_header_data) != len(self.header_data):
            raise ValueError(
                "The size of header data can not change.\n"
                "Original bytes: %s\n"
                "New bytes: %s" % (len(self.header_data),
                                   len(new_header_data)))
        self._user_header_data = bytes(new_header_data)

        self._timer.startTimer()
        self._f.seek(BlockStorageFile._index_offset)
        self._f.write(self._user_header_data)
        self._timer.endTimer()

    def close(self):
        """Finish pending work, clear the lock flag, and close the file.

        Calling it again after the file has been closed does not
        touch the file.
        """
        self._prep_for_close()
        if self._f is not None:
            try:
                self._f.close()
            except OSError:                            # pragma: no cover
                pass                                   # pragma: no cover
            self._f = None

    def read_blocks(self, indices):
        """Return a list with the block stored at each index in ``indices``."""
        self._check_async()
        return [self._read_block_at(i) for i in indices]

    def yield_blocks(self, indices):
        """Generator yielding the block stored at each index in ``indices``."""
        self._check_async()
        for i in indices:
            yield self._read_block_at(i)

    def read_block(self, i):
        """Return the block stored at index ``i``."""
        self._check_async()
        return self._read_block_at(i)

    def write_blocks(self, indices, blocks, callback=None):
        """Validate and schedule a write of ``blocks`` at ``indices``.

        ``callback(index)``, when given, is called after each block
        has been written.
        """
        self._check_async()
        chunks = []
        for i, block in zip(indices, blocks):
            assert 0 <= i < self.block_count
            assert len(block) == self.block_size, \
                ("%s != %s" % (len(block), self.block_size))
            self._bytes_sent += self.block_size
            chunks.append((i, block))

        self._schedule_async_write(chunks, callback=callback)

    def write_block(self, i, block):
        """Validate and schedule a write of ``block`` at index ``i``."""
        self._check_async()
        assert 0 <= i < self.block_count
        assert len(block) == self.block_size
        self._bytes_sent += self.block_size

        self._schedule_async_write(((i, block),))

    @property
    def bytes_sent(self):
        """Total bytes passed to write_block / write_blocks so far."""
        return self._bytes_sent

    @property
    def bytes_received(self):
        """Total bytes requested through the read methods so far."""
        return self._bytes_received
393
# Register this class with the storage type factory under the name "file".
BlockStorageTypeFactory.register_device("file", BlockStorageFile)