Merge commit 'v0.9.0' into packaging
[pithos] / snf-pithos-tools / pithos / tools / test.py
index 35c3fdd..e4b36da 100755 (executable)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 #coding=utf8
 
-# Copyright 2011 GRNET S.A. All rights reserved.
+# Copyright 2011-2012 GRNET S.A. All rights reserved.
 # 
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
 # or implied, of GRNET S.A.
 
 from pithos.lib.client import Pithos_Client, Fault
-from pithos.lib.util import get_user, get_auth, get_server, get_api
+from pithos.lib.util import get_user, get_auth, get_url
 
 from xml.dom import minidom
 from StringIO import StringIO
 from hashlib import new as newhasher
 from binascii import hexlify
+from httplib import HTTPConnection
+from urlparse import urlparse
 
 import json
 import unittest
@@ -71,11 +73,9 @@ OTHER_ACCOUNTS = {
 class BaseTestCase(unittest.TestCase):
     #TODO unauthorized request
     def setUp(self):
-        self.client = Pithos_Client(get_server(), get_auth(), get_user(),
-                                    get_api())
+        self.client = Pithos_Client(get_url(), get_auth(), get_user())
         self._clean_account()
-        self.invalid_client = Pithos_Client(get_server(), get_auth(), 'invalid',
-                                            get_api())
+        self.invalid_client = Pithos_Client(get_url(), get_auth(), 'invalid')
         
         #keep track of initial account groups
         self.initial_groups = self.client.retrieve_account_groups()
@@ -1416,7 +1416,7 @@ class ObjectPost(BaseTestCase):
                                  self.containers[0],
                                  self.obj[0]['name']):
             #perform update metadata
-            more = {'foo':'foo', 'bar':'bar'}
+            more = {'foo': 'foo', 'bar': 'bar', 'f' * 114: 'b' * 256}
             status = self.client.update_object_metadata(self.containers[0],
                                                         self.obj[0]['name'],
                                                         **more)[0]
@@ -1431,6 +1431,13 @@ class ObjectPost(BaseTestCase):
             for k,v in more.items():
                 self.assertTrue(k in headers.keys())
                 self.assertTrue(headers[k], v)
+            
+            #out of limits
+            more = {'f' * 114: 'b' * 257}
+            self.assert_raises_fault(400, self.client.update_object_metadata,
+                                                        self.containers[0],
+                                                        self.obj[0]['name'],
+                                                        **more)
     
     def test_update_object(self,
                            first_byte_pos=0,
@@ -1452,9 +1459,10 @@ class ObjectPost(BaseTestCase):
             if content_length:
                 args['content_length'] = content_length
             
-            status = self.client.update_object(self.containers[0], self.obj[0]['name'],
-                                      StringIO(data), **args)[0]
-            
+            r = self.client.update_object(self.containers[0], self.obj[0]['name'],
+                                      StringIO(data), **args)
+            status = r[0]
+            etag = r[1]['etag']
             if partial < 0 or (instance_length and l <= last_byte_pos):
                 self.assertEqual(status, 202)    
             else:
@@ -1465,6 +1473,7 @@ class ObjectPost(BaseTestCase):
                 self.assertEqual(content[:first_byte_pos], self.obj[0]['data'][:first_byte_pos])
                 self.assertEqual(content[first_byte_pos:last_byte_pos+1], data)
                 self.assertEqual(content[last_byte_pos+1:], self.obj[0]['data'][last_byte_pos+1:])
+                self.assertEqual(etag, compute_md5_hash(content))
     
     def test_update_object_lt_blocksize(self):
         self.test_update_object(10, 20, content_length=None)
@@ -1681,10 +1690,9 @@ class ListSharing(BaseTestCase):
             l.append(accounts.popitem())
     
     def test_list_other_shared(self):
-        self.other = Pithos_Client(get_server(),
+        self.other = Pithos_Client(get_url(),
                               self.o1_sharing_with[0],
-                              self.o1_sharing_with[1],
-                              get_api())
+                              self.o1_sharing_with[1])
         self.assertTrue(get_user() in self.other.list_shared_by_others())
     
     def test_list_my_shared(self):
@@ -1834,10 +1842,9 @@ class TestGreek(BaseTestCase):
         self.client.create_container('φάκελος')
         o = self.upload_random_data('φάκελος', 'ο1')
         self.client.share_object('φάκελος', 'ο1', ['%s:σεφς' % get_user()])
-        chef = Pithos_Client(get_server(),
+        chef = Pithos_Client(get_url(),
                             '0009',
-                            'διογένης',
-                            get_api())
+                            'διογένης')
         self.assert_not_raises_fault(403, chef.retrieve_object_metadata,
                                      'φάκελος', 'ο1', account=get_user())
         
@@ -1892,40 +1899,61 @@ class TestPermissions(BaseTestCase):
         self.authorized = ['chazapis', 'verigak', 'gtsouk']
         groups = {'pithosdev':','.join(self.authorized)}
         self.client.set_account_groups(**groups)
-    
-    def assert_read(self, authorized=[], any=False):
+        
+        self.container = 'c'
+        self.object = 'o'
+        self.client.create_container(self.container)
+        self.upload_random_data(self.container, self.object)
+        self.upload_random_data(self.container, self.object+'/')
+        self.upload_random_data(self.container, self.object+'/a')
+        self.upload_random_data(self.container, self.object+'a')
+        self.upload_random_data(self.container, self.object+'a/')
+        self.dir_content_types = ('application/directory', 'application/folder')
+    
+    def assert_read(self, authorized=[], any=False, depth=0):
         for token, account in OTHER_ACCOUNTS.items():
-            cl = Pithos_Client(get_server(), token, account, get_api()) 
+            cl = Pithos_Client(get_url(), token, account)
             if account in authorized or any:
                 self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
-                                             'c', 'o', account=get_user())
+                                             self.container, self.object,
+                                             account=get_user())
             else:
                 self.assert_raises_fault(403, cl.retrieve_object_metadata,
-                                         'c', 'o', account=get_user())
+                                         self.container, self.object,
+                                         account=get_user())
         
         #check inheritance
