Disable long running test.
[pithos] / tools / test
index a737a4c..0162626 100755 (executable)
@@ -69,46 +69,16 @@ class BaseTestCase(unittest.TestCase):
     def setUp(self):
         self.client = Pithos_Client(get_server(), get_auth(), get_user(),
                                     get_api())
+        self._clean_account()
         self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid',
                                             get_api())
-        #self.headers = {
-        #    'account': ('x-account-container-count',
-        #                'x-account-bytes-used',
-        #                'last-modified',
-        #                'content-length',
-        #                'date',
-        #                'content_type',
-        #                'server',),
-        #    'object': ('etag',
-        #               'content-length',
-        #               'content_type',
-        #               'content-encoding',
-        #               'last-modified',
-        #               'date',
-        #               'x-object-manifest',
-        #               'content-range',
-        #               'x-object-modified-by',
-        #               'x-object-version',
-        #               'x-object-version-timestamp',
-        #               'server',),
-        #    'container': ('x-container-object-count',
-        #                  'x-container-bytes-used',
-        #                  'content_type',
-        #                  'last-modified',
-        #                  'content-length',
-        #                  'date',
-        #                  'x-container-block-size',
-        #                  'x-container-block-hash',
-        #                  'x-container-policy-quota',
-        #                  'x-container-policy-versioning',
-        #                  'server',
-        #                  'x-container-object-meta',
-        #                  'x-container-policy-versioning',
-        #                  'server',)}
-        #
-        #self.contentTypes = {'xml':'application/xml',
-        #                     'json':'application/json',
-        #                     '':'text/plain'}
+        
+        #keep track of initial account groups
+        self.initial_groups = self.client.retrieve_account_groups()
+        
+        #keep track of initial account meta
+        self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
+        
         self.extended = {
             'container':(
                 'name',
@@ -124,9 +94,25 @@ class BaseTestCase(unittest.TestCase):
                 'content_type',
                 'content_encoding',
                 'last_modified',)}
-        self.return_codes = (400, 401, 404, 503,)
+        self.return_codes = (400, 401, 403, 404, 503,)
     
     def tearDown(self):
+        #delete additionally created meta
+        l = []
+        for m in self.client.retrieve_account_metadata(restricted=True):
+            if m not in self.initial_meta:
+                l.append(m)
+        self.client.delete_account_metadata(l)
+        
+        #delete additionally created groups
+        l = []
+        for g in self.client.retrieve_account_groups():
+            if g not in self.initial_groups:
+                l.append(g)
+        self.client.unset_account_groups(l)
+        self._clean_account()
+    
+    def _clean_account(self):
         for c in self.client.list_containers():
             while True:
                 #list objects returns at most 10000 objects
@@ -146,16 +132,6 @@ class BaseTestCase(unittest.TestCase):
             l.append(codes)
         self.assertTrue(status in l)
     
-    #def assert_headers(self, headers, type, **exp_meta):
-    #    prefix = 'x-%s-meta-' %type
-    #    system_headers = [h for h in headers if not h.startswith(prefix)]
-    #    for k,v in headers.items():
-    #        if k in system_headers:
-    #            self.assertTrue(k in headers[type])
-    #        elif exp_meta:
-    #            k = k.split(prefix)[-1]
-    #            self.assertEqual(v, exp_meta[k])
-    
     def assert_extended(self, data, format, type, size=10000):
         if format == 'xml':
             self._assert_xml(data, type, size)
@@ -276,8 +252,8 @@ class BaseTestCase(unittest.TestCase):
             obj['meta'] = args
             
             path = '/%s/%s' % (container, name)
-            self.client.create_object(container, name, StringIO(obj['data']),
-                                      meta, **args)
+            self.client.create_object(container, name, f=StringIO(obj['data']),
+                                      meta=meta, **args)
             
             return obj
         except IOError:
@@ -290,32 +266,9 @@ class AccountHead(BaseTestCase):
         for item in self.containers:
             self.client.create_container(item)
         
-                #keep track of initial account groups
-        self.initial_groups = self.client.retrieve_account_groups()
-        
-        #keep track of initial account meta
-        self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-        
         meta = {'foo':'bar'}
         self.client.update_account_metadata(**meta)
