# PyORAM
# [iotcloud.git] / PyORAM / src / pyoram / storage / boto3_s3_wrapper.py
1 __all__ = ("Boto3S3Wrapper",
2            "MockBoto3S3Wrapper")
3 import os
4 import shutil
5
6 import pyoram
7
8 import tqdm
9 try:
10     import boto3
11     import botocore
12     boto3_available = True
13 except:                                                # pragma: no cover
14     boto3_available = False                            # pragma: no cover
15
16 import six
17 from six.moves import xrange, map
18
class Boto3S3Wrapper(object):
    """
    A wrapper class for the boto3 S3 service.

    Exposes the minimal operations PyORAM block storage needs:
    existence checks, whole-object download/upload, and bulk
    deletion of all objects under a key prefix.
    """

    def __init__(self,
                 bucket_name,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None):
        """Open a handle to an existing S3 bucket.

        Args:
            bucket_name: Name of the S3 bucket.
            aws_access_key_id: Optional explicit credential; when None,
                boto3 falls back to its standard credential chain.
            aws_secret_access_key: Optional explicit credential.
            region_name: Optional AWS region name.

        Raises:
            ImportError: If the boto3 package is not installed.
        """
        if not boto3_available:
            raise ImportError(                         # pragma: no cover
                "boto3 module is required to "         # pragma: no cover
                "use BlockStorageS3 device")           # pragma: no cover

        self._s3 = boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name).resource('s3')
        self._bucket = self._s3.Bucket(bucket_name)

    def exists(self, key):
        """Return True if `key` exists as an object, or as a
        "directory" (i.e., at least one object under `key + "/"`)."""
        try:
            self._bucket.Object(key).load()
        except botocore.exceptions.ClientError as e:
            # 404 just means no object with this exact key; anything
            # else is a real error. Bare raise preserves the traceback.
            if e.response['Error']['Code'] != "404":
                raise
        else:
            return True
        # It's not a file. Check if it's a "directory".
        return any(True for _ in self._bucket.objects.filter(
            Prefix=key+"/"))

    def download(self, key):
        """Return the full contents of object `key` as bytes.

        Raises:
            IOError: If the object can not be retrieved.
        """
        try:
            return self._s3.meta.client.get_object(
                Bucket=self._bucket.name,
                Key=key)['Body'].read()
        except botocore.exceptions.ClientError:
            raise IOError("Can not download key: %s"
                          % (key))

    def upload(self, key_block):
        """Store a single (key, block) pair, where `block` is the raw
        bytes to write under `key`."""
        key, block = key_block
        self._bucket.put_object(Key=key, Body=block)

    # Chunk a streamed iterator of which we do not know
    # the size
    def _chunks(self, objs, n=100):
        """Group object summaries from `objs` into delete_objects()
        payloads of at most `n` keys each.

        A plain for-loop with a final flush replaces the original
        pattern of catching StopIteration from an explicit next()
        inside the generator, which is fragile under PEP 479.
        """
        assert 1 <= n <= 1000 # required by boto3
        chunk = []
        for obj in objs:
            chunk.append({'Key': obj.key})
            if len(chunk) == n:
                yield {'Objects': chunk}
                chunk = []
        if chunk:
            # flush the final partial chunk
            yield {'Objects': chunk}

    def _del(self, chunk):
        """Issue one bulk delete request; return the number of keys
        deleted so callers can drive a progress bar."""
        self._bucket.delete_objects(Delete=chunk)
        return len(chunk['Objects'])

    def clear(self, key, threadpool=None):
        """Delete every object under the prefix `key + "/"`.

        Args:
            key: Prefix naming the "directory" to remove.
            threadpool: Optional pool providing imap(); when given,
                delete requests are issued concurrently.
        """
        objs = self._bucket.objects.filter(Prefix=key+"/")
        if threadpool is not None:
            deliter = threadpool.imap(self._del, self._chunks(objs))
        else:
            deliter = map(self._del, self._chunks(objs))
        # total is unknown because the object listing is streamed
        with tqdm.tqdm(total=None,
                       desc="Clearing S3 Blocks",
                       unit=" objects",
                       disable=not pyoram.config.SHOW_PROGRESS_BAR) as progress_bar:
            progress_bar.update(n=0)
            for chunksize in deliter:
                progress_bar.update(n=chunksize)
class MockBoto3S3Wrapper(object):
    """
    A mock class for Boto3S3Wrapper that uses the local filesystem and
    treats the bucket name as a directory.

    This class is mainly used for testing, but could potentially be
    used to setup storage locally that is then uploaded to S3 through
    the AWS web portal.
    """

    def __init__(self,
                 bucket_name,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None):
        """Create a local-filesystem stand-in for an S3 bucket.

        The credential and region arguments are accepted (and ignored)
        so the signature matches Boto3S3Wrapper.
        """
        self._bucket_name = os.path.abspath(
            os.path.normpath(bucket_name))

    def _path(self, key):
        """Map an S3 key to its file path under the bucket directory."""
        return os.path.join(self._bucket_name, key)

    # called within upload to create directory
    # hierarchy on the fly
    def _makedirs_if_needed(self, key):
        """Ensure the parent directory of `key` exists.

        Uses EAFP (attempt makedirs, tolerate "already exists") rather
        than an exists() pre-check, so concurrent uploads racing to
        create the same directory do not fail spuriously.
        """
        dirname = os.path.dirname(self._path(key))
        try:
            os.makedirs(dirname)
        except OSError:
            # Benign if the directory already exists (common race
            # outcome); otherwise a real error.
            if not os.path.isdir(dirname):
                raise
        assert not os.path.isdir(self._path(key))

    def exists(self, key):
        """Return True if `key` exists as a file or directory."""
        return os.path.exists(self._path(key))

    def download(self, key):
        """Return the full contents of the file for `key` as bytes."""
        with open(self._path(key), 'rb') as f:
            return f.read()

    def upload(self, key_block):
        """Write a (key, block) pair, where `block` is the raw bytes
        to store at the path for `key`."""
        key, block = key_block
        self._makedirs_if_needed(key)
        with open(self._path(key), 'wb') as f:
            f.write(block)

    def clear(self, key, threadpool=None):
        """Delete `key`, whether it is a single file or a directory
        tree. `threadpool` is accepted for signature compatibility
        with Boto3S3Wrapper and ignored."""
        path = self._path(key)
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            os.remove(path)