-        o = self.upload_random_data('c', 'o/also-shared')
+        meta = self.client.retrieve_object_metadata(self.container, self.object)
+        type = meta['content-type']
+        derivatives = self.client.list_objects(self.container, prefix=self.object)
+        #exclude the self.object
+        del derivatives[derivatives.index(self.object)]
+        for o in derivatives:
+            for token, account in OTHER_ACCOUNTS.items():
+                cl = Pithos_Client(get_url(), token, account)
+                prefix = self.object if self.object.endswith('/') else self.object+'/'
+                if (account in authorized or any) and \
+                (type in self.dir_content_types) and \
+                o.startswith(prefix):
+                    self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
+                                             self.container, o, account=get_user())
+                else:
+                    self.assert_raises_fault(403, cl.retrieve_object_metadata,
+                                         self.container, o, account=get_user())
+    
+    def assert_write(self, authorized=[], any=False):
+        o_data = self.client.retrieve_object(self.container, self.object)
         for token, account in OTHER_ACCOUNTS.items():
-            cl = Pithos_Client(get_server(), token, account, get_api()) 
-            if account in authorized or any:
-                self.assert_not_raises_fault(403, cl.retrieve_object_metadata,
-                                             'c', 'o/also-shared', account=get_user())
-            else:
-                self.assert_raises_fault(403, cl.retrieve_object_metadata,
-                                         'c', 'o/also-shared', account=get_user())
-    
-    def assert_write(self, o_data, authorized=[], any=False):
-        for token, account in OTHER_ACCOUNTS.items():
-            cl = Pithos_Client(get_server(), token, account, get_api()) 
+            cl = Pithos_Client(get_url(), token, account)
             new_data = get_random_data()
             if account in authorized or any:
                 # test write access
                 self.assert_not_raises_fault(403, cl.update_object,
-                                             'c', 'o', StringIO(new_data),
+                                             self.container, self.object, StringIO(new_data),
                                              account=get_user())
                 try:
                     # test read access
-                    server_data = cl.retrieve_object('c', 'o', account=get_user())
+                    server_data = cl.retrieve_object(self.container, self.object, account=get_user())
                     self.assertEqual(o_data, server_data[:len(o_data)])
                     self.assertEqual(new_data, server_data[len(o_data):])
                     o_data = server_data
@@ -1933,72 +1961,105 @@ class TestPermissions(BaseTestCase):
                     self.failIf(f.status == 403)
             else:
                 self.assert_raises_fault(403, cl.update_object,
-                                             'c', 'o', StringIO(new_data),
+                                             self.container, self.object, StringIO(new_data),
                                              account=get_user())
