__all__ = ('BlockStorageS3',)

import struct
import logging
from multiprocessing.pool import ThreadPool

import pyoram
from pyoram.storage.block_storage import \
    (BlockStorageInterface,
     BlockStorageTypeFactory)
from pyoram.storage.boto3_s3_wrapper import Boto3S3Wrapper

import tqdm
from six.moves import xrange, map

log = logging.getLogger("pyoram")

class BlockStorageS3(BlockStorageInterface):
    """
    A block storage device for Amazon Simple
    Storage Service (S3).
    """
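
    # A minimal usage sketch, assuming an existing S3 bucket named
    # "my-bucket" (hypothetical) with credentials available to the
    # wrapped boto3 client. setup() creates and initializes the
    # storage space; close() releases the device lock:
    #
    #   device = BlockStorageS3.setup("mystorage",
    #                                 block_size=4096,
    #                                 block_count=8,
    #                                 bucket_name="my-bucket")
    #   device.write_block(0, bytes(bytearray(4096)))
    #   block = device.read_block(0)
    #   device.close()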

    # The index object stores the device metadata as a packed header:
    # block size, block count, and user header size (unsigned longs),
    # plus a locked flag, followed by the raw user header data.
    _index_name = "PyORAMBlockStorageS3_index.bin"
    _index_struct_string = "!LLL?"
    _index_offset = struct.calcsize(_index_struct_string)

    def __init__(self,
                 storage_name,
                 bucket_name=None,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None,
                 ignore_lock=False,
                 threadpool_size=None,
                 s3_wrapper=Boto3S3Wrapper):
        self._bytes_sent = 0
        self._bytes_received = 0
        self._storage_name = storage_name
        self._bucket_name = bucket_name
        self._aws_access_key_id = aws_access_key_id
        self._aws_secret_access_key = aws_secret_access_key
        self._region_name = region_name
        self._pool = None
        self._close_pool = True
        self._s3 = None
        self._ignore_lock = ignore_lock
        self._async_write = None
        self._async_write_callback = None

        if bucket_name is None:
            raise ValueError("'bucket_name' keyword is required")

        if threadpool_size != 0:
            self._pool = ThreadPool(threadpool_size)

        self._s3 = s3_wrapper(bucket_name,
                              aws_access_key_id=aws_access_key_id,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name=region_name)
        self._basename = self.storage_name+"/b%d"

        index_data = self._s3.download(
            self._storage_name+"/"+BlockStorageS3._index_name)
        self._block_size, self._block_count, user_header_size, locked = \
            struct.unpack(
                BlockStorageS3._index_struct_string,
                index_data[:BlockStorageS3._index_offset])
        if locked and (not self._ignore_lock):
            raise IOError(
                "Cannot open block storage device because it is "
                "locked by another process. To ignore this check, "
                "initialize this class with the keyword 'ignore_lock' "
                "set to True.")
        self._user_header_data = bytes()
        if user_header_size > 0:
            self._user_header_data = \
                index_data[BlockStorageS3._index_offset:
                           (BlockStorageS3._index_offset+user_header_size)]

        if not self._ignore_lock:
            # turn on the locked flag
            self._s3.upload((self._storage_name+"/"+BlockStorageS3._index_name,
                             struct.pack(BlockStorageS3._index_struct_string,
                                         self.block_size,
                                         self.block_count,
                                         len(self.header_data),
                                         True) + \
                             self.header_data))

    def _check_async(self):
        if self._async_write is not None:
            for i in self._async_write:
                if self._async_write_callback is not None:
                    self._async_write_callback(i)
            self._async_write = None
        self._async_write_callback = None

    def _schedule_async_write(self, arglist, callback=None):
        assert self._async_write is None
        if self._pool is not None:
            self._async_write = \
                self._pool.imap_unordered(self._s3.upload, arglist)
            self._async_write_callback = callback
        else:
            # Note: we are using six.moves.map, which always
            #       behaves like an iterator (imap)
            for i in map(self._s3.upload, arglist):
                if callback is not None:
                    callback(i)
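
    # A sketch of the asynchronous write handshake (hypothetical usage;
    # b0 and b1 stand for block_size-byte payloads): write_blocks()
    # schedules uploads on the thread pool and returns immediately;
    # the next storage call drains the results through _check_async(),
    # invoking the optional callback once per completed upload:
    #
    #   device.write_blocks([0, 1], [b0, b1],
    #                       callback=lambda result: None)
    #   block = device.read_block(0)  # drains pending writes first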

    def _download(self, i):
        return self._s3.download(self._basename % i)

    #
    # Define BlockStorageInterface Methods
    #

    def clone_device(self):
        # The clone shares this device's thread pool and must not
        # re-acquire the index lock already held by this process.
        f = BlockStorageS3(self.storage_name,
                           bucket_name=self._bucket_name,
                           aws_access_key_id=self._aws_access_key_id,
                           aws_secret_access_key=self._aws_secret_access_key,
                           region_name=self._region_name,
                           threadpool_size=0,
                           s3_wrapper=type(self._s3),
                           ignore_lock=True)
        f._pool = self._pool
        f._close_pool = False
        return f

    @classmethod
    def compute_storage_size(cls,
                             block_size,
                             block_count,
                             header_data=None,
                             ignore_header=False):
        assert (block_size > 0) and (block_size == int(block_size))
        assert (block_count > 0) and (block_count == int(block_count))
        if header_data is None:
            header_data = bytes()
        if ignore_header:
            return block_size * block_count
        return BlockStorageS3._index_offset + \
               len(header_data) + \
               block_size * block_count

    @classmethod
    def setup(cls,
              storage_name,
              block_size,
              block_count,
              bucket_name=None,
              aws_access_key_id=None,
              aws_secret_access_key=None,
              region_name=None,
              header_data=None,
              initialize=None,
              threadpool_size=None,
              ignore_existing=False,
              s3_wrapper=Boto3S3Wrapper):

        if bucket_name is None:
            raise ValueError("'bucket_name' is required")
        if (block_size <= 0) or (block_size != int(block_size)):
            raise ValueError(
                "Block size (bytes) must be a positive integer: %s"
                % (block_size))
        if (block_count <= 0) or (block_count != int(block_count)):
            raise ValueError(
                "Block count must be a positive integer: %s"
                % (block_count))
        if (header_data is not None) and \
           (type(header_data) is not bytes):
            raise TypeError(
                "'header_data' must be of type bytes. "
                "Invalid type: %s" % (type(header_data)))

        pool = None
        if threadpool_size != 0:
            pool = ThreadPool(threadpool_size)

        s3 = s3_wrapper(bucket_name,
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key,
                        region_name=region_name)
        exists = s3.exists(storage_name)
        if (not ignore_existing) and exists:
            raise IOError(
                "Storage location already exists in bucket %s: %s"
                % (bucket_name, storage_name))
        if exists:
            log.info("Deleting objects in existing S3 entry: %s/%s"
                     % (bucket_name, storage_name))
            print("Clearing Existing S3 Objects With Prefix %s/%s/"
                  % (bucket_name, storage_name))
            s3.clear(storage_name, threadpool=pool)

        if header_data is None:
            s3.upload((storage_name+"/"+BlockStorageS3._index_name,
                       struct.pack(BlockStorageS3._index_struct_string,
                                   block_size,
                                   block_count,
                                   0,
                                   False)))
        else:
            s3.upload((storage_name+"/"+BlockStorageS3._index_name,
                       struct.pack(BlockStorageS3._index_struct_string,
                                   block_size,
                                   block_count,
                                   len(header_data),
                                   False) + \
                       header_data))

        if initialize is None:
            zeros = bytes(bytearray(block_size))
            initialize = lambda i: zeros
        basename = storage_name+"/b%d"
        # NOTE: We will not be informed when a thread
        #       encounters an exception (e.g., when
        #       calling initialize(i)). We must ensure
        #       that all iterations were processed
        #       by counting the results.
        def init_blocks():
            for i in xrange(block_count):
                yield (basename % i, initialize(i))
        def _do_upload(arg):
            try:
                s3.upload(arg)
            except Exception as e:                        # pragma: no cover
                log.error(                                # pragma: no cover
                    "An exception occurred during S3 "    # pragma: no cover
                    "setup when calling the block "       # pragma: no cover
                    "initialization function: %s"         # pragma: no cover
                    % (str(e)))                           # pragma: no cover
                raise                                     # pragma: no cover
        total = None
        progress_bar = tqdm.tqdm(total=block_count*block_size,
                                 desc="Initializing S3 Block Storage Space",
                                 unit="B",
                                 unit_scale=True,
                                 disable=not pyoram.config.SHOW_PROGRESS_BAR)
        if pool is not None:
            try:
                for i,_ in enumerate(
                        pool.imap_unordered(_do_upload, init_blocks())):
                    total = i
                    progress_bar.update(n=block_size)
            except Exception as e:                        # pragma: no cover
                s3.clear(storage_name)                    # pragma: no cover
                raise                                     # pragma: no cover
            finally:
                progress_bar.close()
                pool.close()
                pool.join()
        else:
            try:
                for i,_ in enumerate(
                        map(s3.upload, init_blocks())):
                    total = i
                    progress_bar.update(n=block_size)
            except Exception as e:                        # pragma: no cover
                s3.clear(storage_name)                    # pragma: no cover
                raise                                     # pragma: no cover
            finally:
                progress_bar.close()

        if total != block_count - 1:
            s3.clear(storage_name)                        # pragma: no cover
            if pool is not None:                          # pragma: no cover
                pool.close()                              # pragma: no cover
                pool.join()                               # pragma: no cover
            raise ValueError(                             # pragma: no cover
                "Something went wrong during S3 block"    # pragma: no cover
                " initialization. Check the logger "      # pragma: no cover
                "output for more information.")           # pragma: no cover

        return BlockStorageS3(storage_name,
                              bucket_name=bucket_name,
                              aws_access_key_id=aws_access_key_id,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name=region_name,
                              threadpool_size=threadpool_size,
                              s3_wrapper=s3_wrapper)
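
    # For example, setup() blocks can be seeded with non-zero content
    # through the 'initialize' callback (hypothetical values shown;
    # the callback must return exactly block_size bytes for each
    # block index i):
    #
    #   device = BlockStorageS3.setup(
    #       "mystorage", block_size=16, block_count=4,
    #       bucket_name="my-bucket",
    #       initialize=lambda i: bytes(bytearray([i]) * 16))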

    @property
    def header_data(self):
        return self._user_header_data

    @property
    def block_count(self):
        return self._block_count

    @property
    def block_size(self):
        return self._block_size

    @property
    def storage_name(self):
        return self._storage_name

    def update_header_data(self, new_header_data):
        self._check_async()
        if len(new_header_data) != len(self.header_data):
            raise ValueError(
                "The size of header data cannot change.\n"
                "Original bytes: %s\n"
                "New bytes: %s" % (len(self.header_data),
                                   len(new_header_data)))
        self._user_header_data = new_header_data

        index_data = bytearray(self._s3.download(
            self._storage_name+"/"+BlockStorageS3._index_name))
        lenbefore = len(index_data)
        index_data[BlockStorageS3._index_offset:] = new_header_data
        assert lenbefore == len(index_data)
        self._s3.upload((self._storage_name+"/"+BlockStorageS3._index_name,
                         bytes(index_data)))

    def close(self):
        self._check_async()
        if self._s3 is not None:
            if not self._ignore_lock:
                # turn off the locked flag
                self._s3.upload(
                    (self._storage_name+"/"+BlockStorageS3._index_name,
                     struct.pack(BlockStorageS3._index_struct_string,
                                 self.block_size,
                                 self.block_count,
                                 len(self.header_data),
                                 False) + \
                     self.header_data))
        if self._close_pool and (self._pool is not None):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def read_blocks(self, indices):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_received += self.block_size * len(indices)
        if self._pool is not None:
            return self._pool.map(self._download, indices)
        else:
            return list(map(self._download, indices))

    def yield_blocks(self, indices):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_received += self.block_size * len(indices)
        if self._pool is not None:
            return self._pool.imap(self._download, indices)
        else:
            return map(self._download, indices)

    def read_block(self, i):
        self._check_async()
        assert 0 <= i < self.block_count
        self._bytes_received += self.block_size
        return self._download(i)

    def write_blocks(self, indices, blocks, callback=None):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_sent += self.block_size * len(indices)
        indices = (self._basename % i for i in indices)
        self._schedule_async_write(zip(indices, blocks),
                                   callback=callback)

    def write_block(self, i, block):
        self._check_async()
        assert 0 <= i < self.block_count
        self._bytes_sent += self.block_size
        self._schedule_async_write((((self._basename % i), block),))

    @property
    def bytes_sent(self):
        return self._bytes_sent

    @property
    def bytes_received(self):
        return self._bytes_received

BlockStorageTypeFactory.register_device("s3", BlockStorageS3)
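
# A minimal sketch of what the registration above enables, assuming
# BlockStorageTypeFactory resolves a registered name to its device
# class ("my-bucket" is a hypothetical bucket):
#
#   storage_type = BlockStorageTypeFactory("s3")
#   device = storage_type("mystorage", bucket_name="my-bucket")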