Fix storers that were out of date
[pithos] / tools / test
index ff9e89c..fa26fc6 100755 (executable)
@@ -69,6 +69,7 @@ class BaseTestCase(unittest.TestCase):
     def setUp(self):
         self.client = Pithos_Client(get_server(), get_auth(), get_user(),
                                     get_api())
+        self._clean_account()
         self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid',
                                             get_api())
         #self.headers = {
@@ -124,9 +125,12 @@ class BaseTestCase(unittest.TestCase):
                 'content_type',
                 'content_encoding',
                 'last_modified',)}
-        self.return_codes = (400, 401, 404, 503,)
+        self.return_codes = (400, 401, 403, 404, 503,)
     
     def tearDown(self):
+        self._clean_account()
+    
+    def _clean_account(self):
         for c in self.client.list_containers():
             while True:
                 #list objects returns at most 10000 objects
@@ -195,7 +199,10 @@ class BaseTestCase(unittest.TestCase):
             r = callableObj(*args, **kwargs)
             self.fail('Should never reach here')
         except Fault, f:
-            self.failUnless(f.status == status)
+            if type(status) == types.ListType:
+                self.failUnless(f.status in status)
+            else:
+                self.failUnless(f.status == status)
     
     def assert_not_raises_fault(self, status, callableObj, *args, **kwargs):
         """
@@ -273,8 +280,8 @@ class BaseTestCase(unittest.TestCase):
             obj['meta'] = args
             
             path = '/%s/%s' % (container, name)
-            self.client.create_object(container, name, StringIO(obj['data']),
-                                      meta, **args)
+            self.client.create_object(container, name, f=StringIO(obj['data']),
+                                      meta=meta, **args)
             
             return obj
         except IOError:
@@ -326,8 +333,8 @@ class AccountHead(BaseTestCase):
             size = size + int(m['x-container-bytes-used'])
         self.assertEqual(meta['x-account-bytes-used'], str(size))
     
-    def test_get_account_401(self):
-        self.assert_raises_fault(401,
+    def test_get_account_403(self):
+        self.assert_raises_fault(403,
                                  self.invalid_client.retrieve_account_metadata)
     
     def test_get_account_meta_until(self):
@@ -364,8 +371,8 @@ class AccountGet(BaseTestCase):
         containers = self.client.list_containers()
         self.assertEquals(self.containers, containers)
     
-    def test_list_401(self):
-        self.assert_raises_fault(401, self.invalid_client.list_containers)
+    def test_list_403(self):
+        self.assert_raises_fault(403, self.invalid_client.list_containers)
     
     def test_list_with_limit(self):
         limit = 2
@@ -502,7 +509,7 @@ class AccountPost(BaseTestCase):
         
     def test_invalid_account_update_meta(self):
         meta = {'test':'test', 'tost':'tost'}
-        self.assert_raises_fault(401,
+        self.assert_raises_fault(403,
                                  self.invalid_client.update_account_metadata,
                                  **meta)
     
@@ -814,6 +821,24 @@ class ContainerPut(BaseTestCase):
         self.client.create_container(self.containers[0])
         self.assertTrue(not self.client.create_container(self.containers[0]))
     
+    def test_quota(self):
+        """Exceeding a container's quota on upload must raise a 413 fault."""
+        c = self.containers[0]
+        self.client.create_container(c)
+        
+        policy = {'quota':100}
+        self.client.set_container_policies(c, **policy)
+        meta = self.client.retrieve_container_metadata(c)
+        self.assertTrue('x-container-policy-quota' in meta)
+        self.assertEqual(meta['x-container-policy-quota'], '100')
+        
+        # length 101 is one more than the quota of 100 set above
+        kwargs = {'length':101}
+        self.assert_raises_fault(413, self.upload_random_data, c, 'o1', **kwargs)
+        #reset quota
+        policy = {'quota':0}
+        self.client.set_container_policies(c, **policy)
+    
 class ContainerPost(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
@@ -836,13 +861,13 @@ class ContainerDelete(BaseTestCase):
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
-        self.upload_random_data(self.containers[1], o_names[0])
     
     def test_delete(self):
         status = self.client.delete_container(self.containers[0])[0]
         self.assertEqual(status, 204)
     
     def test_delete_non_empty(self):
+        self.upload_random_data(self.containers[1], o_names[0])
         self.assert_raises_fault(409, self.client.delete_container,
                                  self.containers[1])
     
@@ -1323,6 +1348,16 @@ class ObjectPut(BaseTestCase):
         self.assertEqual(int(zero_meta['content-length']), 0)
         self.assertEqual(zero_hash, [])
         self.assertEqual(zero_data, '')
+    
+    def test_create_object_by_hashmap(self):
+        """Create 'object-copy' from the hashmap of 'object' and compare them."""
+        c, o, o2 = self.container, 'object', 'object-copy'
+        self.upload_random_data(c, o)
+        hashmap = self.client.retrieve_object(c, o, format='json')
+        self.client.create_object_by_hashmap(c, o2, hashmap)
+        # compare the source with the copy (o2), not the source with itself
+        self.assertEqual(self.client.retrieve_object(c, o),
+                         self.client.retrieve_object(c, o2))
 
 class ObjectCopy(BaseTestCase):
     def setUp(self):
@@ -1331,10 +1366,7 @@ class ObjectCopy(BaseTestCase):
         for c in self.containers:
             self.client.create_container(c)
         self.obj = self.upload_random_data(self.containers[0], o_names[0])
-    
-    def tearDown(self):
-        pass
-    
+        
     def test_copy(self):
         with AssertMappingInvariant(self.client.retrieve_object_metadata,
                              self.containers[0], self.obj['name']):
@@ -1536,7 +1568,7 @@ class ObjectPost(BaseTestCase):
     def test_update_object_invalid_range_and_length(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
-            self.assert_raises_fault(416, self.test_update_object, 499, 0, True,
+            self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
                                      -1)
     
     def test_update_object_invalid_range_with_no_content_length(self):
@@ -1857,13 +1889,13 @@ class TestGreek(BaseTestCase):
                             '0009',
                             'διογένης',
                             get_api())
-        self.assert_not_raises_fault(401, chef.retrieve_object_metadata,
+        self.assert_not_raises_fault(403, chef.retrieve_object_metadata,
                                      'φάκελος', 'ο1', account=get_user())
         
         #check write access
         self.client.share_object('φάκελος', 'ο1', ['διογένης'], read=False)
         new_data = get_random_data()
-        self.assert_not_raises_fault(401, chef.update_object,
+        self.assert_not_raises_fault(403, chef.update_object,
                                      'φάκελος', 'ο1', StringIO(new_data),
                                      account=get_user())
         
@@ -1912,7 +1944,7 @@ class TestPermissions(BaseTestCase):
         self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
         
         #create a group
-        self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+        self.authorized = ['chazapis', 'verigak', 'gtsouk']
         groups = {'pithosdev':','.join(self.authorized)}
         self.client.set_account_groups(**groups)
     
@@ -1933,48 +1965,46 @@ class TestPermissions(BaseTestCase):
         
         BaseTestCase.tearDown(self)
     
-    def test_read_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+    def assert_read(self, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in self.authorized:
-                self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+            if account in authorized or any:
+                self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
                                              'c', 'o', account=get_user())
             else:
-                self.assert_raises_fault(401, cl.retrieve_object_metadata,
+                self.assert_raises_fault(403, cl.retrieve_object_metadata,
                                          'c', 'o', account=get_user())
         
         #check inheritance
         o = self.upload_random_data('c', 'o/also-shared')
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in self.authorized:
-                self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+            if account in authorized or any:
+                self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
                                              'c', 'o/also-shared', account=get_user())
             else:
-                self.assert_raises_fault(401, cl.retrieve_object_metadata,
+                self.assert_raises_fault(403, cl.retrieve_object_metadata,
                                          'c', 'o/also-shared', account=get_user())
     
-    def test_write_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['chazapis'], read=False)
-        o_data = o['data']
+    def assert_write(self, o_data, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
             new_data = get_random_data()
-            if account in [get_user(), 'chazapis']:
-                self.assert_not_raises_fault(401, cl.update_object,
+            if account in authorized or any:
+                # test write access
+                self.assert_not_raises_fault(403, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
-                server_data = self.client.retrieve_object('c', 'o')
-                self.assertEqual(o_data, server_data[:len(o_data)])
-                self.assertEqual(new_data, server_data[len(o_data):])
-                o_data = server_data
+                try:
+                    # test read access
+                    server_data = cl.retrieve_object('c', 'o', account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 403)
             else:
-                self.assert_raises_fault(401, cl.update_object,
+                self.assert_raises_fault(403, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
         
@@ -1984,20 +2014,75 @@ class TestPermissions(BaseTestCase):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
             new_data = get_random_data()
-            if account in [get_user(), 'chazapis']:
-                self.assert_not_raises_fault(401, cl.update_object,
+            if account in authorized or any:
+                # test write access
+                self.assert_not_raises_fault(403, cl.update_object,
                                              'c', o['name'],
                                              StringIO(new_data),
                                              account=get_user())
-                server_data = self.client.retrieve_object('c', o['name'])
-                self.assertEqual(o_data, server_data[:len(o_data)])
-                self.assertEqual(new_data, server_data[len(o_data):])
-                o_data = server_data
+                try:
+                    server_data = cl.retrieve_object('c', o['name'], account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 403)
             else:
-                self.assert_raises_fault(401, cl.update_object,
+                self.assert_raises_fault(403, cl.update_object,
                                              'c', o['name'],
                                              StringIO(new_data),
                                              account=get_user())
+    
+    def test_group_read(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_many(self):
+        #test read access
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized)
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_by_everyone(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'])
+        self.assert_read(any=True)
+    
+    def test_group_write(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_many(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized, read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_by_everyone(self):
+        """Share 'c/o' with '*' (read=False); all OTHER_ACCOUNTS must pass assert_write."""
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'], read=False)
+        self.assert_write(o['data'], any=True)
+
+class TestPublish(BaseTestCase):
+    def test_publish(self):
+        self.client.create_container('c')
+        o_data = self.upload_random_data('c', 'o')['data']
+        self.client.publish_object('c', 'o')
+        meta = self.client.retrieve_object_metadata('c', 'o')
+        self.assertTrue('x-object-public' in meta)
+        url = '/public/%s/c/o' % get_user()
+        self.assertEqual(meta['x-object-public'], url)
+        public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+        data = public_client.get(url)[2]
+        self.assertEqual(o_data, data)
 
 class AssertMappingInvariant(object):
     def __init__(self, callable, *args, **kwargs):
@@ -2085,3 +2170,5 @@ o_names = ['kate.jpg',
 if __name__ == "__main__":
     if get_user() == 'test':
         unittest.main()
+    else:
+        print 'Will not run tests as any other user except \'test\' (current user: %s).' % get_user()