Revision e73f61d6 snf-pithos-tools/pithos/tools/test.py
b/snf-pithos-tools/pithos/tools/test.py | ||
---|---|---|
538 | 538 |
self.obj.append(self.upload_random_data(self.container[0], o)) |
539 | 539 |
for o in o_names[8:]: |
540 | 540 |
self.obj.append(self.upload_random_data(self.container[1], o)) |
541 |
|
|
541 |
|
|
542 |
def test_list_shared(self): |
|
543 |
self.client.share_object(self.container[0], self.obj[0]['name'], ('*',)) |
|
544 |
objs = self.client.list_objects(self.container[0], shared=True) |
|
545 |
self.assertEqual(objs, [self.obj[0]['name']]) |
|
546 |
|
|
547 |
# create child object |
|
548 |
self.upload_random_data(self.container[0], strnextling(self.obj[0]['name'])) |
|
549 |
objs = self.client.list_objects(self.container[0], shared=True) |
|
550 |
self.assertEqual(objs, [self.obj[0]['name']]) |
|
551 |
|
|
552 |
# test inheritance |
|
553 |
self.client.create_folder(self.container[1], 'folder') |
|
554 |
self.client.share_object(self.container[1], 'folder', ('*',)) |
|
555 |
self.upload_random_data(self.container[1], 'folder/object') |
|
556 |
objs = self.client.list_objects(self.container[1], shared=True) |
|
557 |
self.assertEqual(objs, ['folder', 'folder/object']) |
|
558 |
|
|
559 |
def test_list_public(self): |
|
560 |
self.client.publish_object(self.container[0], self.obj[0]['name']) |
|
561 |
objs = self.client.list_objects(self.container[0], public=True) |
|
562 |
self.assertEqual(objs, [self.obj[0]['name']]) |
|
563 |
|
|
564 |
# create child object |
|
565 |
self.upload_random_data(self.container[0], strnextling(self.obj[0]['name'])) |
|
566 |
objs = self.client.list_objects(self.container[0], public=True) |
|
567 |
self.assertEqual(objs, [self.obj[0]['name']]) |
|
568 |
|
|
569 |
# test inheritance |
|
570 |
self.client.create_folder(self.container[1], 'folder') |
|
571 |
self.client.publish_object(self.container[1], 'folder') |
|
572 |
self.upload_random_data(self.container[1], 'folder/object') |
|
573 |
objs = self.client.list_objects(self.container[1], public=True) |
|
574 |
self.assertEqual(objs, ['folder']) |
|
575 |
|
|
576 |
def test_list_shared_public(self): |
|
577 |
self.client.share_object(self.container[0], self.obj[0]['name'], ('*',)) |
|
578 |
self.client.publish_object(self.container[0], self.obj[1]['name']) |
|
579 |
objs = self.client.list_objects(self.container[0], shared=True, public=True) |
|
580 |
self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']]) |
|
581 |
|
|
582 |
# create child object |
|
583 |
self.upload_random_data(self.container[0], strnextling(self.obj[0]['name'])) |
|
584 |
self.upload_random_data(self.container[0], strnextling(self.obj[1]['name'])) |
|
585 |
objs = self.client.list_objects(self.container[0], shared=True, public=True) |
|
586 |
self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']]) |
|
587 |
|
|
588 |
# test inheritance |
|
589 |
self.client.create_folder(self.container[1], 'folder1') |
|
590 |
self.client.share_object(self.container[1], 'folder1', ('*',)) |
|
591 |
self.upload_random_data(self.container[1], 'folder1/object') |
|
592 |
self.client.create_folder(self.container[1], 'folder2') |
|
593 |
self.client.publish_object(self.container[1], 'folder2') |
|
594 |
o = self.upload_random_data(self.container[1], 'folder2/object') |
|
595 |
objs = self.client.list_objects(self.container[1], shared=True, public=True) |
|
596 |
self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2']) |
|
597 |
|
|
542 | 598 |
def test_list_objects(self): |
543 | 599 |
objects = self.client.list_objects(self.container[0]) |
544 | 600 |
l = [elem['name'] for elem in self.obj[:8]] |
... | ... | |
1373 | 1429 |
self.assert_raises_fault(404, self.client.copy_object, self.containers[1], |
1374 | 1430 |
self.obj['name'], self.containers[1], |
1375 | 1431 |
'testcopy', meta) |
1376 |
|
|
1432 |
|
|
1433 |
def test_copy_dir(self): |
|
1434 |
self.client.create_folder(self.containers[0], 'dir') |
|
1435 |
objects = ('object1', 'subdir/object2', 'dirs') |
|
1436 |
for name in objects[:-1]: |
|
1437 |
self.upload_random_data(self.containers[0], 'dir/%s' % name) |
|
1438 |
self.upload_random_data(self.containers[0], 'dirs') |
|
1439 |
|
|
1440 |
self.client.copy_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/') |
|
1441 |
self.assert_object_exists(self.containers[0], 'dir') |
|
1442 |
self.assert_object_not_exists(self.containers[1], 'dirs') |
|
1443 |
for name in objects[:-1]: |
|
1444 |
meta0 = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name) |
|
1445 |
meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name) |
|
1446 |
t = ('content-length', 'x-object-hash', 'content-type') |
|
1447 |
(self.assertEqual(meta0[elem], meta1[elem]) for elem in t) |
|
1448 |
|
|
1377 | 1449 |
class ObjectMove(BaseTestCase): |
1378 | 1450 |
def setUp(self): |
1379 | 1451 |
BaseTestCase.setUp(self) |
... | ... | |
1409 | 1481 |
|
1410 | 1482 |
#assert src object no more exists |
1411 | 1483 |
self.assert_object_not_exists(self.containers[0], self.obj['name']) |
1484 |
|
|
1485 |
|
|
1486 |
def test_move_dir(self): |
|
1487 |
self.client.create_folder(self.containers[0], 'dir') |
|
1488 |
objects = ('object1', 'subdir/object2', 'dirs') |
|
1489 |
meta = {} |
|
1490 |
for name in objects[:-1]: |
|
1491 |
self.upload_random_data(self.containers[0], 'dir/%s' % name) |
|
1492 |
meta[name] = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name) |
|
1493 |
self.upload_random_data(self.containers[0], 'dirs') |
|
1494 |
|
|
1495 |
self.client.move_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/', content_type='application/folder') |
|
1496 |
self.assert_object_not_exists(self.containers[0], 'dir') |
|
1497 |
self.assert_object_not_exists(self.containers[1], 'dirs') |
|
1498 |
for name in objects[:-1]: |
|
1499 |
self.assert_object_not_exists(self.containers[0], 'dir/%s' % name) |
|
1500 |
self.assert_object_exists(self.containers[1], 'dir-backup/%s' % name) |
|
1501 |
meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name) |
|
1502 |
t = ('content-length', 'x-object-hash', 'content-type') |
|
1503 |
(self.assertEqual(meta[name][elem], meta1[elem]) for elem in t) |
|
1412 | 1504 |
|
1413 | 1505 |
class ObjectPost(BaseTestCase): |
1414 | 1506 |
def setUp(self): |
... | ... | |
1554 | 1646 |
self.assert_raises_fault(400, self.test_update_object, |
1555 | 1647 |
content_length = 1000) |
1556 | 1648 |
|
1557 |
def _test_update_object_invalid_range(self):
|
|
1649 |
def test_update_object_invalid_range(self): |
|
1558 | 1650 |
with AssertContentInvariant(self.client.retrieve_object, |
1559 | 1651 |
self.containers[0], self.obj[0]['name']): |
1560 | 1652 |
self.assert_raises_fault(416, self.test_update_object, 499, 0, True) |
1561 | 1653 |
|
1562 |
def _test_update_object_invalid_range_and_length(self):
|
|
1654 |
def test_update_object_invalid_range_and_length(self): |
|
1563 | 1655 |
with AssertContentInvariant(self.client.retrieve_object, |
1564 | 1656 |
self.containers[0], self.obj[0]['name']): |
1565 | 1657 |
self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True, |
... | ... | |
1704 | 1796 |
#assert item not found |
1705 | 1797 |
self.assert_raises_fault(404, self.client.delete_object, self.containers[1], |
1706 | 1798 |
self.obj['name']) |
1799 |
|
|
1800 |
def test_delete_dir(self): |
|
1801 |
self.client.create_folder(self.containers[0], 'dir') |
|
1802 |
objects = ('object1', 'subdir/object2', 'dirs') |
|
1803 |
for name in objects[:-1]: |
|
1804 |
self.upload_random_data(self.containers[0], 'dir/%s' % name) |
|
1805 |
self.upload_random_data(self.containers[0], 'dirs') |
|
1806 |
|
|
1807 |
self.client.delete_object(self.containers[0], 'dir', delimiter='/') |
|
1808 |
self.assert_object_not_exists(self.containers[0], 'dir') |
|
1809 |
self.assert_object_exists(self.containers[0], 'dirs') |
|
1810 |
for name in objects[:-1]: |
|
1811 |
self.assert_object_not_exists(self.containers[0], 'dir/%s' % name) |
|
1707 | 1812 |
|
1708 | 1813 |
class ListSharing(BaseTestCase): |
1709 | 1814 |
def setUp(self): |
... | ... | |
2273 | 2378 |
for k, v in self.map.items(): |
2274 | 2379 |
if is_date(v): |
2275 | 2380 |
continue |
2276 |
#print '#', k, v, map[k] |
|
2277 | 2381 |
assert(k in map) |
2278 | 2382 |
assert v == map[k] |
2279 | 2383 |
|
... | ... | |
2327 | 2431 |
return True |
2328 | 2432 |
return False |
2329 | 2433 |
|
2434 |
def strnextling(prefix): |
|
2435 |
"""Return the first unicode string |
|
2436 |
greater than but not starting with given prefix. |
|
2437 |
strnextling('hello') -> 'hellp' |
|
2438 |
""" |
|
2439 |
if not prefix: |
|
2440 |
## all strings start with the null string, |
|
2441 |
## therefore we have to approximate strnextling('') |
|
2442 |
## with the last unicode character supported by python |
|
2443 |
## 0x10ffff for wide (32-bit unicode) python builds |
|
2444 |
## 0x00ffff for narrow (16-bit unicode) python builds |
|
2445 |
## We will not autodetect. 0xffff is safe enough. |
|
2446 |
return unichr(0xffff) |
|
2447 |
s = prefix[:-1] |
|
2448 |
c = ord(prefix[-1]) |
|
2449 |
if c >= 0xffff: |
|
2450 |
raise RuntimeError |
|
2451 |
s += unichr(c+1) |
|
2452 |
return s |
|
2453 |
|
|
2330 | 2454 |
o_names = ['kate.jpg', |
2331 | 2455 |
'kate_beckinsale.jpg', |
2332 | 2456 |
'How To Win Friends And Influence People.pdf', |
Also available in: Unified diff