-        
         #check inheritance
-        o = self.upload_random_data('c', 'o/also-shared')
-        o_data = o['data']
-        for token, account in OTHER_ACCOUNTS.items():
-            cl = Pithos_Client(get_server(), token, account, get_api()) 
-            new_data = get_random_data()
-            if account in authorized or any:
-                # test write access
-                self.assert_not_raises_fault(403, cl.update_object,
-                                             'c', o['name'],
-                                             StringIO(new_data),
-                                             account=get_user())
-                try:
-                    server_data = cl.retrieve_object('c', o['name'], account=get_user())
-                    self.assertEqual(o_data, server_data[:len(o_data)])
-                    self.assertEqual(new_data, server_data[len(o_data):])
-                    o_data = server_data
-                except Fault, f:
-                    self.failIf(f.status == 403)
-            else:
-                self.assert_raises_fault(403, cl.update_object,
-                                             'c', o['name'],
-                                             StringIO(new_data),
-                                             account=get_user())
+        meta = self.client.retrieve_object_metadata(self.container, self.object)
+        type = meta['content-type']
+        derivatives = self.client.list_objects(self.container, prefix=self.object)
+        #exclude the object
+        del derivatives[derivatives.index(self.object)]
+        for o in derivatives:
+            for token, account in OTHER_ACCOUNTS.items():
+                prefix = self.object if self.object.endswith('/') else self.object+'/'
+                cl = Pithos_Client(get_url(), token, account)
+                new_data = get_random_data()
+                if (account in authorized or any) and \
+                (type in self.dir_content_types) and \
+                o.startswith(prefix):
+                    # test write access
+                    self.assert_not_raises_fault(403, cl.update_object,
+                                                 self.container, o,
+                                                 StringIO(new_data),
+                                                 account=get_user())
+                    try:
+                        server_data = cl.retrieve_object(self.container, o, account=get_user())
+                        self.assertEqual(new_data, server_data[-len(new_data):])
+                    except Fault, f:
+                        self.failIf(f.status == 403)
+                else:
+                    self.assert_raises_fault(403, cl.update_object,
+                                                 self.container, o,
+                                                 StringIO(new_data),
+                                                 account=get_user())
     
     def test_group_read(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()])
+        self.client.share_object(self.container, self.object, ['%s:pithosdev' % get_user()])
         self.assert_read(authorized=self.authorized)
     
     def test_read_many(self):
-        #test read access
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', self.authorized)
+        self.client.share_object(self.container, self.object, self.authorized)
         self.assert_read(authorized=self.authorized)
     
     def test_read_by_everyone(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['*'])
+        self.client.share_object(self.container, self.object, ['*'])
         self.assert_read(any=True)
     
+    def test_read_directory(self):
+        for type in self.dir_content_types:
+            #change content type
+            self.client.move_object(self.container, self.object, self.container, self.object, content_type=type)
+            self.client.share_object(self.container, self.object, ['*'])
+            self.assert_read(any=True)
+            self.client.share_object(self.container, self.object, self.authorized)
+            self.assert_read(authorized=self.authorized)
+            self.client.share_object(self.container, self.object, ['%s:pithosdev' % get_user()])
+            self.assert_read(authorized=self.authorized)
+    
     def test_group_write(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['%s:pithosdev' % get_user()], read=False)
