More tests for listing shared & public objects & mass copy/move/delete
author Sofia Papagiannaki <papagian@gmail.com>
Wed, 27 Jun 2012 15:33:06 +0000 (18:33 +0300)
committer Sofia Papagiannaki <papagian@gmail.com>
Wed, 27 Jun 2012 15:33:06 +0000 (18:33 +0300)
Refs: #2611
Refs: #2394

snf-pithos-tools/pithos/tools/lib/client.py
snf-pithos-tools/pithos/tools/test.py

index cbe88ac..d45f33e 100644 (file)
@@ -762,6 +762,18 @@ class Pithos_Client(OOS_Client):
         return OOS_Client.create_zero_length_object(self, container, object,
                                                     **args)
     
+    def create_folder(self, container, name,  # creates a zero-length object typed 'application/directory'
+                          meta={}, etag=None,
+                          content_encoding=None,
+                          content_disposition=None,
+                          x_object_manifest=None, x_object_sharing=None,
+                          x_object_public=None, account=None):
+        args = locals().copy()  # snapshot of all parameters; must stay the first statement so no other local leaks in
+        for elem in ['self', 'container', 'name']:  # these are passed explicitly below, not as keywords
+            args.pop(elem)
+        args['content_type'] = 'application/directory'  # callers cannot override the content type
+        return self.create_zero_length_object(container, name, **args)  # returns whatever that call returns
+    
     def create_object(self, container, object, f=stdin, format='text',
                       meta={}, params={}, etag=None, content_type=None,
                       content_encoding=None, content_disposition=None,
index 9301b6d..803d9ce 100755 (executable)
@@ -538,7 +538,63 @@ class ContainerGet(BaseTestCase):
             self.obj.append(self.upload_random_data(self.container[0], o))
         for o in o_names[8:]:
             self.obj.append(self.upload_random_data(self.container[1], o))
-
+    
+    def test_list_shared(self):
+        self.client.share_object(self.container[0], self.obj[0]['name'], ('*',))
+        objs = self.client.list_objects(self.container[0], shared=True)
+        self.assertEqual(objs, [self.obj[0]['name']])
+        
+        # upload an object whose name sorts right after the shared one but does not start with it; the shared listing must not change
+        self.upload_random_data(self.container[0], strnextling(self.obj[0]['name']))
+        objs = self.client.list_objects(self.container[0], shared=True)
+        self.assertEqual(objs, [self.obj[0]['name']])
+        
+        # test inheritance: an object uploaded under a shared folder is expected in the shared listing as well
+        self.client.create_folder(self.container[1], 'folder')
+        self.client.share_object(self.container[1], 'folder', ('*',))
+        self.upload_random_data(self.container[1], 'folder/object')
+        objs = self.client.list_objects(self.container[1], shared=True)
+        self.assertEqual(objs, ['folder', 'folder/object'])
+    
+    def test_list_public(self):
+        self.client.publish_object(self.container[0], self.obj[0]['name'])
+        objs = self.client.list_objects(self.container[0], public=True)
+        self.assertEqual(objs, [self.obj[0]['name']])
+        
+        # upload an object whose name sorts right after the published one but does not start with it; the public listing must not change
+        self.upload_random_data(self.container[0], strnextling(self.obj[0]['name']))
+        objs = self.client.list_objects(self.container[0], public=True)
+        self.assertEqual(objs, [self.obj[0]['name']])
+        
+        # test inheritance: the object uploaded under a published folder is NOT expected in the public listing
+        self.client.create_folder(self.container[1], 'folder')
+        self.client.publish_object(self.container[1], 'folder')
+        self.upload_random_data(self.container[1], 'folder/object')
+        objs = self.client.list_objects(self.container[1], public=True)
+        self.assertEqual(objs, ['folder'])
+    
+    def test_list_shared_public(self):
+        self.client.share_object(self.container[0], self.obj[0]['name'], ('*',))
+        self.client.publish_object(self.container[0], self.obj[1]['name'])
+        objs = self.client.list_objects(self.container[0], shared=True, public=True)
+        self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']])
+        
+        # upload objects whose names sort right after the shared/published ones but do not start with them; the listing must not change
+        self.upload_random_data(self.container[0], strnextling(self.obj[0]['name']))
+        self.upload_random_data(self.container[0], strnextling(self.obj[1]['name']))
+        objs = self.client.list_objects(self.container[0], shared=True, public=True)
+        self.assertEqual(objs, [self.obj[0]['name'], self.obj[1]['name']])
+        
+        # test inheritance: the object under the shared folder is expected in the listing, the one under the published folder is not
+        self.client.create_folder(self.container[1], 'folder1')
+        self.client.share_object(self.container[1], 'folder1', ('*',))
+        self.upload_random_data(self.container[1], 'folder1/object')
+        self.client.create_folder(self.container[1], 'folder2')
+        self.client.publish_object(self.container[1], 'folder2')
+        self.upload_random_data(self.container[1], 'folder2/object')
+        objs = self.client.list_objects(self.container[1], shared=True, public=True)
+        self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2'])
+    
     def test_list_objects(self):
         objects = self.client.list_objects(self.container[0])
         l = [elem['name'] for elem in self.obj[:8]]
@@ -1373,7 +1429,23 @@ class ObjectCopy(BaseTestCase):
         self.assert_raises_fault(404, self.client.copy_object, self.containers[1],
                                  self.obj['name'], self.containers[1],
                                  'testcopy', meta)
-
+    
+    def test_copy_dir(self):
+        self.client.create_folder(self.containers[0], 'dir')
+        objects = ('object1', 'subdir/object2', 'dirs')  # last entry is a sibling named 'dirs', uploaded outside 'dir/'
+        for name in objects[:-1]:
+            self.upload_random_data(self.containers[0], 'dir/%s' % name)
+        self.upload_random_data(self.containers[0], 'dirs')
+        
+        self.client.copy_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/')
+        self.assert_object_exists(self.containers[0], 'dir')  # a copy must leave the source in place
+        self.assert_object_not_exists(self.containers[1], 'dirs')  # the sibling sharing the 'dir' name prefix must not be copied
+        for name in objects[:-1]:
+            meta0 = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
+            meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+            for elem in ('content-length', 'x-object-hash', 'content-type'):  # real loop: a bare generator expression never runs its assertions
+                self.assertEqual(meta0[elem], meta1[elem])
+        
 class ObjectMove(BaseTestCase):
     def setUp(self):
         BaseTestCase.setUp(self)
@@ -1409,6 +1481,26 @@ class ObjectMove(BaseTestCase):
 
         #assert src object no more exists
         self.assert_object_not_exists(self.containers[0], self.obj['name'])
+    
+    
+    def test_move_dir(self):
+        self.client.create_folder(self.containers[0], 'dir')
+        objects = ('object1', 'subdir/object2', 'dirs')  # last entry is a sibling named 'dirs', uploaded outside 'dir/'
+        meta = {}  # source metadata captured before the move, keyed by name relative to 'dir/'
+        for name in objects[:-1]:
+            self.upload_random_data(self.containers[0], 'dir/%s' % name)
+            meta[name] = self.client.retrieve_object_metadata(self.containers[0], 'dir/%s' % name)
+        self.upload_random_data(self.containers[0], 'dirs')
+        
+        self.client.move_object(self.containers[0], 'dir', self.containers[1], 'dir-backup', delimiter='/', content_type='application/folder')
+        self.assert_object_not_exists(self.containers[0], 'dir')  # a move must remove the source
+        self.assert_object_not_exists(self.containers[1], 'dirs')  # the sibling sharing the 'dir' name prefix must not be moved
+        for name in objects[:-1]:
+            self.assert_object_not_exists(self.containers[0], 'dir/%s' % name)
+            self.assert_object_exists(self.containers[1], 'dir-backup/%s' % name)
+            meta1 = self.client.retrieve_object_metadata(self.containers[1], 'dir-backup/%s' % name)
+            for elem in ('content-length', 'x-object-hash', 'content-type'):  # real loop: a bare generator expression never runs its assertions
+                self.assertEqual(meta[name][elem], meta1[elem])
 
 class ObjectPost(BaseTestCase):
     def setUp(self):
@@ -1554,12 +1646,12 @@ class ObjectPost(BaseTestCase):
             self.assert_raises_fault(400, self.test_update_object,
                                      content_length = 1000)
 
-    def _test_update_object_invalid_range(self):
+    def test_update_object_invalid_range(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
             self.assert_raises_fault(416, self.test_update_object, 499, 0, True)
 
-    def _test_update_object_invalid_range_and_length(self):
+    def test_update_object_invalid_range_and_length(self):
         with AssertContentInvariant(self.client.retrieve_object,
                                     self.containers[0], self.obj[0]['name']):
             self.assert_raises_fault([400, 416], self.test_update_object, 499, 0, True,
@@ -1704,6 +1796,19 @@ class ObjectDelete(BaseTestCase):
         #assert item not found
         self.assert_raises_fault(404, self.client.delete_object, self.containers[1],
                                  self.obj['name'])
+    
+    def test_delete_dir(self):
+        self.client.create_folder(self.containers[0], 'dir')
+        objects = ('object1', 'subdir/object2', 'dirs')  # last entry is a sibling named 'dirs', uploaded outside 'dir/'
+        for name in objects[:-1]:
+            self.upload_random_data(self.containers[0], 'dir/%s' % name)
+        self.upload_random_data(self.containers[0], 'dirs')
+        
+        self.client.delete_object(self.containers[0], 'dir', delimiter='/')
+        self.assert_object_not_exists(self.containers[0], 'dir')
+        self.assert_object_exists(self.containers[0], 'dirs')  # shares the 'dir' name prefix but is not under 'dir/', so it must survive
+        for name in objects[:-1]:
+            self.assert_object_not_exists(self.containers[0], 'dir/%s' % name)  # everything uploaded under 'dir/' must be gone
 
 class ListSharing(BaseTestCase):
     def setUp(self):
@@ -2273,7 +2378,6 @@ class AssertMappingInvariant(object):
         for k, v in self.map.items():
             if is_date(v):
                 continue
-            #print '#', k, v, map[k]
             assert(k in map)
             assert v == map[k]
 
@@ -2327,6 +2431,26 @@ def is_date(date):
             return True
     return False
 
+def strnextling(prefix):
+    """Return the smallest unicode string that sorts after
+       the given prefix without itself starting with it.
+       Example: strnextling('hello') -> 'hellp'
+    """
+    if not prefix:
+        ## Every string starts with the empty string, so
+        ## strnextling('') has no exact answer; approximate it
+        ## with the highest character python can represent:
+        ## 0x10ffff on wide (32-bit unicode) python builds,
+        ## 0x00ffff on narrow (16-bit unicode) python builds.
+        ## No autodetection is attempted; 0xffff is safe enough.
+        return unichr(0xffff)
+    head, tail = prefix[:-1], prefix[-1]
+    code = ord(tail)
+    ## the last character cannot be bumped past 0xffff
+    if code >= 0xffff:
+        raise RuntimeError
+    return head + unichr(code + 1)
+
 o_names = ['kate.jpg',
            'kate_beckinsale.jpg',
            'How To Win Friends And Influence People.pdf',