#TODO unauthorized request
def setUp(self):
self.client = Pithos_Client(get_url(), get_auth(), get_user())
+
+ #keep track of initial containers
+ self.initial_containers = self.client.list_containers()
+
self._clean_account()
self.invalid_client = Pithos_Client(get_url(), get_auth(), 'invalid')
#keep track of initial account meta
self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-
+
self.extended = {
'container':(
'name',
def _clean_account(self):
for c in self.client.list_containers():
- while True:
- #list objects returns at most 10000 objects
- #so repeat until there are no more objects
- objects = self.client.list_objects(c)
- if not objects:
- break
- for o in objects:
- self.client.delete_object(c, o)
- self.client.delete_container(c)
-
+ if c not in self.initial_containers:
+ self.client.delete_container(c, delimiter='/')
+ self.client.delete_container(c)
+
def assert_status(self, status, codes):
l = [elem for elem in self.return_codes]
if type(codes) == types.ListType:
class AccountHead(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
containers = self.client.list_containers()
l = str(len(containers))
self.assertEqual(meta['x-account-container-count'], l)
- size = 0
+ size1 = 0
+ size2 = 0
for c in containers:
m = self.client.retrieve_container_metadata(c)
- size = size + int(m['x-container-bytes-used'])
- self.assertEqual(meta['x-account-bytes-used'], str(size))
+ csum = sum([o['bytes'] for o in self.client.list_objects(c, format='json')])
+ self.assertEqual(int(m['x-container-bytes-used']), csum)
+ size1 += int(m['x-container-bytes-used'])
+ size2 += csum
+ self.assertEqual(meta['x-account-bytes-used'], str(size1))
+ self.assertEqual(meta['x-account-bytes-used'], str(size2))
def test_get_account_403(self):
self.assert_raises_fault(403,
def setUp(self):
BaseTestCase.setUp(self)
#create some containers
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
xml = self.client.list_containers(limit=l, marker=m, format='xml')
self.assert_extended(xml, 'xml', 'container', l)
nodes = xml.getElementsByTagName('name')
- self.assertEqual(len(nodes), 1)
- self.assertEqual(nodes[0].childNodes[0].data, 'pears')
# The page may hold at most `l` entries.
self.assertTrue(len(nodes) <= l)
names = [n.childNodes[0].data for n in nodes]
# Either 'pears' is on this page, or every name on the page sorts before
# 'pears'. all() is required here: a bare generator expression
# is always truthy and would make the assertion vacuous.
self.assertTrue('pears' in names or all('pears' > name for name in names))
def test_if_modified_since(self):
t = datetime.datetime.utcnow()
self.assertEqual(len(c), len(self.containers) + 1)
except Fault, f:
self.failIf(f.status == 304) #fail if not modified
-
-
+
def test_if_modified_since_invalid_date(self):
c = self.client.list_containers(if_modified_since='')
self.assertEqual(len(c), len(self.containers))
class AccountPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
self.obj.append(self.upload_random_data(self.container[0], o))
for o in o_names[8:]:
self.obj.append(self.upload_random_data(self.container[1], o))
-
+
def test_list_shared(self):
    # Listing with shared=True: checked for a directly shared object,
    # for a neighbouring unshared object, and for a shared folder.
    container = self.container[0]
    shared_name = self.obj[0]['name']

    self.client.share_object(container, shared_name, ('*',))
    listing = self.client.list_objects(container, shared=True)
    self.assertEqual(listing, [shared_name])

    # create an object named just past the shared one (see strnextling);
    # the shared listing is expected to stay unchanged
    self.upload_random_data(container, strnextling(shared_name))
    listing = self.client.list_objects(container, shared=True)
    self.assertEqual(listing, [shared_name])

    # test inheritance: after sharing a folder, the listing is expected to
    # contain the folder and the object uploaded under it
    container = self.container[1]
    self.client.create_folder(container, 'folder')
    self.client.share_object(container, 'folder', ('*',))
    self.upload_random_data(container, 'folder/object')
    listing = self.client.list_objects(container, shared=True)
    self.assertEqual(listing, ['folder', 'folder/object'])
+
def test_list_public(self):
    # Listing with public=True: checked for a published object, for a
    # neighbouring unpublished object, and for a published folder.
    container = self.container[0]
    public_name = self.obj[0]['name']

    self.client.publish_object(container, public_name)
    listing = self.client.list_objects(container, public=True)
    self.assertEqual(listing, [public_name])

    # create an object named just past the published one (see
    # strnextling); the public listing is expected to stay unchanged
    self.upload_random_data(container, strnextling(public_name))
    listing = self.client.list_objects(container, public=True)
    self.assertEqual(listing, [public_name])

    # test inheritance: after publishing a folder, only the folder itself
    # is expected in the listing, not the object uploaded under it
    container = self.container[1]
    self.client.create_folder(container, 'folder')
    self.client.publish_object(container, 'folder')
    self.upload_random_data(container, 'folder/object')
    listing = self.client.list_objects(container, public=True)
    self.assertEqual(listing, ['folder'])
+
def test_list_shared_public(self):
    # Listing with both shared=True and public=True: one object is shared,
    # another is published, and both are expected in the listing.
    container = self.container[0]
    shared_name = self.obj[0]['name']
    public_name = self.obj[1]['name']

    self.client.share_object(container, shared_name, ('*',))
    self.client.publish_object(container, public_name)
    objs = self.client.list_objects(container, shared=True, public=True)
    self.assertEqual(objs, [shared_name, public_name])

    # create objects named just past the shared/published ones (see
    # strnextling); the listing is expected to stay unchanged
    self.upload_random_data(container, strnextling(shared_name))
    self.upload_random_data(container, strnextling(public_name))
    objs = self.client.list_objects(container, shared=True, public=True)
    self.assertEqual(objs, [shared_name, public_name])

    # test inheritance: the shared folder is expected together with the
    # object under it, the published folder on its own
    container = self.container[1]
    self.client.create_folder(container, 'folder1')
    self.client.share_object(container, 'folder1', ('*',))
    self.upload_random_data(container, 'folder1/object')
    self.client.create_folder(container, 'folder2')
    self.client.publish_object(container, 'folder2')
    # the upload result is not needed, so it is not bound to a name
    self.upload_random_data(container, 'folder2/object')
    objs = self.client.list_objects(container, shared=True, public=True)
    self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2'])
+
def test_list_objects(self):
objects = self.client.list_objects(self.container[0])
l = [elem['name'] for elem in self.obj[:8]]
class ContainerPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
-
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
def test_create(self):
self.client.create_container(self.containers[0])
containers = self.client.list_containers()
self.client.create_container(self.containers[0])
policy = {'quota':100}
- self.client.set_container_policies('c1', **policy)
+ self.client.set_container_policies(self.containers[0], **policy)
- meta = self.client.retrieve_container_metadata('c1')
+ meta = self.client.retrieve_container_metadata(self.containers[0])
self.assertTrue('x-container-policy-quota' in meta)
self.assertEqual(meta['x-container-policy-quota'], '100')
- args = ['c1', 'o1']
+ args = [self.containers[0], 'o1']
kwargs = {'length':101}
self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs)
#reset quota
policy = {'quota':0}
- self.client.set_container_policies('c1', **policy)
+ self.client.set_container_policies(self.containers[0], **policy)
class ContainerPost(BaseTestCase):
def setUp(self):
class ContainerDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
def test_delete_invalid(self):
self.assert_raises_fault(404, self.client.delete_container, 'c3')
+
def test_delete_contents(self):
    # Deleting a container with delimiter='/' is expected to remove every
    # object listed in it while the container itself keeps existing.
    container = self.containers[0]

    self.client.create_folder(container, 'folder-1')
    # Upload into the same container that is listed and emptied below, so
    # the deletion is exercised on a real data object and not only on
    # folders.
    self.upload_random_data(container, 'folder-1/%s' % o_names[0])
    self.client.create_folder(container, 'folder-1/subfolder')
    self.client.create_folder(container, 'folder-2/%s' % o_names[1])

    # snapshot the names before deleting so each one can be checked after
    objects = self.client.list_objects(container)
    self.client.delete_container(container, delimiter='/')
    for name in objects:
        self.assert_object_not_exists(container, name)
    self.assert_container_exists(container)
class ObjectGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
#create some containers
for c in self.containers:
self.client.create_container(c)
def _test_maximum_upload_size_exceeds(self):
name = o_names[0]
meta = {'test':'test1'}
- #upload 100MB
- length=1024*1024*100
+ #upload 5GB
+ length= 5 * (1024 * 1024 * 1024) + 1
self.assert_raises_fault(400, self.upload_random_data, self.container,
name, length, **meta)
class ObjectCopy(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
self.assert_raises_fault(404, self.client.copy_object, self.containers[1],
self.obj['name'], self.containers[1],
'testcopy', meta)
-
+
def test_copy_dir(self):
    # Copy 'dir' to 'dir-backup' in another container with delimiter='/'
    # and check that every object of the tree exists on both sides with
    # matching length, hash and content type.
    src = self.containers[0]
    dst = self.containers[1]

    self.client.create_folder(src, 'dir')
    self.client.create_folder(src, 'dir/subdir')
    self.upload_random_data(src, 'dir/object1.jpg', length=1024)
    self.upload_random_data(src, 'dir/subdir/object2.pdf', length=2*1024)
    # 'dirs' matches the listing prefix below but is not part of the tree
    self.client.create_folder(src, 'dirs')

    objects = self.client.list_objects(src, prefix='dir')
    self.client.copy_object(src, 'dir', dst, 'dir-backup', delimiter='/')

    compared = ('content-length', 'x-object-hash', 'content-type')
    # NOTE(review): objects[:-1] / objects[-1] assume the listing is sorted
    # so that 'dirs' comes last - confirm against list_objects.
    for name in objects[:-1]:
        copy_name = name.replace('dir', 'dir-backup', 1)
        self.assert_object_exists(src, name)
        self.assert_object_exists(dst, copy_name)
        meta0 = self.client.retrieve_object_metadata(src, name)
        meta1 = self.client.retrieve_object_metadata(dst, copy_name)
        # An explicit loop: a bare generator expression would never be
        # iterated, so the assertions inside it would never run.
        for key in compared:
            self.assertEqual(meta0[key], meta1[key])
    self.assert_object_not_exists(dst, objects[-1])
+
class ObjectMove(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
#assert src object no more exists
self.assert_object_not_exists(self.containers[0], self.obj['name'])
+
+
def test_move_dir(self):
    # Move 'dir' to 'dir-backup' in another container with delimiter='/'
    # and check that every object of the tree left the source, arrived at
    # the destination, and kept its length, hash and content type.
    src = self.containers[0]
    dst = self.containers[1]

    # metadata of each source object, captured before the move
    meta = {}
    self.client.create_folder(src, 'dir')
    meta['dir'] = self.client.retrieve_object_metadata(src, 'dir')
    self.client.create_folder(src, 'dir/subdir')
    meta['dir/subdir'] = self.client.retrieve_object_metadata(src, 'dir/subdir')
    self.upload_random_data(src, 'dir/object1.jpg', length=1024)
    meta['dir/object1.jpg'] = self.client.retrieve_object_metadata(src, 'dir/object1.jpg')
    self.upload_random_data(src, 'dir/subdir/object2.pdf', length=2*1024)
    meta['dir/subdir/object2.pdf'] = self.client.retrieve_object_metadata(src, 'dir/subdir/object2.pdf')
    # 'dirs' matches the listing prefix below but is not part of the tree
    self.client.create_folder(src, 'dirs')
    meta['dirs'] = self.client.retrieve_object_metadata(src, 'dirs')

    objects = self.client.list_objects(src, prefix='dir')
    self.client.move_object(src, 'dir', dst, 'dir-backup', delimiter='/')

    compared = ('content-length', 'x-object-hash', 'content-type')
    # NOTE(review): objects[:-1] / objects[-1] assume the listing is sorted
    # so that 'dirs' comes last - confirm against list_objects.
    for name in objects[:-1]:
        moved_name = name.replace('dir', 'dir-backup', 1)
        self.assert_object_not_exists(src, name)
        self.assert_object_exists(dst, moved_name)
        # The source no longer exists, so compare against the metadata
        # recorded before the move.
        meta0 = meta[name]
        meta1 = self.client.retrieve_object_metadata(dst, moved_name)
        # An explicit loop: a bare generator expression would never be
        # iterated, so the assertions inside it would never run.
        for key in compared:
            self.assertEqual(meta0[key], meta1[key])
    self.assert_object_exists(src, objects[-1])
    self.assert_object_not_exists(dst, objects[-1])
class ObjectPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = []
self.assert_raises_fault(400, self.test_update_object,
content_length = 1000)
- def _test_update_object_invalid_range(self):
+ def test_update_object_invalid_range(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
- def _test_update_object_invalid_range_and_length(self):
+ def test_update_object_invalid_range_and_length(self):
with AssertContentInvariant(self.client.retrieve_object,
self.containers[0], self.obj[0]['name']):
self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
def setUp(self):
BaseTestCase.setUp(self)
self.containers = ['c1', 'c2']
+ self.containers.extend(self.initial_containers)
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
#assert item not found
self.assert_raises_fault(404, self.client.delete_object, self.containers[1],
self.obj['name'])
+
def test_delete_dir(self):
    # Delete 'dir' with delimiter='/' and check that the listed objects of
    # the tree are gone while the last listed name still exists.
    container = self.containers[0]

    self.client.create_folder(container, 'dir')
    self.client.create_folder(container, 'dir/subdir')
    self.upload_random_data(container, 'dir/object1.jpg', length=1024)
    self.upload_random_data(container, 'dir/subdir/object2.pdf', length=2*1024)
    # 'dirs' matches the listing prefix below but is not part of the tree
    self.client.create_folder(container, 'dirs')

    objects = self.client.list_objects(container, prefix='dir')
    self.client.delete_object(container, 'dir', delimiter='/')

    # NOTE(review): the split below assumes the listing is sorted so that
    # 'dirs' comes last - confirm against list_objects.
    removed, survivor = objects[:-1], objects[-1]
    for name in removed:
        self.assert_object_not_exists(container, name)
    self.assert_object_exists(container, survivor)
class ListSharing(BaseTestCase):
def setUp(self):
self.client.publish_object(c, 'o2')
def test_shared_public(self):
+ diff = lambda l: set(l) - set(self.initial_containers)
+
func, kwargs = self.client.list_containers, {'shared':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c2'])
+ self.assertEqual(set(['c1', 'c2']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
func, kwargs = self.client.list_containers, {'public':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c3'])
+ self.assertEqual(set(['c1', 'c3']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
func, kwargs = self.client.list_containers, {'shared':True, 'public':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c2', 'c3'])
+ self.assertEqual(set(['c1', 'c2', 'c3']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
self.client.share_object(self.container, self.object, self.authorized)
my_shared_containers = self.client.list_containers(shared=True)
- self.assertEqual(['c'], my_shared_containers)
+ self.assertTrue('c' in my_shared_containers)
my_shared_objects = self.client.list_objects('c', shared=True)
self.assertEqual(['o'], my_shared_objects)
for k, v in self.map.items():
if is_date(v):
continue
- #print '#', k, v, map[k]
assert(k in map)
assert v == map[k]
return True
return False
def strnextling(prefix):
    """Return the first unicode string
       greater than but not starting with given prefix.
       strnextling('hello') -> 'hellp'

    The result is the prefix with its last character replaced by the
    character at the next code point. For an empty prefix, the single
    character 0xffff is returned.

    Raises RuntimeError if the last character of prefix is already at or
    above 0xffff and therefore cannot be incremented.
    """
    # unichr exists only on Python 2; on Python 3 chr plays the same role.
    try:
        to_char = unichr
    except NameError:
        to_char = chr

    if not prefix:
        ## all strings start with the null string,
        ## therefore we have to approximate strnextling('')
        ## with the last unicode character supported by python
        ## 0x10ffff for wide (32-bit unicode) python builds
        ## 0x00ffff for narrow (16-bit unicode) python builds
        ## We will not autodetect. 0xffff is safe enough.
        return to_char(0xffff)

    last = ord(prefix[-1])
    if last >= 0xffff:
        raise RuntimeError(
            "cannot increment last character %r of prefix" % (prefix[-1],))
    return prefix[:-1] + to_char(last + 1)
+
o_names = ['kate.jpg',
'kate_beckinsale.jpg',
'How To Win Friends And Influence People.pdf',