__all__ = ("Boto3S3Wrapper",
           "MockBoto3S3Wrapper")

import os
import shutil

import tqdm
try:
    import boto3
    import botocore
    boto3_available = True
# Only an ImportError means boto3 is unavailable; a bare
# except would also swallow SystemExit/KeyboardInterrupt.
except ImportError:                                    # pragma: no cover
    boto3_available = False                            # pragma: no cover

import pyoram

from six.moves import xrange, map
class Boto3S3Wrapper(object):
    """
    A wrapper class for the boto3 S3 service.

    Exposes the small key/value interface (exists, download,
    upload, clear) used by the block-storage S3 device.
    """

    def __init__(self,
                 bucket_name,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None):
        if not boto3_available:
            raise ImportError(                         # pragma: no cover
                "boto3 module is required to "         # pragma: no cover
                "use BlockStorageS3 device")           # pragma: no cover

        self._s3 = boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name).resource('s3')
        self._bucket = self._s3.Bucket(bucket_name)

    def exists(self, key):
        """Return True if <key> exists in the bucket, either as an
        object or as a "directory" prefix."""
        try:
            self._bucket.Object(key).load()
        except botocore.exceptions.ClientError as e:
            # 404 just means the object is missing; anything else
            # (permissions, throttling, ...) is a real error, so
            # re-raise with the original traceback intact.
            if e.response['Error']['Code'] != "404":
                raise
        else:
            return True
        # It's not a file. Check if it's a "directory".
        for obj in self._bucket.objects.filter(
                Prefix=key+"/"):
            return True
        return False

    def download(self, key):
        """Return the bytes stored under <key>.

        Raises:
            IOError: if the object can not be retrieved.
        """
        try:
            return self._s3.meta.client.get_object(
                Bucket=self._bucket.name,
                Key=key)['Body'].read()
        except botocore.exceptions.ClientError:
            raise IOError("Can not download key: %s"
                          % (key))

    def upload(self, key_block):
        """Store a (key, block) pair as an object in the bucket."""
        key, block = key_block
        self._bucket.put_object(Key=key, Body=block)

    # Chunk a streamed iterator of which we do not know
    # the size into delete_objects-style request payloads.
    def _chunks(self, objs, n=100):
        assert 1 <= n <= 1000 # required by boto3
        # Plain accumulate-and-flush loop: same chunking as the
        # original exception-driven version, but without relying
        # on StopIteration for control flow.
        chunk = []
        for obj in objs:
            chunk.append({'Key': obj.key})
            if len(chunk) == n:
                yield {'Objects': chunk}
                chunk = []
        if len(chunk):
            yield {'Objects': chunk}

    def _del(self, chunk):
        """Issue one batched delete; return the number of keys deleted."""
        self._bucket.delete_objects(Delete=chunk)
        return len(chunk['Objects'])

    def clear(self, key, threadpool=None):
        """Delete every object whose key starts with <key>/.

        If <threadpool> is given, the batched delete requests are
        dispatched through it; otherwise they run sequentially.
        """
        objs = self._bucket.objects.filter(Prefix=key+"/")
        if threadpool is not None:
            deliter = threadpool.imap(self._del, self._chunks(objs))
        else:
            deliter = map(self._del, self._chunks(objs))
        # The total is unknown because the listing is streamed.
        with tqdm.tqdm(total=None, #len(objs),
                       desc="Clearing S3 Blocks",
                       disable=not pyoram.config.SHOW_PROGRESS_BAR) as progress_bar:
            progress_bar.update(n=0)
            for chunksize in deliter:
                progress_bar.update(n=chunksize)
class MockBoto3S3Wrapper(object):
    """
    A mock class for Boto3S3Wrapper that uses the local filesystem and
    treats the bucket name as a directory.

    This class is mainly used for testing, but could potentially be
    used to setup storage locally that is then uploaded to S3 through
    the AWS web portal.
    """

    def __init__(self,
                 bucket_name,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None):
        # The credential/region arguments are accepted (and ignored)
        # only for interface parity with Boto3S3Wrapper.
        self._bucket_name = os.path.abspath(
            os.path.normpath(bucket_name))

    def _path(self, key):
        """Map an S3 key to its path under the bucket directory."""
        return os.path.join(self._bucket_name, key)

    # called within upload to create directory
    # hierarchy on the fly
    def _makedirs_if_needed(self, key):
        # Compute the target path once instead of re-joining the
        # bucket name and key for every check.
        path = self._path(key)
        dirname = os.path.dirname(path)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        # A key must name a file, never an existing directory.
        assert not os.path.isdir(path)

    def exists(self, key):
        """Return True if <key> exists as a file or "directory"."""
        return os.path.exists(self._path(key))

    def download(self, key):
        """Return the bytes stored in the file for <key>."""
        with open(self._path(key), 'rb') as f:
            return f.read()

    def upload(self, key_block):
        """Store a (key, block) pair as a file under the bucket."""
        key, block = key_block
        self._makedirs_if_needed(key)
        with open(self._path(key), 'wb') as f:
            f.write(block)

    def clear(self, key, threadpool=None):
        """Remove the file or directory tree stored under <key>.

        The threadpool argument is accepted only for interface
        parity with Boto3S3Wrapper; deletion is always local and
        synchronous. Clearing a missing key is a no-op.
        """
        path = self._path(key)
        if os.path.exists(path):
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            else:
                os.remove(path)