#TODO unauthorized request
def setUp(self):
self.client = Pithos_Client(get_url(), get_auth(), get_user())
+
+ #keep track of initial containers
+ self.initial_containers = self.client.list_containers()
+
self._clean_account()
self.invalid_client = Pithos_Client(get_url(), get_auth(), 'invalid')
#keep track of initial account meta
self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-
+
self.extended = {
'container':(
'name',
def _clean_account(self):
for c in self.client.list_containers():
- while True:
- #list objects returns at most 10000 objects
- #so repeat until there are no more objects
- objects = self.client.list_objects(c)
- if not objects:
- break
- for o in objects:
- self.client.delete_object(c, o)
- self.client.delete_container(c)
-
+ if c not in self.initial_containers:
+ self.client.delete_container(c, delimiter='/')
+ self.client.delete_container(c)
+
def assert_status(self, status, codes):
l = [elem for elem in self.return_codes]
if type(codes) == types.ListType:
class AccountHead(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
containers = self.client.list_containers()
l = str(len(containers))
self.assertEqual(meta['x-account-container-count'], l)
- size = 0
+ size1 = 0
+ size2 = 0
for c in containers:
m = self.client.retrieve_container_metadata(c)
- size = size + int(m['x-container-bytes-used'])
- self.assertEqual(meta['x-account-bytes-used'], str(size))
+ csum = sum([o['bytes'] for o in self.client.list_objects(c, format='json')])
+ self.assertEqual(int(m['x-container-bytes-used']), csum)
+ size1 += int(m['x-container-bytes-used'])
+ size2 += csum
+ self.assertEqual(meta['x-account-bytes-used'], str(size1))
+ self.assertEqual(meta['x-account-bytes-used'], str(size2))
def test_get_account_403(self):
self.assert_raises_fault(403,
def setUp(self):
BaseTestCase.setUp(self)
#create some containers
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
xml = self.client.list_containers(limit=l, marker=m, format='xml')
self.assert_extended(xml, 'xml', 'container', l)
nodes = xml.getElementsByTagName('name')
- self.assertEqual(len(nodes), 1)
- self.assertEqual(nodes[0].childNodes[0].data, 'pears')
+ self.assertTrue(len(nodes) <= l)
+ names = [n.childNodes[0].data for n in nodes]
+ #each returned name must be 'pears' or sort before it; all() is needed
+ #because a bare generator expression is always truthy inside assertTrue
+ self.assertTrue('pears' in names or all('pears' > name for name in names))
def test_if_modified_since(self):
t = datetime.datetime.utcnow()
self.assertEqual(len(c), len(self.containers) + 1)
except Fault, f:
self.failIf(f.status == 304) #fail if not modified
-
-
+
def test_if_modified_since_invalid_date(self):
c = self.client.list_containers(if_modified_since='')
self.assertEqual(len(c), len(self.containers))
class AccountPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
+ self.containers = list(set(self.initial_containers + ['apples', 'bananas', 'kiwis', 'oranges', 'pears']))
+ self.containers.sort()
+
for item in self.containers:
self.client.create_container(item)
class ContainerPut(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
-
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
def test_create(self):
self.client.create_container(self.containers[0])
containers = self.client.list_containers()
self.client.create_container(self.containers[0])
policy = {'quota':100}
- self.client.set_container_policies('c1', **policy)
+ self.client.set_container_policies(self.containers[0], **policy)
- meta = self.client.retrieve_container_metadata('c1')
+ meta = self.client.retrieve_container_metadata(self.containers[0])
self.assertTrue('x-container-policy-quota' in meta)
self.assertEqual(meta['x-container-policy-quota'], '100')
- args = ['c1', 'o1']
+ args = [self.containers[0], 'o1']
kwargs = {'length':101}
self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs)
#reset quota
policy = {'quota':0}
- self.client.set_container_policies('c1', **policy)
+ self.client.set_container_policies(self.containers[0], **policy)
class ContainerPost(BaseTestCase):
def setUp(self):
class ContainerDelete(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
def test_delete_invalid(self):
self.assert_raises_fault(404, self.client.delete_container, 'c3')
+
+ def test_delete_contents(self):
+ #deleting a container with delimiter='/' must remove every object
+ #listed in it while leaving the container itself in place
+ self.client.create_folder(self.containers[0], 'folder-1')
+ #populate the same container whose contents are listed and deleted below
+ self.upload_random_data(self.containers[0], 'folder-1/%s' % o_names[0])
+ self.client.create_folder(self.containers[0], 'folder-1/subfolder')
+ self.client.create_folder(self.containers[0], 'folder-2/%s' % o_names[1])
+
+ #snapshot the object names before the delete so each can be checked
+ objects = self.client.list_objects(self.containers[0])
+ self.client.delete_container(self.containers[0], delimiter='/')
+ for o in objects:
+ self.assert_object_not_exists(self.containers[0], o)
+ self.assert_container_exists(self.containers[0])
class ObjectGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
#create some containers
for c in self.containers:
self.client.create_container(c)
class ObjectCopy(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
def test_copy_dir(self):
+ #copy 'dir' with delimiter='/' into the second container as 'dir-backup'
+ #and check that every listed object except the last was copied intact
self.client.create_folder(self.containers[0], 'dir')
- objects = ('object1', 'subdir/object2', 'dirs')
- for name in objects[:-1]:
- self.upload_random_data(self.containers[0], 'dir/%s' % name)
- self.upload_random_data(self.containers[0], 'dirs')
+ self.client.create_folder(self.containers[0], 'dir/subdir')
+ self.upload_random_data(self.containers[0], 'dir/object1.jpg', length=1024)
+ self.upload_random_data(self.containers[0], 'dir/subdir/object2.pdf', length=2*1024)
+ self.client.create_folder(self.containers[0], 'dirs')
+ #NOTE(review): objects[-1] is presumably 'dirs' (listing assumed sorted by name) - confirm
+ objects = self.client.list_objects(self.containers[0], prefix='dir')
self.client.copy_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/')
- self.assert_object_exists(self.containers[0], 'dir')
- self.assert_object_not_exists(self.containers[1], 'dirs')
- for name in objects[:-1]:
- meta0 = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
- meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+ for o in objects[:-1]:
+ backup = o.replace('dir', 'dir-backup', 1)
+ self.assert_object_exists(self.containers[0], o)
+ self.assert_object_exists(self.containers[1], backup)
+ meta0 = self.client.retrieve_object_metadata(self.containers[0], o)
+ meta1 = self.client.retrieve_object_metadata(self.containers[1], backup)
t = ('content-length', 'x-object-hash', 'content-type')
- (self.assertEqual(meta0[elem], meta1[elem]) for elem in t)
+ #a bare generator expression is never consumed, so compare the fields eagerly
+ self.assertEqual([meta0[elem] for elem in t], [meta1[elem] for elem in t])
+ self.assert_object_not_exists(self.containers[1], objects[-1])
class ObjectMove(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
def test_move_dir(self):
+ #move 'dir' with delimiter='/' into the second container as 'dir-backup'
+ #and check that every listed object except the last was moved intact
- self.client.create_folder(self.containers[0], 'dir')
- objects = ('object1', 'subdir/object2', 'dirs')
meta = {}
- for name in objects[:-1]:
- self.upload_random_data(self.containers[0], 'dir/%s' % name)
- meta[name] = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
- self.upload_random_data(self.containers[0], 'dirs')
+ #record each object's metadata before the move, keyed by object name
+ self.client.create_folder(self.containers[0], 'dir')
+ meta['dir'] = self.client.retrieve_object_metadata(self.containers[0], 'dir')
+ self.client.create_folder(self.containers[0], 'dir/subdir')
+ meta['dir/subdir'] = self.client.retrieve_object_metadata(self.containers[0], 'dir/subdir')
+ self.upload_random_data(self.containers[0], 'dir/object1.jpg', length=1024)
+ meta['dir/object1.jpg'] = self.client.retrieve_object_metadata(self.containers[0], 'dir/object1.jpg')
+ self.upload_random_data(self.containers[0], 'dir/subdir/object2.pdf', length=2*1024)
+ meta['dir/subdir/object2.pdf'] = self.client.retrieve_object_metadata(self.containers[0], 'dir/subdir/object2.pdf')
+ self.client.create_folder(self.containers[0], 'dirs')
+ meta['dirs'] = self.client.retrieve_object_metadata(self.containers[0], 'dirs')
- self.client.move_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/', content_type='application/folder')
- self.assert_object_not_exists(self.containers[0], 'dir')
- self.assert_object_not_exists(self.containers[1], 'dirs')
- for name in objects[:-1]:
- self.assert_object_not_exists(self.containers[0], 'dir/%s' % name)
- self.assert_object_exists(self.containers[1], 'dir-backup/%s' % name)
- meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+ #NOTE(review): objects[-1] is presumably 'dirs' (listing assumed sorted by name) - confirm
+ objects = self.client.list_objects(self.containers[0], prefix='dir')
+ self.client.move_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/')
+ for o in objects[:-1]:
+ backup = o.replace('dir', 'dir-backup', 1)
+ self.assert_object_not_exists(self.containers[0], o)
+ self.assert_object_exists(self.containers[1], backup)
+ meta1 = self.client.retrieve_object_metadata(self.containers[1], backup)
t = ('content-length', 'x-object-hash', 'content-type')
- (self.assertEqual(meta[name][elem], meta1[elem]) for elem in t)
+ #compare against the pre-move metadata eagerly; a bare generator
+ #expression is never consumed and meta0 is not defined in this test
+ self.assertEqual([meta[o][elem] for elem in t], [meta1[elem] for elem in t])
+ self.assert_object_exists(self.containers[0], objects[-1])
+ self.assert_object_not_exists(self.containers[1], objects[-1])
class ObjectPost(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
- self.containers = ['c1', 'c2']
+ self.containers = list(set(self.initial_containers + ['c1', 'c2']))
+ self.containers.sort()
+
for c in self.containers:
self.client.create_container(c)
self.obj = []
def setUp(self):
BaseTestCase.setUp(self)
self.containers = ['c1', 'c2']
+ self.containers.extend(self.initial_containers)
+
for c in self.containers:
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
def test_delete_dir(self):
self.client.create_folder(self.containers[0], 'dir')
- objects = ('object1', 'subdir/object2', 'dirs')
- for name in objects[:-1]:
- self.upload_random_data(self.containers[0], 'dir/%s' % name)
- self.upload_random_data(self.containers[0], 'dirs')
+ self.client.create_folder(self.containers[0], 'dir/subdir')
+ self.upload_random_data(self.containers[0], 'dir/object1.jpg', length=1024)
+ self.upload_random_data(self.containers[0], 'dir/subdir/object2.pdf', length=2*1024)
+ self.client.create_folder(self.containers[0], 'dirs')
+ objects = self.client.list_objects(self.containers[0], prefix='dir')
self.client.delete_object(self.containers[0], 'dir', delimiter='/')
- self.assert_object_not_exists(self.containers[0], 'dir')
- self.assert_object_exists(self.containers[0], 'dirs')
- for name in objects[:-1]:
- self.assert_object_not_exists(self.containers[0], 'dir/%s' % name)
+ for object in objects[:-1]:
+ self.assert_object_not_exists(self.containers[0], object)
+ self.assert_object_exists(self.containers[0], objects[-1])
class ListSharing(BaseTestCase):
def setUp(self):
self.client.publish_object(c, 'o2')
def test_shared_public(self):
+ diff = lambda l: set(l) - set(self.initial_containers)
+
func, kwargs = self.client.list_containers, {'shared':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c2'])
+ self.assertEqual(set(['c1', 'c2']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
func, kwargs = self.client.list_containers, {'public':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c3'])
+ self.assertEqual(set(['c1', 'c3']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
func, kwargs = self.client.list_containers, {'shared':True, 'public':True}
l = func(**kwargs)
- self.assertEqual(l, ['c1', 'c2', 'c3'])
+ self.assertEqual(set(['c1', 'c2', 'c3']), diff(l))
self.assertEqual(l, [e['name'] for e in func(format='json', **kwargs)])
self.client.share_object(self.container, self.object, self.authorized)
my_shared_containers = self.client.list_containers(shared=True)
- self.assertEqual(['c'], my_shared_containers)
+ self.assertTrue('c' in my_shared_containers)
my_shared_objects = self.client.list_objects('c', shared=True)
self.assertEqual(['o'], my_shared_objects)