5 from pyoram.util.virtual_heap import \
7 from pyoram.storage.block_storage import \
8 BlockStorageTypeFactory
9 from pyoram.encrypted_storage.encrypted_block_storage import \
11 from pyoram.encrypted_storage.encrypted_heap_storage import \
13 from pyoram.crypto.aes import AES
15 from six.moves import xrange
# Directory that contains this test module; the per-test storage files
# created further down are placed here via os.path.join(thisdir, ...).
17 thisdir = os.path.dirname(os.path.abspath(__file__))
# Test case for EncryptedHeapStorage, run with storage_type "file"
# (see cls._type_name below).
# NOTE(review): the embedded line numbers skip values, so some source lines
# are not part of this view -- including the header of the class-level
# fixture whose body follows (presumably setUpClass; confirm).
19 class TestEncryptedHeapStorage(unittest2.TestCase):
# Reserve a temp-file path and delete the file again, leaving
# cls._dummy_name as a path that the tests assert does not exist.
23 fd, cls._dummy_name = tempfile.mkstemp()
26 os.remove(cls._dummy_name)
27 except OSError: # pragma: no cover
28 pass # pragma: no cover
30 cls._blocks_per_bucket = 3
# Closed form of base**0 + base**1 + ... + base**height.
# Presumably the value assigned to cls._bucket_count (the left-hand side
# is not shown here) -- TODO confirm.
34 ((cls._heap_base**(cls._heap_height+1)) - 1)//(cls._heap_base-1)
35 cls._block_count = cls._bucket_count * \
36 cls._blocks_per_bucket
37 cls._testfname = cls.__name__ + "_testfile.bin"
39 cls._type_name = "file"
# Build the storage shared by the read/write tests. The initialize callback
# derives the contents of bucket i from the single byte value i.
# NOTE(review): the first argument and part of the repeat count are omitted
# from this view; presumably the target is cls._testfname -- confirm.
40 f = EncryptedHeapStorage.setup(
42 block_size=cls._block_size,
43 heap_height=cls._heap_height,
44 key_size=AES.key_sizes[-1],
45 heap_base=cls._heap_base,
46 blocks_per_bucket=cls._blocks_per_bucket,
47 storage_type=cls._type_name,
48 initialize=lambda i: bytes(bytearray([i]) * \
50 cls._blocks_per_bucket),
# Keep the expected contents of every bucket in cls._buckets, built from the
# same byte value i as the initialize callback above, so the tests can
# compare what they read against it.
54 for i in range(cls._bucket_count):
55 data = bytearray([i]) * \
57 cls._blocks_per_bucket
58 cls._buckets.append(data)
# Class-level cleanup: delete the shared storage file (cls._testfname) and
# the dummy path (cls._dummy_name). An OSError from either removal is
# ignored, so a file that is already gone is not treated as a failure.
# NOTE(review): the matching "try:" lines are omitted from this view.
61 def tearDownClass(cls):
63 os.remove(cls._testfname)
64 except OSError: # pragma: no cover
65 pass # pragma: no cover
67 os.remove(cls._dummy_name)
68 except OSError: # pragma: no cover
69 pass # pragma: no cover
# Checks that EncryptedHeapStorage.setup() rejects a series of invalid calls,
# each with the exception type named in the enclosing assertRaises, and that
# self._dummy_name still does not exist after every attempt (i.e. a failed
# setup leaves no file behind at that path).
# NOTE(review): most arguments of these calls are omitted from this view, so
# what makes each individual call invalid cannot be read off here.
71 def test_setup_fails(self):
72 self.assertEqual(os.path.exists(self._dummy_name), False)
# Two calls expected to fail with IOError; the second one passes
# ignore_existing=False explicitly.
73 with self.assertRaises(IOError):
74 EncryptedHeapStorage.setup(
80 key_size=AES.key_sizes[-1],
82 storage_type=self._type_name)
83 self.assertEqual(os.path.exists(self._dummy_name), False)
84 with self.assertRaises(IOError):
85 EncryptedHeapStorage.setup(
91 key_size=AES.key_sizes[-1],
93 storage_type=self._type_name,
94 ignore_existing=False)
95 self.assertEqual(os.path.exists(self._dummy_name), False)
# Calls expected to fail argument validation with ValueError.
97 with self.assertRaises(ValueError):
98 EncryptedHeapStorage.setup(
102 key_size=AES.key_sizes[-1],
104 storage_type=self._type_name)
105 self.assertEqual(os.path.exists(self._dummy_name), False)
107 with self.assertRaises(ValueError):
108 EncryptedHeapStorage.setup(
113 storage_type=self._type_name)
114 self.assertEqual(os.path.exists(self._dummy_name), False)
115 # bad blocks_per_bucket
116 with self.assertRaises(ValueError):
117 EncryptedHeapStorage.setup(
121 key_size=AES.key_sizes[-1],
123 storage_type=self._type_name)
124 self.assertEqual(os.path.exists(self._dummy_name), False)
126 with self.assertRaises(ValueError):
127 EncryptedHeapStorage.setup(
131 key_size=AES.key_sizes[-1],
134 storage_type=self._type_name)
135 self.assertEqual(os.path.exists(self._dummy_name), False)
# The only call in this test that is expected to raise TypeError.
137 with self.assertRaises(TypeError):
138 EncryptedHeapStorage.setup(
142 key_size=AES.key_sizes[-1],
144 storage_type=self._type_name,
146 self.assertEqual(os.path.exists(self._dummy_name), False)
148 with self.assertRaises(ValueError):
149 EncryptedHeapStorage.setup(
153 key_size=AES.key_sizes[-1],
156 storage_type=self._type_name)
157 self.assertEqual(os.path.exists(self._dummy_name), False)
# Creates a new storage with setup() using 3 blocks per bucket, then reopens
# it and checks that the setup handle (fsetup) and the reopened handle (f)
# agree on header data, key, bucket layout and storage name.
# NOTE(review): several argument lines and assertion heads are omitted from
# this view (the embedded line numbers skip values).
159 def test_setup(self):
# File name is the test id without its first dotted component, placed in
# the directory of this module.
160 fname = ".".join(self.id().split(".")[1:])
162 fname = os.path.join(thisdir, fname)
# Remove any file left behind by an earlier run.
163 if os.path.exists(fname):
164 os.remove(fname) # pragma: no cover
167 blocks_per_bucket = 3
168 fsetup = EncryptedHeapStorage.setup(
172 key_size=AES.key_sizes[-1],
173 blocks_per_bucket=blocks_per_bucket)
# The underlying raw storage must be of the class that
# BlockStorageTypeFactory returns for this storage type.
175 self.assertEqual(type(fsetup.raw_storage),
176 BlockStorageTypeFactory(self._type_name))
# The file is opened in binary mode alongside compute_storage_size() calls;
# presumably the on-disk size is compared with the computed size -- the
# comparing lines are not shown, confirm.
177 with open(fname, 'rb') as f:
181 EncryptedHeapStorage.compute_storage_size(
184 blocks_per_bucket=blocks_per_bucket))
187 EncryptedHeapStorage.compute_storage_size(
190 blocks_per_bucket=blocks_per_bucket,
193 with EncryptedHeapStorage(
196 storage_type=self._type_name) as f:
# No header_data was passed to setup(), so both handles report empty bytes.
197 self.assertEqual(f.header_data, bytes())
198 self.assertEqual(fsetup.header_data, bytes())
199 self.assertEqual(f.key, fsetup.key)
200 self.assertEqual(f.blocks_per_bucket,
202 self.assertEqual(fsetup.blocks_per_bucket,
204 self.assertEqual(f.bucket_count,
205 2**(heap_height+1) - 1)
206 self.assertEqual(fsetup.bucket_count,
207 2**(heap_height+1) - 1)
208 self.assertEqual(f.bucket_size,
209 bsize * blocks_per_bucket)
210 self.assertEqual(fsetup.bucket_size,
211 bsize * blocks_per_bucket)
212 self.assertEqual(f.storage_name, fname)
213 self.assertEqual(fsetup.storage_name, fname)
# Same scenario as a plain setup, but with a 3-byte header_data and one block
# per bucket: the header must be reported unchanged by both the setup handle
# (fsetup) and a reopened handle (f), and the remaining properties must agree.
# NOTE(review): several argument lines and assertion heads are omitted from
# this view (the embedded line numbers skip values).
216 def test_setup_withdata(self):
# File name is the test id without its first dotted component, placed in
# the directory of this module.
217 fname = ".".join(self.id().split(".")[1:])
219 fname = os.path.join(thisdir, fname)
# Remove any file left behind by an earlier run.
220 if os.path.exists(fname):
221 os.remove(fname) # pragma: no cover
224 blocks_per_bucket = 1
225 header_data = bytes(bytearray([0,1,2]))
226 fsetup = EncryptedHeapStorage.setup(
230 key_size=AES.key_sizes[-1],
231 blocks_per_bucket=blocks_per_bucket,
232 header_data=header_data)
234 self.assertEqual(type(fsetup.raw_storage),
235 BlockStorageTypeFactory(self._type_name))
236 with open(fname, 'rb') as f:
240 EncryptedHeapStorage.compute_storage_size(
243 header_data=header_data))
# Guard for the comparison below: the header must be non-empty for the
# "smaller without header" relation to be meaningful.
244 self.assertTrue(len(header_data) > 0)
# Computed size without header_data is compared with "<" against the
# computed size with header_data.
246 EncryptedHeapStorage.compute_storage_size(
249 storage_type=self._type_name) <
250 EncryptedHeapStorage.compute_storage_size(
253 storage_type=self._type_name,
254 header_data=header_data),
258 EncryptedHeapStorage.compute_storage_size(
261 storage_type=self._type_name,
262 header_data=header_data,
265 with EncryptedHeapStorage(
268 storage_type=self._type_name) as f:
269 self.assertEqual(f.header_data, header_data)
270 self.assertEqual(fsetup.header_data, header_data)
271 self.assertEqual(f.key, fsetup.key)
272 self.assertEqual(f.blocks_per_bucket,
274 self.assertEqual(fsetup.blocks_per_bucket,
276 self.assertEqual(f.bucket_count,
277 2**(heap_height+1) - 1)
278 self.assertEqual(fsetup.bucket_count,
279 2**(heap_height+1) - 1)
280 self.assertEqual(f.bucket_size,
281 bsize * blocks_per_bucket)
282 self.assertEqual(fsetup.bucket_size,
283 bsize * blocks_per_bucket)
284 self.assertEqual(f.storage_name, fname)
285 self.assertEqual(fsetup.storage_name, fname)
# Opening an EncryptedHeapStorage is expected to raise IOError here.
# NOTE(review): the constructor's leading arguments are omitted from this
# view; presumably it is given self._dummy_name, whose absence is asserted
# first -- confirm.
288 def test_init_noexists(self):
289 self.assertEqual(os.path.exists(self._dummy_name), False)
290 with self.assertRaises(IOError):
291 with EncryptedHeapStorage(
294 storage_type=self._type_name) as f:
295 pass # pragma: no cover
# Opens the shared storage file and checks the properties of the resulting
# handle, and that the file's bytes are the same before and after.
# NOTE(review): some argument lines are omitted from this view.
297 def test_init_exists(self):
298 self.assertEqual(os.path.exists(self._testfname), True)
# Snapshot the raw file contents for the before/after comparison at the end.
299 with open(self._testfname, 'rb') as f:
300 databefore = f.read()
# An EncryptedBlockStorage is opened as fb and handed to EncryptedHeapStorage
# together with key=; this is expected to raise ValueError. Presumably it is
# the storage-object-plus-key combination that is rejected -- confirm.
301 with self.assertRaises(ValueError):
302 with EncryptedBlockStorage(self._testfname,
304 storage_type=self._type_name) as fb:
305 with EncryptedHeapStorage(fb, key=self._key) as f:
306 pass # pragma: no cover
307 with EncryptedHeapStorage(
310 storage_type=self._type_name) as f:
311 self.assertEqual(f.key, self._key)
312 self.assertEqual(f.bucket_size,
314 self._blocks_per_bucket)
315 self.assertEqual(f.bucket_count,
317 self.assertEqual(f.storage_name, self._testfname)
318 self.assertEqual(f.header_data, bytes())
319 self.assertEqual(os.path.exists(self._testfname), True)
320 with open(self._testfname, 'rb') as f:
322 self.assertEqual(databefore, dataafter)
# Reads the path of every bucket from first_bucket_at_level(0) through
# last_leaf_bucket() (inclusive) and compares each returned bucket with the
# expected contents in self._buckets. Also checks the transfer counters:
# nothing is sent, and bytes_received equals the number of buckets read
# times the block size of the underlying storage.
# NOTE(review): some lines (constructor arguments, assertion heads, the
# initialisation of total_buckets) are omitted from this view.
324 def test_read_path(self):
326 with EncryptedHeapStorage(
329 storage_type=self._type_name) as f:
# Counters start at zero on a freshly opened handle.
330 self.assertEqual(f.bytes_sent, 0)
331 self.assertEqual(f.bytes_received, 0)
334 f.virtual_heap.first_bucket_at_level(0), 0)
336 f.virtual_heap.last_leaf_bucket(), 0)
338 for b in range(f.virtual_heap.first_bucket_at_level(0),
339 f.virtual_heap.last_leaf_bucket()+1):
340 data = f.read_path(b)
341 bucket_path = f.virtual_heap.Node(b).\
342 bucket_path_from_root()
343 total_buckets += len(bucket_path)
344 self.assertEqual(f.virtual_heap.Node(b).level+1,
# Pair each bucket index on the path with the bucket returned for it.
346 for i, bucket in zip(bucket_path, data):
347 self.assertEqual(list(bytearray(bucket)),
348 list(self._buckets[i]))
350 self.assertEqual(f.bytes_sent, 0)
351 self.assertEqual(f.bytes_received,
352 total_buckets*f.bucket_storage._storage.block_size)
# For every bucket from first_bucket_at_level(0) through last_leaf_bucket():
# read its path and check it against self._buckets, overwrite the path with
# replacement data, verify, write the original contents back and verify
# again. Finally checks the transfer counters.
# NOTE(review): some lines are omitted from this view, including the read
# that produces "new" and the right-hand side of its comparison.
354 def test_write_path(self):
# Replacement contents: every entry is built from the single byte value
# self._bucket_count, which is one past the largest byte value used for
# the expected buckets in self._buckets.
355 data = [bytearray([self._bucket_count]) * \
357 self._blocks_per_bucket
358 for i in xrange(self._block_count)]
359 with EncryptedHeapStorage(
362 storage_type=self._type_name) as f:
363 self.assertEqual(f.bytes_sent, 0)
364 self.assertEqual(f.bytes_received, 0)
367 f.virtual_heap.first_bucket_at_level(0), 0)
369 f.virtual_heap.last_leaf_bucket(), 0)
371 for b in range(f.virtual_heap.first_bucket_at_level(0),
372 f.virtual_heap.last_leaf_bucket()+1):
373 orig = f.read_path(b)
374 bucket_path = f.virtual_heap.Node(b).\
375 bucket_path_from_root()
376 total_buckets += len(bucket_path)
377 self.assertNotEqual(len(bucket_path), 0)
378 self.assertEqual(f.virtual_heap.Node(b).level+1,
380 self.assertEqual(len(orig), len(bucket_path))
381 for i, bucket in zip(bucket_path, orig):
382 self.assertEqual(list(bytearray(bucket)),
383 list(self._buckets[i]))
# First write of this iteration: overwrite the path with replacement data.
384 f.write_path(b, [bytes(data[i])
385 for i in bucket_path])
388 self.assertEqual(len(new), len(bucket_path))
389 for i, bucket in zip(bucket_path, new):
390 self.assertEqual(list(bytearray(bucket)),
# Second write of this iteration: restore the original bucket contents.
393 f.write_path(b, [bytes(self._buckets[i])
394 for i in bucket_path])
396 orig = f.read_path(b)
397 self.assertEqual(len(orig), len(bucket_path))
398 for i, bucket in zip(bucket_path, orig):
399 self.assertEqual(list(bytearray(bucket)),
400 list(self._buckets[i]))
# Two write_path calls per iteration account for the factor 2; the factor 3
# presumably reflects three read_path calls per iteration (two are visible,
# the one producing "new" is not shown) -- confirm.
402 self.assertEqual(f.bytes_sent,
403 total_buckets*f.bucket_storage._storage.block_size*2)
404 self.assertEqual(f.bytes_received,
405 total_buckets*f.bucket_storage._storage.block_size*3)
# Creates a storage with a 3-byte header, replaces the header with another
# 3-byte value via update_header_data(), and checks that the new header is
# visible both on the same handle and after reopening. Attempts to store a
# 2-byte or a 4-byte header are expected to raise ValueError, and the header
# is checked to be unchanged afterwards.
# NOTE(review): constructor arguments are partly omitted from this view.
407 def test_update_header_data(self):
# File name is the test id without its first dotted component, placed in
# the directory of this module.
408 fname = ".".join(self.id().split(".")[1:])
410 fname = os.path.join(thisdir, fname)
# Remove any file left behind by an earlier run.
411 if os.path.exists(fname):
412 os.remove(fname) # pragma: no cover
415 blocks_per_bucket = 1
416 header_data = bytes(bytearray([0,1,2]))
417 fsetup = EncryptedHeapStorage.setup(
420 heap_height=heap_height,
421 key_size=AES.key_sizes[-1],
422 blocks_per_bucket=blocks_per_bucket,
423 header_data=header_data)
# Replacement header with the same length (3 bytes) as the original.
425 new_header_data = bytes(bytearray([1,1,1]))
426 with EncryptedHeapStorage(
429 storage_type=self._type_name) as f:
430 self.assertEqual(f.header_data, header_data)
431 f.update_header_data(new_header_data)
432 self.assertEqual(f.header_data, new_header_data)
# Reopen to confirm the updated header is what a fresh handle reports.
433 with EncryptedHeapStorage(
436 storage_type=self._type_name) as f:
437 self.assertEqual(f.header_data, new_header_data)
# Header shorter than the stored one (2 bytes instead of 3).
438 with self.assertRaises(ValueError):
439 with EncryptedHeapStorage(
442 storage_type=self._type_name) as f:
443 f.update_header_data(bytes(bytearray([1,1])))
# Header longer than the stored one (4 bytes instead of 3).
444 with self.assertRaises(ValueError):
445 with EncryptedHeapStorage(
448 storage_type=self._type_name) as f:
449 f.update_header_data(bytes(bytearray([1,1,1,1])))
# The rejected updates must not have altered the stored header.
450 with EncryptedHeapStorage(
453 storage_type=self._type_name) as f:
454 self.assertEqual(f.header_data, new_header_data)
# Exercises the lock handling of EncryptedHeapStorage on the shared storage
# file: opens without ignore_lock that are wrapped in assertRaises(IOError)
# are expected to fail, while opens passing ignore_lock=True are not wrapped
# in assertRaises and so are expected to succeed.
# NOTE(review): indentation and some lines are missing from this view, so
# which of the later opens are nested inside the first handle (f) cannot be
# determined here -- check against the full file.
457 def test_locked_flag(self):
458 with EncryptedHeapStorage(self._testfname,
460 storage_type=self._type_name) as f:
461 with self.assertRaises(IOError):
462 with EncryptedHeapStorage(self._testfname,
464 storage_type=self._type_name) as f1:
465 pass # pragma: no cover
466 with self.assertRaises(IOError):
467 with EncryptedHeapStorage(self._testfname,
469 storage_type=self._type_name) as f1:
470 pass # pragma: no cover
471 with EncryptedHeapStorage(self._testfname,
473 storage_type=self._type_name,
474 ignore_lock=True) as f1:
476 with self.assertRaises(IOError):
477 with EncryptedHeapStorage(self._testfname,
479 storage_type=self._type_name) as f1:
480 pass # pragma: no cover
481 with EncryptedHeapStorage(self._testfname,
483 storage_type=self._type_name,
484 ignore_lock=True) as f1:
486 with EncryptedHeapStorage(self._testfname,
488 storage_type=self._type_name,
489 ignore_lock=True) as f1:
# Final open without ignore_lock and outside any assertRaises, so it is
# expected to succeed.
491 with EncryptedHeapStorage(self._testfname,
493 storage_type=self._type_name) as f:
# Variant of the read-path test that performs all reads through a handle
# obtained from forig.clone_device(). Every path from
# first_bucket_at_level(0) through last_leaf_bucket() is read and compared
# with self._buckets; the clone's bytes_received must equal buckets read
# times block size, while the original handle's counters must stay at zero.
# NOTE(review): some lines (constructor arguments, assertion heads, the
# initialisation of total_buckets) are omitted from this view.
496 def test_read_path_cloned(self):
498 with EncryptedHeapStorage(
501 storage_type=self._type_name) as forig:
502 self.assertEqual(forig.bytes_sent, 0)
503 self.assertEqual(forig.bytes_received, 0)
504 with forig.clone_device() as f:
# Both the original and the clone start with zeroed counters.
505 self.assertEqual(forig.bytes_sent, 0)
506 self.assertEqual(forig.bytes_received, 0)
507 self.assertEqual(f.bytes_sent, 0)
508 self.assertEqual(f.bytes_received, 0)
511 f.virtual_heap.first_bucket_at_level(0), 0)
513 f.virtual_heap.last_leaf_bucket(), 0)
515 for b in range(f.virtual_heap.first_bucket_at_level(0),
516 f.virtual_heap.last_leaf_bucket()+1):
517 data = f.read_path(b)
518 bucket_path = f.virtual_heap.Node(b).\
519 bucket_path_from_root()
520 total_buckets += len(bucket_path)
521 self.assertEqual(f.virtual_heap.Node(b).level+1,
523 for i, bucket in zip(bucket_path, data):
524 self.assertEqual(list(bytearray(bucket)),
525 list(self._buckets[i]))
527 self.assertEqual(f.bytes_sent, 0)
528 self.assertEqual(f.bytes_received,
529 total_buckets*f.bucket_storage._storage.block_size)
# Traffic through the clone must not be counted on the original handle.
530 self.assertEqual(forig.bytes_sent, 0)
531 self.assertEqual(forig.bytes_received, 0)
# Variant of the write-path test that performs all I/O through a handle
# obtained from forig.clone_device(). Each path is read and verified,
# overwritten with replacement data, verified, restored and verified again.
# The clone's counters must reflect the traffic while the original handle's
# counters must stay at zero.
# NOTE(review): some lines are omitted from this view, including the read
# that produces "new" and the right-hand side of its comparison.
533 def test_write_path_cloned(self):
# Replacement contents: every entry is built from the single byte value
# self._bucket_count.
534 data = [bytearray([self._bucket_count]) * \
536 self._blocks_per_bucket
537 for i in xrange(self._block_count)]
538 with EncryptedHeapStorage(
541 storage_type=self._type_name) as forig:
542 self.assertEqual(forig.bytes_sent, 0)
543 self.assertEqual(forig.bytes_received, 0)
544 with forig.clone_device() as f:
# Both the original and the clone start with zeroed counters.
545 self.assertEqual(forig.bytes_sent, 0)
546 self.assertEqual(forig.bytes_received, 0)
547 self.assertEqual(f.bytes_sent, 0)
548 self.assertEqual(f.bytes_received, 0)
551 f.virtual_heap.first_bucket_at_level(0), 0)
553 f.virtual_heap.last_leaf_bucket(), 0)
555 for b in range(f.virtual_heap.first_bucket_at_level(0),
556 f.virtual_heap.last_leaf_bucket()+1):
557 orig = f.read_path(b)
558 bucket_path = f.virtual_heap.Node(b).\
559 bucket_path_from_root()
560 total_buckets += len(bucket_path)
561 self.assertNotEqual(len(bucket_path), 0)
562 self.assertEqual(f.virtual_heap.Node(b).level+1,
564 self.assertEqual(len(orig), len(bucket_path))
565 for i, bucket in zip(bucket_path, orig):
566 self.assertEqual(list(bytearray(bucket)),
567 list(self._buckets[i]))
# First write of this iteration: overwrite the path with replacement data.
568 f.write_path(b, [bytes(data[i])
569 for i in bucket_path])
572 self.assertEqual(len(new), len(bucket_path))
573 for i, bucket in zip(bucket_path, new):
574 self.assertEqual(list(bytearray(bucket)),
# Second write of this iteration: restore the original bucket contents.
577 f.write_path(b, [bytes(self._buckets[i])
578 for i in bucket_path])
580 orig = f.read_path(b)
581 self.assertEqual(len(orig), len(bucket_path))
582 for i, bucket in zip(bucket_path, orig):
583 self.assertEqual(list(bytearray(bucket)),
584 list(self._buckets[i]))
# Two write_path calls per iteration account for the factor 2; the factor 3
# presumably reflects three read_path calls per iteration (two are visible,
# the one producing "new" is not shown) -- confirm.
586 self.assertEqual(f.bytes_sent,
587 total_buckets*f.bucket_storage._storage.block_size*2)
588 self.assertEqual(f.bytes_received,
589 total_buckets*f.bucket_storage._storage.block_size*3)
# Traffic through the clone must not be counted on the original handle.
590 self.assertEqual(forig.bytes_sent, 0)
591 self.assertEqual(forig.bytes_received, 0)
# Run the test suite when this module is executed directly as a script.
593 if __name__ == "__main__":
594 unittest2.main() # pragma: no cover