-        self.assert_write(o['data'], authorized=self.authorized)
+        self.client.share_object(self.container, self.object, ['%s:pithosdev' % get_user()], read=False)
+        self.assert_write(authorized=self.authorized)
     
     def test_write_many(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', self.authorized, read=False)
-        self.assert_write(o['data'], authorized=self.authorized)
+        self.client.share_object(self.container, self.object, self.authorized, read=False)
+        self.assert_write(authorized=self.authorized)
     
     def test_write_by_everyone(self):
-        self.client.create_container('c')
-        o = self.upload_random_data('c', 'o')
-        self.client.share_object('c', 'o', ['*'], read=False)
-        o_data = o['data']
-        self.assert_write(o['data'], any=True)
-
+        self.client.share_object(self.container, self.object, ['*'], read=False)
+        self.assert_write(any=True)
+    
+    def test_write_directory(self):
+        dir_content_types = ('application/directory', 'application/foler')
+        for type in dir_content_types:
+            #change content type
+            self.client.move_object(self.container, self.object, self.container, self.object, content_type='application/folder')
+            self.client.share_object(self.container, self.object, ['*'], read=False)
+            self.assert_write(any=True)
+            self.client.share_object(self.container, self.object, self.authorized, read=False)
+            self.assert_write(authorized=self.authorized)
+            self.client.share_object(self.container, self.object, ['%s:pithosdev' % get_user()], read=False)
+            self.assert_write(authorized=self.authorized)
+    
+    def test_shared_listing(self):
+        self.client.share_object(self.container, self.object, self.authorized)
+        
+        my_shared_containers = self.client.list_containers(shared=True)
+        self.assertEqual(['c'], my_shared_containers)
+        my_shared_objects = self.client.list_objects('c', shared=True)
+        self.assertEqual(['o'], my_shared_objects)
+        
+        dir_content_types = ('application/directory', 'application/foler')
+        for type in dir_content_types:
+            #change content type
+            self.client.move_object(self.container, self.object, self.container, self.object, content_type='application/folder')
+            my_shared_objects = self.client.list_objects('c', shared=True)
+            self.assertEqual(['o', 'o/', 'o/a'], my_shared_objects)
+        
+        for token, account in OTHER_ACCOUNTS.items():
+            if account in self.authorized:
+                self.other = Pithos_Client(get_url(), token, account)
+                self.assertTrue(get_user() in self.other.list_shared_by_others())
+    
 class TestPublish(BaseTestCase):
     def test_publish(self):
         self.client.create_container('c')
@@ -2007,10 +2068,51 @@ class TestPublish(BaseTestCase):
         meta = self.client.retrieve_object_metadata('c', 'o')
         self.assertTrue('x-object-public' in meta)
         url = meta['x-object-public']
-        public_client = Pithos_Client(get_server(), get_auth(), get_user(), api='')
-        data = public_client.get(url)[2]
+        
+        p = urlparse(get_url())
+        if p.scheme == 'http':
+            conn = HTTPConnection(p.netloc)
+        elif p.scheme == 'https':
+            conn = HTTPSConnection(p.netloc)
+        else:
+            raise Exception('Unknown URL scheme')
+        
+        conn.request('GET', url)
+        resp = conn.getresponse()
+        length = resp.getheader('content-length', None)
+        data = resp.read(length)
         self.assertEqual(o_data, data)
 
+class TestPolicies(BaseTestCase):
+    def test_none_versioning(self):
+        self.client.create_container('c', policies={'versioning':'none'})
+        o = self.upload_random_data('c', 'o')
+        meta = self.client.retrieve_object_metadata('c', 'o')
+        v = meta['x-object-version']
+        more_data = get_random_data()
+        self.client.update_object('c', 'o', StringIO(more_data))
+        vlist = self.client.retrieve_object_versionlist('c', 'o')
+        self.assert_raises_fault(404, self.client.retrieve_object_version,
+                                 'c', 'o', v)
+        data = self.client.retrieve_object('c', 'o')
+        end = len(o['data'])
+        self.assertEqual(data[:end], o['data'])
+        self.assertEqual(data[end:], more_data)
+    
+    def test_quota(self):
+        self.client.create_container('c', policies={'quota':'1'})
+        meta = self.client.retrieve_container_metadata('c')
+        self.assertEqual(meta['x-container-policy-quota'], '1')
+        self.assert_raises_fault(413, self.upload_random_data, 'c', 'o',
+                                 length=1024*1024+1)
+    
+    def test_quota_none(self):
+        self.client.create_container('c', policies={'quota':'0'})
+        meta = self.client.retrieve_container_metadata('c')
+        self.assertEqual(meta['x-container-policy-quota'], '0')
+        self.assert_not_raises_fault(413, self.upload_random_data, 'c', 'o',
+                                 length=1024*1024+1)
+
 class AssertUUidInvariant(object):
     def __init__(self, callable, *args, **kwargs):
         self.callable = callable
@@ -2079,7 +2181,7 @@ def compute_block_hash(data, algorithm):
 
 def get_random_data(length=500):
     char_set = string.ascii_uppercase + string.digits
-    return ''.join(random.choice(char_set) for x in range(length))
+    return ''.join(random.choice(char_set) for x in xrange(length))
 
 def is_date(date):
     MONTHS = 'jan feb mar apr may jun jul aug sep oct nov dec'.split()