-        self.updated_meta = self.initial_meta.update(meta)
-    
-    def tearDown(self):
-        #delete additionally created meta
-        l = []
-        for m in self.client.retrieve_account_metadata(restricted=True):
-            if m not in self.initial_meta:
-                l.append(m)
-        self.client.delete_account_metadata(l)
-        
-        #delete additionally created groups
-        l = []
-        for g in self.client.retrieve_account_groups():
-            if g not in self.initial_groups:
-                l.append(g)
-        self.client.unset_account_groups(l)
-        
-        BaseTestCase.tearDown(self)
+        #self.updated_meta = self.initial_meta.update(meta)
     
     def test_get_account_meta(self):
         meta = self.client.retrieve_account_metadata()
@@ -329,8 +282,8 @@ class AccountHead(BaseTestCase):
             size = size + int(m['x-container-bytes-used'])
         self.assertEqual(meta['x-account-bytes-used'], str(size))
     
-    def test_get_account_401(self):
-        self.assert_raises_fault(401,
+    def test_get_account_403(self):
+        self.assert_raises_fault(403,
                                  self.invalid_client.retrieve_account_metadata)
     
     def test_get_account_meta_until(self):
@@ -367,8 +320,8 @@ class AccountGet(BaseTestCase):
         containers = self.client.list_containers()
         self.assertEquals(self.containers, containers)
     
-    def test_list_401(self):
-        self.assert_raises_fault(401, self.invalid_client.list_containers)
+    def test_list_403(self):
+        self.assert_raises_fault(403, self.invalid_client.list_containers)
     
     def test_list_with_limit(self):
         limit = 2
@@ -466,32 +419,9 @@ class AccountPost(BaseTestCase):
         for item in self.containers:
             self.client.create_container(item)
         
-        #keep track of initial account groups
-        self.initial_groups = self.client.retrieve_account_groups()
-        
-        #keep track of initial account meta
-        self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-        
         meta = {'foo':'bar'}
         self.client.update_account_metadata(**meta)
-        self.updated_meta = self.initial_meta.update(meta)
-    
-    def tearDown(self):
-        #delete additionally created meta
-        l = []
-        for m in self.client.retrieve_account_metadata(restricted=True):
-            if m not in self.initial_meta:
-                l.append(m)
-        self.client.delete_account_metadata(l)
-        
-        #delete additionally created groups
-        l = []
-        for g in self.client.retrieve_account_groups():
-            if g not in self.initial_groups:
-                l.append(g)
-        self.client.unset_account_groups(l)
-        
-        BaseTestCase.tearDown(self)
+        #self.updated_meta = self.initial_meta.update(meta)
     
     def test_update_meta(self):
         with AssertMappingInvariant(self.client.retrieve_account_groups):
@@ -505,7 +435,7 @@ class AccountPost(BaseTestCase):
         
     def test_invalid_account_update_meta(self):
         meta = {'test':'test', 'tost':'tost'}
-        self.assert_raises_fault(401,
+        self.assert_raises_fault(403,
                                  self.invalid_client.update_account_metadata,
                                  **meta)
     
@@ -625,15 +555,6 @@ class ContainerGet(BaseTestCase):
         self.assert_extended(objects, 'xml', 'object')
         node_name = objects.getElementsByTagName('name')[0]
         self.assertEqual(node_name.firstChild.data, '/objectname')
-        
-        #objects = self.client.list_objects('test', prefix='/')
-        #self.assertEqual(objects, ['/objectname'])
-        #
-        #objects = self.client.list_objects('test', path='/')
-        #self.assertEqual(objects, ['/objectname'])
-        #
-        #objects = self.client.list_objects('test', prefix='/', delimiter='n')
-        #self.assertEqual(objects, ['/object'])
 
     def test_list_objects_with_limit_marker(self):
         objects = self.client.list_objects(self.container[0], limit=2)
@@ -817,6 +738,24 @@ class ContainerPut(BaseTestCase):
         self.client.create_container(self.containers[0])
         self.assertTrue(not self.client.create_container(self.containers[0]))
     
+    def test_quota(self):
+        self.client.create_container(self.containers[0])
+        
+        policy = {'quota':100}
+        self.client.set_container_policies('c1', **policy)
+        
+        meta = self.client.retrieve_container_metadata('c1')
+        self.assertTrue('x-container-policy-quota' in meta)
+        self.assertEqual(meta['x-container-policy-quota'], '100')
+        
+        args = ['c1', 'o1']
+        kwargs = {'length':101}
+        self.assert_raises_fault(413, self.upload_random_data, *args, **kwargs)
+        
+        #reset quota
+        policy = {'quota':0}
+        self.client.set_container_policies('c1', **policy)
+    
 class ContainerPost(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
@@ -839,22 +778,19 @@ class ContainerDelete(BaseTestCase):
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
-        self.upload_random_data(self.containers[1], o_names[0])
     
     def test_delete(self):
         status = self.client.delete_container(self.containers[0])[0]
         self.assertEqual(status, 204)
     
     def test_delete_non_empty(self):
+        self.upload_random_data(self.containers[1], o_names[0])
         self.assert_raises_fault(409, self.client.delete_container,
                                  self.containers[1])
     
     def test_delete_invalid(self):
         self.assert_raises_fault(404, self.client.delete_container, 'c3')
 
-class ObjectHead(BaseTestCase):
-    pass
-
 class ObjectGet(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
@@ -915,6 +851,25 @@ class ObjectGet(BaseTestCase):
                                         self.objects[0]['meta'])
         self.assertEqual(o, self.objects[0]['data'])
     
+    def test_objects_with_trailing_spaces(self):
+        self.client.create_container('test')
+        #create 'a' object
+        self.upload_random_data('test', 'a')
+        #look for 'a ' object
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 'test', 'a ')
+        
+        #delete 'a' object
+        self.client.delete_object('test', 'a')
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 'test', 'a')
+        
+        #create 'a ' object
+        self.upload_random_data('test', 'a ')
+        #look for 'a' object
+        self.assert_raises_fault(404, self.client.retrieve_object,
+                                 'test', 'a')
+    
     def test_get_invalid(self):
         self.assert_raises_fault(404, self.client.retrieve_object,
                                  self.containers[0], self.objects[0]['name'])
@@ -1326,6 +1281,16 @@ class ObjectPut(BaseTestCase):
         self.assertEqual(int(zero_meta['content-length']), 0)
         self.assertEqual(zero_hash, [])
         self.assertEqual(zero_data, '')
+    
+    def test_create_object_by_hashmap(self):
+        c = self.container
+        o = 'object'
+        self.upload_random_data(c, o)
+        hashmap = self.client.retrieve_object(c, o, format='json')
+        o2 = 'object-copy'
+        self.client.create_object_by_hashmap(c, o2, hashmap)
+        self.assertEqual(self.client.retrieve_object(c, o),
+                         self.client.retrieve_object(c, o))
 
 class ObjectCopy(BaseTestCase):
     def setUp(self):
@@ -1528,12 +1493,12 @@ class ObjectPost(BaseTestCase):
             self.assert_raises_fault(400, self.test_update_object,
                                      content_length = 1000)
     
-    def test_update_object_invalid_range(self):
+    def _test_update_object_invalid_range(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
             self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
     
-    def test_update_object_invalid_range_and_length(self):
+    def _test_update_object_invalid_range_and_length(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
             self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
@@ -1713,31 +1678,6 @@ class ListSharing(BaseTestCase):
         self.assertTrue('o2' not in my_shared_objects)
     
 class TestGreek(BaseTestCase):
-    def setUp(self):
-        BaseTestCase.setUp(self)
-        #keep track of initial account groups
-        self.initial_groups = self.client.retrieve_account_groups()
-        
-        #keep track of initial account meta
-        self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
-    
-    def tearDown(self):
-        #delete additionally created meta
-        l = []
-        for m in self.client.retrieve_account_metadata(restricted=True):
-            if m not in self.initial_meta:
-                l.append(m)
-        self.client.delete_account_metadata(l)
-        
-        #delete additionally created groups
-        l = []
-        for g in self.client.retrieve_account_groups():
-            if g not in self.initial_groups:
-                l.append(g)
-        self.client.unset_account_groups(l)
-        
-        BaseTestCase.tearDown(self)
-    
     def test_create_container(self):
         self.client.create_container('φάκελος')
         self.assert_container_exists('φάκελος')
@@ -1857,13 +1797,13 @@ class TestGreek(BaseTestCase):
                             '0009',
                             'διογένης',
                             get_api())
-        self.assert_not_raises_fault(401, chef.retrieve_object_metadata,
+        self.assert_not_raises_fault(403, chef.retrieve_object_metadata,
                                      'φάκελος', 'ο1', account=get_user())
         
         #check write access
         self.client.share_object('φάκελος', 'ο1', ['διογένης'], read=False)
         new_data = get_random_data()
-        self.assert_not_raises_fault(401, chef.update_object,
+        self.assert_not_raises_fault(403, chef.update_object,
                                      'φάκελος', 'ο1', StringIO(new_data),
                                      account=get_user())
         
@@ -1906,75 +1846,52 @@ class TestGreek(BaseTestCase):
 class TestPermissions(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        #keep track of initial account groups
-        self.initial_groups = self.client.retrieve_account_groups()
-        #keep track of initial account meta
-        self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
         
         #create a group
-        self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+        self.authorized = ['chazapis', 'verigak', 'gtsouk']
         groups = {'pithosdev':','.join(self.authorized)}
         self.client.set_account_groups(**groups)
     
-    def tearDown(self):
-        #delete additionally created meta
-        l = []
-        for m in self.client.retrieve_account_metadata(restricted=True):
-            if m not in self.initial_meta:
-                l.append(m)
-        self.client.delete_account_metadata(l)
-        
-        #delete additionally created groups
-        l = []
-        for g in self.client.retrieve_account_groups():
-            if g not in self.initial_groups:
-                l.append(g)
-        self.client.unset_account_groups(l)
-        
-        BaseTestCase.tearDown(self)
-    
-    def test_read_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+    def assert_read(self, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in self.authorized:
-                self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+            if account in authorized or any:
+                self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
                                              'c', 'o', account=get_user())
             else:
-                self.assert_raises_fault(401, cl.retrieve_object_metadata,
+                self.assert_raises_fault(403, cl.retrieve_object_metadata,
                                          'c', 'o', account=get_user())
         
         #check inheritance
         o = self.upload_random_data('c', 'o/also-shared')
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in self.authorized:
-                self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+            if account in authorized or any:
+                self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
                                              'c', 'o/also-shared', account=get_user())
             else:
-                self.assert_raises_fault(401, cl.retrieve_object_metadata,
+                self.assert_raises_fault(403, cl.retrieve_object_metadata,
                                          'c', 'o/also-shared', account=get_user())
     
-    def test_write_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['chazapis'], read=False)
-        o_data = o['data']
+    def assert_write(self, o_data, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
             new_data = get_random_data()
-            if account in [get_user(), 'chazapis']:
-                self.assert_not_raises_fault(401, cl.update_object,
+            if account in authorized or any:
+                # test write access
+                self.assert_not_raises_fault(403, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
-                server_data = self.client.retrieve_object('c', 'o')
-                self.assertEqual(o_data, server_data[:len(o_data)])
-                self.assertEqual(new_data, server_data[len(o_data):])
-                o_data = server_data
+                try:
+                    # test read access
+                    server_data = cl.retrieve_object('c', 'o', account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 403)
             else:
-                self.assert_raises_fault(401, cl.update_object,
+                self.assert_raises_fault(403, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
         
@@ -1984,20 +1901,75 @@ class TestPermissions(BaseTestCase):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
             new_data = get_random_data()
-            if account in [get_user(), 'chazapis']:
-                self.assert_not_raises_fault(401, cl.update_object,
+            if account in authorized or any:
+                # test write access
+                self.assert_not_raises_fault(403, cl.update_object,
                                              'c', o['name'],
                                              StringIO(new_data),
                                              account=get_user())
-                server_data = self.client.retrieve_object('c', o['name'])
-                self.assertEqual(o_data, server_data[:len(o_data)])
-                self.assertEqual(new_data, server_data[len(o_data):])
-                o_data = server_data
+                try:
+                    server_data = cl.retrieve_object('c', o['name'], account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 403)
             else:
-                self.assert_raises_fault(401, cl.update_object,
+                self.assert_raises_fault(403, cl.update_object,
                                              'c', o['name'],
                                              StringIO(new_data),
                                              account=get_user())
+    
+    def test_group_read(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_many(self):
+        #test read access
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized)
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_by_everyone(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'])
+        self.assert_read(any=True)
+    
+    def test_group_write(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_many(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized, read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_by_everyone(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'], read=False)
+        o_data = o['data']
+        self.assert_write(o['data'], any=True)
+
+class TestPublish(BaseTestCase):
+    def test_publish(self):
+        self.client.create_container('c')
+        o_data = self.upload_random_data('c', 'o')['data']
+        self.client.publish_object('c', 'o')
+        meta = self.client.retrieve_object_metadata('c', 'o')
+        self.assertTrue('x-object-public' in meta)
+        url = '/public/%s/c/o' % get_user()
+        self.assertEqual(meta['x-object-public'], url)
+        public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+        data = public_client.get(url)[2]
+        self.assertEqual(o_data, data)
 
 class AssertMappingInvariant(object):
     def __init__(self, callable, *args, **kwargs):