self.obj.append(self.upload_random_data(self.container[0], o))
for o in o_names[8:]:
self.obj.append(self.upload_random_data(self.container[1], o))
-
+
+    def test_list_shared(self):
+        """Check that list_objects(shared=True) reports only shared objects.
+
+        Covers a directly shared object, an unshared object named
+        strnextling(<shared name>), and an object uploaded below a shared
+        folder (which is expected to show up in the listing).
+        """
+        first, second = self.container[0], self.container[1]
+        shared_name = self.obj[0]['name']
+
+        self.client.share_object(first, shared_name, ('*',))
+        listing = self.client.list_objects(first, shared=True)
+        self.assertEqual(listing, [shared_name])
+
+        # an extra, unshared object must not change the listing
+        self.upload_random_data(first, strnextling(shared_name))
+        listing = self.client.list_objects(first, shared=True)
+        self.assertEqual(listing, [shared_name])
+
+        # sharing a folder: the object below it is expected to be listed too
+        self.client.create_folder(second, 'folder')
+        self.client.share_object(second, 'folder', ('*',))
+        self.upload_random_data(second, 'folder/object')
+        listing = self.client.list_objects(second, shared=True)
+        self.assertEqual(listing, ['folder', 'folder/object'])
+
+    def test_list_public(self):
+        """Check that list_objects(public=True) reports only published objects.
+
+        Covers a directly published object, an unpublished object named
+        strnextling(<published name>), and an object uploaded below a
+        published folder (which is expected NOT to show up in the listing).
+        """
+        first, second = self.container[0], self.container[1]
+        public_name = self.obj[0]['name']
+
+        self.client.publish_object(first, public_name)
+        listing = self.client.list_objects(first, public=True)
+        self.assertEqual(listing, [public_name])
+
+        # an extra, unpublished object must not change the listing
+        self.upload_random_data(first, strnextling(public_name))
+        listing = self.client.list_objects(first, public=True)
+        self.assertEqual(listing, [public_name])
+
+        # publishing a folder: only the folder itself is expected back
+        self.client.create_folder(second, 'folder')
+        self.client.publish_object(second, 'folder')
+        self.upload_random_data(second, 'folder/object')
+        listing = self.client.list_objects(second, public=True)
+        self.assertEqual(listing, ['folder'])
+
+    def test_list_shared_public(self):
+        """Check list_objects(shared=True, public=True) on mixed content.
+
+        One object is shared and another one is published; both are expected
+        in the combined listing.  Unrelated objects uploaded afterwards must
+        not appear.  For folders, the object below the shared folder is
+        expected in the listing while the one below the published folder
+        is not.
+        """
+        self.client.share_object(self.container[0], self.obj[0]['name'], ('*',))
+        self.client.publish_object(self.container[0], self.obj[1]['name'])
+        objs = self.client.list_objects(self.container[0], shared=True, public=True)
+        self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']])
+
+        # extra objects that are neither shared nor published
+        self.upload_random_data(self.container[0], strnextling(self.obj[0]['name']))
+        self.upload_random_data(self.container[0], strnextling(self.obj[1]['name']))
+        objs = self.client.list_objects(self.container[0], shared=True, public=True)
+        self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']])
+
+        # test inheritance
+        self.client.create_folder(self.container[1], 'folder1')
+        self.client.share_object(self.container[1], 'folder1', ('*',))
+        self.upload_random_data(self.container[1], 'folder1/object')
+        self.client.create_folder(self.container[1], 'folder2')
+        self.client.publish_object(self.container[1], 'folder2')
+        # the upload result is not needed, so it is no longer bound to a name
+        self.upload_random_data(self.container[1], 'folder2/object')
+        objs = self.client.list_objects(self.container[1], shared=True, public=True)
+        self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2'])
+
def test_list_objects(self):
objects = self.client.list_objects(self.container[0])
l = [elem['name'] for elem in self.obj[:8]]
def _test_maximum_upload_size_exceeds(self):
name = o_names[0]
meta = {'test':'test1'}
- #upload 100MB
- length=1024*1024*100
+ #upload 5GB
+ length= 5 * (1024 * 1024 * 1024) + 1
self.assert_raises_fault(400, self.upload_random_data, self.container,
name, length, **meta)
self.assert_raises_fault(404, self.client.copy_object, self.containers[1],
self.obj['name'], self.containers[1],
'testcopy', meta)
-
+
+    def test_copy_dir(self):
+        """Copy a folder with delimiter='/' and verify the copied contents.
+
+        The source folder must still exist afterwards, the unrelated object
+        'dirs' (same prefix, outside the folder) must not appear in the
+        destination container, and every copied object must keep its
+        content-length, x-object-hash and content-type.
+        """
+        self.client.create_folder(self.containers[0], 'dir')
+        objects = ('object1', 'subdir/object2', 'dirs')
+        for name in objects[:-1]:
+            self.upload_random_data(self.containers[0], 'dir/%s' % name)
+        self.upload_random_data(self.containers[0], 'dirs')
+
+        self.client.copy_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/')
+        self.assert_object_exists(self.containers[0], 'dir')
+        self.assert_object_not_exists(self.containers[1], 'dirs')
+        t = ('content-length', 'x-object-hash', 'content-type')
+        for name in objects[:-1]:
+            meta0 = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
+            meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+            # A real loop is required here: the previous bare generator
+            # expression was never iterated, so no assertion ever ran.
+            for elem in t:
+                self.assertEqual(meta0[elem], meta1[elem])
+
class ObjectMove(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
#assert src object no more exists
self.assert_object_not_exists(self.containers[0], self.obj['name'])
+
+
+    def test_move_dir(self):
+        """Move a folder with delimiter='/' and verify the moved contents.
+
+        The source folder and its objects must be gone afterwards, the
+        unrelated object 'dirs' (same prefix, outside the folder) must not
+        appear in the destination container, and every moved object must
+        keep the content-length, x-object-hash and content-type recorded
+        before the move.
+        """
+        self.client.create_folder(self.containers[0], 'dir')
+        objects = ('object1', 'subdir/object2', 'dirs')
+        meta = {}
+        for name in objects[:-1]:
+            self.upload_random_data(self.containers[0], 'dir/%s' % name)
+            # remember the metadata now; the source is gone after the move
+            meta[name] = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
+        self.upload_random_data(self.containers[0], 'dirs')
+
+        self.client.move_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/', content_type='application/folder')
+        self.assert_object_not_exists(self.containers[0], 'dir')
+        self.assert_object_not_exists(self.containers[1], 'dirs')
+        t = ('content-length', 'x-object-hash', 'content-type')
+        for name in objects[:-1]:
+            self.assert_object_not_exists(self.containers[0], 'dir/%s' % name)
+            self.assert_object_exists(self.containers[1], 'dir-backup/%s' % name)
+            meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+            # A real loop is required here: the previous bare generator
+            # expression was never iterated, so no assertion ever ran.
+            for elem in t:
+                self.assertEqual(meta[name][elem], meta1[elem])
class ObjectPost(BaseTestCase):
def setUp(self):
self.assert_raises_fault(400, self.test_update_object,
content_length = 1000)
- def _test_update_object_invalid_range(self):
+ def test_update_object_invalid_range(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
- def _test_update_object_invalid_range_and_length(self):
+ def test_update_object_invalid_range_and_length(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
#assert item not found
self.assert_raises_fault(404, self.client.delete_object, self.containers[1],
self.obj['name'])
+
+    def test_delete_dir(self):
+        """Delete a folder with delimiter='/' and verify its contents go too.
+
+        The object 'dirs', which only shares the 'dir' prefix, is expected
+        to survive the deletion.
+        """
+        container = self.containers[0]
+        self.client.create_folder(container, 'dir')
+        nested = ['dir/%s' % name for name in ('object1', 'subdir/object2')]
+        for path in nested:
+            self.upload_random_data(container, path)
+        self.upload_random_data(container, 'dirs')
+
+        self.client.delete_object(container, 'dir', delimiter='/')
+        self.assert_object_not_exists(container, 'dir')
+        self.assert_object_exists(container, 'dirs')
+        for path in nested:
+            self.assert_object_not_exists(container, path)
class ListSharing(BaseTestCase):
def setUp(self):
self.client.publish_object(c, 'o2')
def test_shared_public(self):
-        self.assertEqual(self.client.list_containers(shared=True),
-                         ['c1', 'c2'])
-        self.assertEqual(self.client.list_containers(public=True), ['c1', 'c3'])
-        self.assertEqual(self.client.list_containers(shared=True, public=True), ['c1', 'c2', 'c3'])
-        self.assertEqual(self.client.list_objects('c1', shared=True), ['o1'])
-        self.assertEqual(self.client.list_objects('c1', public=True), ['o2'])
-        self.assertEqual(self.client.list_objects('c1', shared=True, public=True), ['o1', 'o2'])
-        self.assertEqual(self.client.list_objects('c2', shared=True), ['o1'])
-        self.assertEqual(self.client.list_objects('c2', public=True), '')
-        self.assertEqual(self.client.list_objects('c2', shared=True, public=True), ['o1'])
-        self.assertEqual(self.client.list_objects('c3', shared=True), '')
-        self.assertEqual(self.client.list_objects('c3', public=True), ['o2'])
-        self.assertEqual(self.client.list_objects('c3', shared=True, public=True), ['o2'])
-        self.assertEqual(self.client.list_objects('c4', shared=True), '')
-        self.assertEqual(self.client.list_objects('c4', public=True), '')
-        self.assertEqual(self.client.list_objects('c4', shared=True, public=True), '')
+        """Check shared/public filtering of container and object listings.
+
+        Every filter combination is requested twice: the default listing is
+        compared with the expected names, then the format='json' listing is
+        compared with it through each entry's 'name' (or with [] when no
+        entries are expected).
+        """
+        filters = (
+            {'shared':True},
+            {'public':True},
+            {'shared':True, 'public':True},
+        )
+
+        list_containers = self.client.list_containers
+        expected_containers = (['c1', 'c2'], ['c1', 'c3'], ['c1', 'c2', 'c3'])
+        for kwargs, expected in zip(filters, expected_containers):
+            names = list_containers(**kwargs)
+            self.assertEqual(names, expected)
+            detailed = list_containers(format='json', **kwargs)
+            self.assertEqual(names, [e['name'] for e in detailed])
+
+        list_objects = self.client.list_objects
+        # Expected default listings per container, in `filters` order;
+        # '' is the value this test expects for an empty default listing.
+        expected_objects = (
+            ('c1', (['o1'], ['o2'], ['o1', 'o2'])),
+            ('c2', (['o1'], '', ['o1'])),
+            ('c3', ('', ['o2'], ['o2'])),
+            ('c4', ('', '', '')),
+        )
+        for container, expectations in expected_objects:
+            for kwargs, expected in zip(filters, expectations):
+                names = list_objects(container, **kwargs)
+                self.assertEqual(names, expected)
+                detailed = list_objects(container, format='json', **kwargs)
+                if expected == '':
+                    self.assertEqual([], detailed)
+                else:
+                    self.assertEqual(names, [e['name'] for e in detailed])
class TestGreek(BaseTestCase):
def test_create_container(self):
for k, v in self.map.items():
if is_date(v):
continue
- #print '#', k, v, map[k]
assert(k in map)
assert v == map[k]
return True
return False
+def strnextling(prefix):
+    """Return the first unicode string greater than `prefix` that does
+    not start with it, built by bumping the last character by one.
+
+        strnextling('hello') -> 'hellp'
+
+    An empty prefix yields unichr(0xffff); RuntimeError is raised when
+    the last character's code point is 0xffff or above.
+    """
+    if prefix:
+        head = prefix[:-1]
+        code = ord(prefix[-1])
+        if code < 0xffff:
+            return head + unichr(code + 1)
+        raise RuntimeError
+    ## Every string starts with the null string, so strnextling('')
+    ## can only be approximated by the last supported unicode character:
+    ##     0x10ffff for wide (32-bit unicode) python builds
+    ##     0x00ffff for narrow (16-bit unicode) python builds
+    ## The build is not autodetected. 0xffff is safe enough.
+    return unichr(0xffff)
+
o_names = ['kate.jpg',
'kate_beckinsale.jpg',
'How To Win Friends And Influence People.pdf',