PyORAm
[iotcloud.git] / PyORAM / src / pyoram / storage / block_storage_file.py
diff --git a/PyORAM/src/pyoram/storage/block_storage_file.py b/PyORAM/src/pyoram/storage/block_storage_file.py
new file mode 100644 (file)
index 0000000..75932a2
--- /dev/null
@@ -0,0 +1,394 @@
+__all__ = ('BlockStorageFile',)
+
+import os
+import struct
+import logging
+import errno
+from multiprocessing.pool import ThreadPool
+
+import pyoram
+from pyoram.storage.block_storage import \
+    (BlockStorageInterface,
+     BlockStorageTypeFactory)
+
+import tqdm
+import six
+from six.moves import xrange
+
+import time 
+from AliTimer import *
+
+log = logging.getLogger("pyoram")
+
+class default_filesystem(object):
+    open = open
+    remove = os.remove
+    stat = os.stat
+
+class BlockStorageFile(BlockStorageInterface):
+    """
+    A class implementing the block storage interface
+    using a local file.
+    """
+
+    _index_struct_string = "!LLL?"
+    _index_offset = struct.calcsize(_index_struct_string)
+
+    def __init__(self,
+                 storage_name,
+                 threadpool_size=None,
+                 ignore_lock=False,
+                 _filesystem=default_filesystem):
+        self._timer = Foo.Instance();
+
+        self._bytes_sent = 0
+        self._bytes_received = 0
+        self._filesystem = _filesystem
+        self._ignore_lock = ignore_lock
+        self._f = None
+        self._pool = None
+        self._close_pool = True
+        self._async_write = None
+        self._storage_name = storage_name
+        self._f = self._filesystem.open(self.storage_name, "r+b")
+        self._f.seek(0)
+        self._block_size, self._block_count, user_header_size, locked = \
+            struct.unpack(
+                BlockStorageFile._index_struct_string,
+                self._f.read(BlockStorageFile._index_offset))
+
+        if locked and (not self._ignore_lock):
+            self._f.close()
+            self._f = None
+            raise IOError(
+                "Can not open block storage device because it is "
+                "locked by another process. To ignore this check, "
+                "initialize this class with the keyword 'ignore_lock' "
+                "set to True.")
+        self._user_header_data = bytes()
+        if user_header_size > 0:
+            self._user_header_data = \
+                self._f.read(user_header_size)
+        self._header_offset = BlockStorageFile._index_offset + \
+                              len(self._user_header_data)
+
+        # TODO: Figure out why this is required for Python3
+        #       in order to prevent issues with the
+        #       TopCachedEncryptedHeapStorage class. The
+        #       problem has something to do with bufferedio,
+        #       but it makes no sense why this fixes it (all
+        #       we've done is read above these lines). As
+        #       part of this, investigate whethor or not we
+        #       need the call to flush after write_block(s),
+        #       or if its simply connected to some Python3
+        #       bug in bufferedio.
+        self._f.flush()
+
+        if not self._ignore_lock:
+            # turn on the locked flag
+            self._f.seek(0)
+            self._f.write(
+                struct.pack(BlockStorageFile._index_struct_string,
+                            self.block_size,
+                            self.block_count,
+                            len(self._user_header_data),
+                            True))
+            self._f.flush()
+
+        if threadpool_size != 0:
+            self._pool = ThreadPool(threadpool_size)
+
+    def _check_async(self):
+        if self._async_write is not None:
+            self._async_write.get()
+            self._async_write = None
+        # TODO: Figure out why tests fail on Python3 without this
+        if six.PY3:
+            if self._f is None:
+                return
+            self._f.flush()
+
+    def _schedule_async_write(self, args, callback=None):
+        assert self._async_write is None
+        if self._pool is not None:
+            self._async_write = \
+                self._pool.apply_async(self._writev, (args, callback))
+        else:
+            self._writev(args, callback)
+
+    # This method is usually executed in another thread, so
+    # do not attempt to handle exceptions because it will
+    # not work.
+    def _writev(self, chunks, callback):
+        for i, block in chunks:
+
+            # startTime = time.time();
+            self._timer.startTimer();
+            self._f.seek(self._header_offset + i * self.block_size)
+            self._f.write(block)
+            # self._f.flush()
+            self._timer.endTimer();
+
+            # print("Write....... " + str(time.time() - startTime))
+
+            if callback is not None:
+                callback(i)
+
+    def _prep_for_close(self):
+        print("prep file close 1")
+        self._check_async()
+        print("prep file close 2")
+        
+        if self._close_pool and (self._pool is not None):
+            print("prep file close 3")
+            self._pool.close()
+            print("prep file close 4")
+            self._pool.join()
+            print("prep file close 5")
+            self._pool = None
+            print("prep file close 6")
+
+
+        if self._f is not None:
+            print("prep file close 7")
+            if not self._ignore_lock:
+                print("prep file close 8")
+                # turn off the locked flag
+                self._f.seek(0)
+                print("prep file close 9")
+
+                a = struct.pack(BlockStorageFile._index_struct_string,
+                                self.block_size,
+                                self.block_count,
+                                len(self._user_header_data),
+                                False)
+
+                print("prep file close 9.1")
+
+                self._f.write(a)
+                print("prep file close 10")
+                self._f.flush()
+                print("prep file close 11")
+
+    #
+    # Define BlockStorageInterface Methods
+    #
+
+    def clone_device(self):
+        f = BlockStorageFile(self.storage_name,
+                             threadpool_size=0,
+                             ignore_lock=True)
+        f._pool = self._pool
+        f._close_pool = False
+        return f
+
+    @classmethod
+    def compute_storage_size(cls,
+                             block_size,
+                             block_count,
+                             header_data=None,
+                             ignore_header=False):
+        assert (block_size > 0) and (block_size == int(block_size))
+        assert (block_count > 0) and (block_count == int(block_count))
+        if header_data is None:
+            header_data = bytes()
+        if ignore_header:
+            return block_size * block_count
+        else:
+            return BlockStorageFile._index_offset + \
+                   len(header_data) + \
+                   block_size * block_count
+
+    @classmethod
+    def setup(cls,
+              storage_name,
+              block_size,
+              block_count,
+              initialize=None,
+              header_data=None,
+              ignore_existing=False,
+              threadpool_size=None,
+              _filesystem=default_filesystem):
+
+        if (not ignore_existing):
+            _exists = True
+            try:
+                _filesystem.stat(storage_name)
+            except OSError as e:
+                if e.errno == errno.ENOENT:
+                    _exists = False
+            if _exists:
+                raise IOError(
+                    "Storage location already exists: %s"
+                    % (storage_name))
+        if (block_size <= 0) or (block_size != int(block_size)):
+            raise ValueError(
+                "Block size (bytes) must be a positive integer: %s"
+                % (block_size))
+        if (block_count <= 0) or (block_count != int(block_count)):
+            raise ValueError(
+                "Block count must be a positive integer: %s"
+                % (block_count))
+        if (header_data is not None) and \
+           (type(header_data) is not bytes):
+            raise TypeError(
+                "'header_data' must be of type bytes. "
+                "Invalid type: %s" % (type(header_data)))
+
+        if initialize is None:
+            zeros = bytes(bytearray(block_size))
+            initialize = lambda i: zeros
+        try:
+            with _filesystem.open(storage_name, "wb") as f:
+                # create_index
+                if header_data is None:
+                    f.write(struct.pack(BlockStorageFile._index_struct_string,
+                                        block_size,
+                                        block_count,
+                                        0,
+                                        False))
+                else:
+                    f.write(struct.pack(BlockStorageFile._index_struct_string,
+                                        block_size,
+                                        block_count,
+                                        len(header_data),
+                                        False))
+                    f.write(header_data)
+                with tqdm.tqdm(total=block_count*block_size,
+                               desc="Initializing File Block Storage Space",
+                               unit="B",
+                               unit_scale=True,
+                               disable=not pyoram.config.SHOW_PROGRESS_BAR) as progress_bar:
+                    for i in xrange(block_count):
+                        block = initialize(i)
+                        assert len(block) == block_size, \
+                            ("%s != %s" % (len(block), block_size))
+                        f.write(block)
+                        progress_bar.update(n=block_size)
+        except:                                        # pragma: no cover
+            _filesystem.remove(storage_name)           # pragma: no cover
+            raise                                      # pragma: no cover
+
+        return BlockStorageFile(storage_name,
+                                threadpool_size=threadpool_size,
+                                _filesystem=_filesystem)
+
+    @property
+    def header_data(self):
+        return self._user_header_data
+
+    @property
+    def block_count(self):
+        return self._block_count
+
+    @property
+    def block_size(self):
+        return self._block_size
+
+    @property
+    def storage_name(self):
+        return self._storage_name
+
+    def update_header_data(self, new_header_data):
+        self._check_async()
+        if len(new_header_data) != len(self.header_data):
+            raise ValueError(
+                "The size of header data can not change.\n"
+                "Original bytes: %s\n"
+                "New bytes: %s" % (len(self.header_data),
+                                   len(new_header_data)))
+        self._user_header_data = bytes(new_header_data)
+
+        self._timer.startTimer();
+        self._f.seek(BlockStorageFile._index_offset)
+        self._f.write(self._user_header_data)
+        self._timer.endTimer();
+
+    def close(self):
+        print("file close 1")
+        self._prep_for_close()
+        print("file close 2")
+        if self._f is not None:
+            try:
+                self._f.close()
+            except OSError:                            # pragma: no cover
+                pass                                   # pragma: no cover
+            self._f = None
+
+    def read_blocks(self, indices):
+        # print("Reading Blocks ......");
+        self._check_async()
+        blocks = []
+        for i in indices:
+            assert 0 <= i < self.block_count
+            self._bytes_received += self.block_size
+            
+            self._timer.startTimer();
+            self._f.seek(self._header_offset + i * self.block_size)
+            a = self._f.read(self.block_size)
+            # self._f.flush()
+            self._timer.endTimer();
+            
+            blocks.append(a)
+        return blocks
+
+    def yield_blocks(self, indices):
+        # print("Yielding Blocks ......");
+        self._check_async()
+        for i in indices:
+            assert 0 <= i < self.block_count
+            self._bytes_received += self.block_size
+
+            self._timer.startTimer();
+            self._f.seek(self._header_offset + i * self.block_size)
+            a = self._f.read(self.block_size)
+            # self._f.flush()
+            self._timer.endTimer();
+
+            yield a
+
+    def read_block(self, i):
+        # print("Reading Block ......");
+        self._check_async()
+        assert 0 <= i < self.block_count
+        self._bytes_received += self.block_size
+
+        self._timer.startTimer();
+        self._f.seek(self._header_offset + i * self.block_size)
+        a = self._f.read(self.block_size)
+        # self._f.flush()
+        self._timer.endTimer();
+
+        return a
+
+    def write_blocks(self, indices, blocks, callback=None):
+        self._check_async()
+        chunks = []
+        for i, block in zip(indices, blocks):
+            assert 0 <= i < self.block_count
+            assert len(block) == self.block_size, \
+                ("%s != %s" % (len(block), self.block_size))
+            self._bytes_sent += self.block_size
+            chunks.append((i, block))
+
+        self._schedule_async_write(chunks, callback=callback)
+        
+
+
+    def write_block(self, i, block):
+        self._check_async()
+        assert 0 <= i < self.block_count
+        assert len(block) == self.block_size
+        self._bytes_sent += self.block_size
+
+        self._schedule_async_write(((i, block),))
+
+    @property
+    def bytes_sent(self):
+        return self._bytes_sent
+
+    @property
+    def bytes_received(self):
+        return self._bytes_received
+
+BlockStorageTypeFactory.register_device("file", BlockStorageFile)