Use environment variable for the sync container in psync.
[pithos] / tools / test
index 427cdfb..ca34fae 100755 (executable)
@@ -195,7 +195,10 @@ class BaseTestCase(unittest.TestCase):
             r = callableObj(*args, **kwargs)
             self.fail('Should never reach here')
         except Fault, f:
-            self.failUnless(f.status == status)
+            if type(status) == types.ListType:
+                self.failUnless(f.status in status)
+            else:
+                self.failUnless(f.status == status)
     
     def assert_not_raises_fault(self, status, callableObj, *args, **kwargs):
         """
@@ -239,6 +242,12 @@ class BaseTestCase(unittest.TestCase):
         self.assert_raises_fault(404, self.client.retrieve_object_metadata,
                                  container, object)
     
+    def assert_versionlist_structure(self, versionlist):  # checks: list of 2-item lists
+        self.assertTrue(isinstance(versionlist, list))  # isinstance instead of type() ==
+        for elem in versionlist:
+            self.assertTrue(isinstance(elem, list))
+            self.assertEqual(len(elem), 2)  # every entry must hold exactly two items
+    
     def upload_random_data(self, container, name, length=1024, type=None,
                            enc=None, **meta):
         data = get_random_data(length)
@@ -267,8 +276,8 @@ class BaseTestCase(unittest.TestCase):
             obj['meta'] = args
             
             path = '/%s/%s' % (container, name)
-            self.client.create_object(container, name, StringIO(obj['data']),
-                                      meta, **args)
+            self.client.create_object(container, name, f=StringIO(obj['data']),
+                                      meta=meta, **args)
             
             return obj
         except IOError:
@@ -277,7 +286,6 @@ class BaseTestCase(unittest.TestCase):
 class AccountHead(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
         for item in self.containers:
             self.client.create_container(item)
@@ -349,7 +357,6 @@ class AccountHead(BaseTestCase):
 class AccountGet(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         #create some containers
         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
         for item in self.containers:
@@ -455,7 +462,6 @@ class AccountGet(BaseTestCase):
 class AccountPost(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
         for item in self.containers:
             self.client.create_container(item)
@@ -569,7 +575,6 @@ class AccountPost(BaseTestCase):
 class ContainerHead(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.container = 'apples'
         self.client.create_container(self.container)
     
@@ -591,7 +596,6 @@ class ContainerHead(BaseTestCase):
 class ContainerGet(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.container = ['pears', 'apples']
         for c in self.container:
             self.client.create_container(c)
@@ -801,7 +805,6 @@ class ContainerGet(BaseTestCase):
 class ContainerPut(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
     
     def test_create(self):
@@ -817,7 +820,6 @@ class ContainerPut(BaseTestCase):
 class ContainerPost(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.container = 'apples'
         self.client.create_container(self.container)
     
@@ -834,7 +836,6 @@ class ContainerPost(BaseTestCase):
 class ContainerDelete(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
@@ -857,7 +858,6 @@ class ObjectHead(BaseTestCase):
 class ObjectGet(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         #create some containers
         for c in self.containers:
@@ -869,6 +869,45 @@ class ObjectGet(BaseTestCase):
         for n in names:
             self.objects.append(self.upload_random_data(self.containers[1], n))
     
+    def test_versions(self):
+        c = self.containers[1]
+        o = self.objects[0]
+        b = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+        self.assert_versionlist_structure(b)
+        
+        #update meta
+        meta = {'quality':'AAA', 'stock':True}
+        self.client.update_object_metadata(c, o['name'], **meta)
+        
+        a = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+        self.assert_versionlist_structure(a)
+        self.assertEqual(len(b)+1, len(a))
+        self.assertEqual(b, a[:-1])
+        
+        #get exact previous version metadata
+        v = a[-2][0]
+        v_meta = self.client.retrieve_object_metadata(c, o['name'],
+                                                      restricted=True,
+                                                      version=v)
+        for k in meta.keys():
+            self.assertTrue(k not in v_meta)
+        
+        #update object
+        data = get_random_data()
+        self.client.update_object(c, o['name'], StringIO(data))
+        
+        aa = self.client.retrieve_object_versionlist(c, o['name'])['versions']
+        self.assert_versionlist_structure(aa)
+        self.assertEqual(len(a)+1, len(aa))
+        self.assertEqual(a, aa[:-1])
+        
+        #get exact previous version
+        v = aa[-3][0]
+        v_data = self.client.retrieve_object_version(c, o['name'], version=v)
+        self.assertEqual(o['data'], v_data)
+        self.assertEqual(self.client.retrieve_object(c, o['name']),
+                         '%s%s' %(v_data, data))
+    
     def test_get(self):
         #perform get
         o = self.client.retrieve_object(self.containers[1],
@@ -1193,7 +1232,6 @@ class ObjectGet(BaseTestCase):
 class ObjectPut(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.container = 'c1'
         self.client.create_container(self.container)
     
@@ -1288,19 +1326,25 @@ class ObjectPut(BaseTestCase):
         self.assertEqual(int(zero_meta['content-length']), 0)
         self.assertEqual(zero_hash, [])
         self.assertEqual(zero_data, '')
+    
+    def test_create_object_by_hashmap(self):  # copy built from a hashmap must match its source
+        c = self.container
+        o = 'object'
+        self.upload_random_data(c, o)
+        hashmap = self.client.retrieve_object(c, o, format='json')
+        o2 = 'object-copy'  # created below from the hashmap retrieved above
+        self.client.create_object_by_hashmap(c, o2, hashmap)
+        self.assertEqual(self.client.retrieve_object(c, o),
+                         self.client.retrieve_object(c, o2))  # was (c, o): compared o with itself
 
 class ObjectCopy(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
         self.obj = self.upload_random_data(self.containers[0], o_names[0])
-    
-    def tearDown(self):
-        pass
-    
+        
     def test_copy(self):
         with AssertMappingInvariant(self.client.retrieve_object_metadata,
                              self.containers[0], self.obj['name']):
@@ -1363,7 +1407,6 @@ class ObjectCopy(BaseTestCase):
 class ObjectMove(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
@@ -1393,7 +1436,6 @@ class ObjectMove(BaseTestCase):
 class ObjectPost(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
@@ -1504,7 +1546,7 @@ class ObjectPost(BaseTestCase):
     def test_update_object_invalid_range_and_length(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
-            self.assert_raises_fault(416, self.test_update_object, 499, 0, True,
+            self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
                                      -1)
     
     def test_update_object_invalid_range_with_no_content_length(self):
@@ -1633,7 +1675,6 @@ class ObjectPost(BaseTestCase):
 class ObjectDelete(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
-        self.account = 'test'
         self.containers = ['c1', 'c2']
         for c in self.containers:
             self.client.create_container(c)
@@ -1651,28 +1692,36 @@ class ObjectDelete(BaseTestCase):
 class ListSharing(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
+        for i in range(2):
+            self.client.create_container('c%s' %i)
         self.client.create_container('c')
         for i in range(2):
-            self.upload_random_data('c', 'o%s' %i)
+            self.upload_random_data('c1', 'o%s' %i)
         accounts = OTHER_ACCOUNTS.copy()
         self.o1_sharing_with = accounts.popitem()
         self.o1_sharing = [self.o1_sharing_with[1]]
-        self.client.share_object('c', 'o1', self.o1_sharing, read=True)
+        self.client.share_object('c1', 'o1', self.o1_sharing, read=True)
         
         l = []
         for i in range(2):
             l.append(accounts.popitem())
-        #self.client.set_account_groups({'pithos-dev':'chazapis,verigak,papagian'})
-        #self.o2_sharing = 'write=%s' % 
-        #self.client.share_object('c', 'o2', self.o2_sharing)
     
-    def test_listing(self):
+    def test_list_other_shared(self):
         self.other = Pithos_Client(get_server(),
                               self.o1_sharing_with[0],
                               self.o1_sharing_with[1],
                               get_api())
-        self.assertTrue('test' in self.other.list_shared_by_others())
-
+        self.assertTrue(get_user() in self.other.list_shared_by_others())
+    
+    def test_list_my_shared(self):
+        my_shared_containers = self.client.list_containers(shared=True)
+        self.assertTrue('c1' in my_shared_containers)
+        self.assertTrue('c2' not in my_shared_containers)
+        
+        my_shared_objects = self.client.list_objects('c1', shared=True)
+        self.assertTrue('o1' in my_shared_objects)
+        self.assertTrue('o2' not in my_shared_objects)
+    
 class TestGreek(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
@@ -1813,7 +1862,7 @@ class TestGreek(BaseTestCase):
         #check read access
         self.client.create_container('φάκελος')
         o = self.upload_random_data('φάκελος', 'ο1')
-        self.client.share_object('φάκελος', 'ο1', ['test:σεφς'])
+        self.client.share_object('φάκελος', 'ο1', ['%s:σεφς' % get_user()])
         chef = Pithos_Client(get_server(),
                             '0009',
                             'διογένης',
@@ -1873,7 +1922,7 @@ class TestPermissions(BaseTestCase):
         self.initial_meta = self.client.retrieve_account_metadata(restricted=True)
         
         #create a group
-        self.authorized = ['chazapis', 'verigak', 'gtsouk', 'papagian']
+        self.authorized = ['chazapis', 'verigak', 'gtsouk']
         groups = {'pithosdev':','.join(self.authorized)}
         self.client.set_account_groups(**groups)
     
@@ -1894,38 +1943,124 @@ class TestPermissions(BaseTestCase):
         
         BaseTestCase.tearDown(self)
     
-    def test_read_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['test:pithosdev'])
+    def assert_read(self, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in self.authorized:
+            if account in authorized or any:
                 self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
                                              'c', 'o', account=get_user())
             else:
                 self.assert_raises_fault(401, cl.retrieve_object_metadata,
                                          'c', 'o', account=get_user())
         
+        #check inheritance
+        o = self.upload_random_data('c', 'o/also-shared')
+        for token, account in OTHER_ACCOUNTS.items():
+            cl = Pithos_Client(get_server(), token, account, get_api()) 
+            if account in authorized or any:
+                self.assert_not_raises_fault(401, cl.retrieve_object_metadata,
+                                             'c', 'o/also-shared', account=get_user())
+            else:
+                self.assert_raises_fault(401, cl.retrieve_object_metadata,
+                                         'c', 'o/also-shared', account=get_user())
     
-    def test_write_access(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['chazapis'], read=False)
+    def assert_write(self, o_data, authorized=[], any=False):
         for token, account in OTHER_ACCOUNTS.items():
             cl = Pithos_Client(get_server(), token, account, get_api()) 
             new_data = get_random_data()
-            if account == 'chazapis':
+            if account in authorized or any:
+                # test write access
                 self.assert_not_raises_fault(401, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
-                server_data = self.client.retrieve_object('c', 'o')
-                self.assertEqual(o['data'], server_data[:len(o['data'])])
-                self.assertEqual(new_data, server_data[len(o['data']):])
+                try:
+                    # test read access
+                    server_data = cl.retrieve_object('c', 'o', account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 401)
             else:
                 self.assert_raises_fault(401, cl.update_object,
                                              'c', 'o', StringIO(new_data),
                                              account=get_user())
+        
+        #check inheritance
+        o = self.upload_random_data('c', 'o/also-shared')
+        o_data = o['data']
+        for token, account in OTHER_ACCOUNTS.items():
+            cl = Pithos_Client(get_server(), token, account, get_api()) 
+            new_data = get_random_data()
+            if account in authorized or any:
+                # test write access
+                self.assert_not_raises_fault(401, cl.update_object,
+                                             'c', o['name'],
+                                             StringIO(new_data),
+                                             account=get_user())
+                try:
+                    server_data = cl.retrieve_object('c', o['name'], account=get_user())
+                    self.assertEqual(o_data, server_data[:len(o_data)])
+                    self.assertEqual(new_data, server_data[len(o_data):])
+                    o_data = server_data
+                except Fault, f:
+                    self.failIf(f.status == 401)
+            else:
+                self.assert_raises_fault(401, cl.update_object,
+                                             'c', o['name'],
+                                             StringIO(new_data),
+                                             account=get_user())
+    
+    def test_group_read(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_many(self):
+        #test read access
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized)
+        self.assert_read(authorized=self.authorized)
+    
+    def test_read_by_everyone(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'])
+        self.assert_read(any=True)
+    
+    def test_group_write(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_many(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', self.authorized, read=False)
+        self.assert_write(o['data'], authorized=self.authorized)
+    
+    def test_write_by_everyone(self):
+        self.client.create_container('c')
+        o = self.upload_random_data('c', 'o')
+        self.client.share_object('c', 'o', ['*'], read=False)
+        o_data = o['data']
+        self.assert_write(o['data'], any=True)
+
+class TestPublish(BaseTestCase):
+    def test_publish(self):
+        self.client.create_container('c')
+        o_data = self.upload_random_data('c', 'o')['data']
+        self.client.publish_object('c', 'o')
+        meta = self.client.retrieve_object_metadata('c', 'o')
+        self.assertTrue('x-object-public' in meta)
+        url = '/public/%s/c/o' % get_user()
+        self.assertEqual(meta['x-object-public'], url)
+        public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
+        data = public_client.get(url)[2]
+        self.assertEqual(o_data, data)
 
 class AssertMappingInvariant(object):
     def __init__(self, callable, *args, **kwargs):
@@ -2011,4 +2146,7 @@ o_names = ['kate.jpg',
            'photos/me.jpg']
 
 if __name__ == "__main__":
-    unittest.main()
+    if get_user() == 'test':
+        unittest.main()
+    else:
+        print 'Will not run tests as any other user except \'test\' (current user: %s).' % get_user()