+#coding=utf8
+
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
DEFAULT_USER = 'test'
DEFAULT_AUTH = '0000'
+OTHER_ACCOUNTS = {
+ '0001': 'verigak',
+ '0002': 'chazapis',
+ '0003': 'gtsouk',
+ '0004': 'papagian',
+ '0005': 'louridas',
+ '0006': 'chstath',
+ '0007': 'pkanavos',
+ '0008': 'mvasilak'}
+
class BaseTestCase(unittest.TestCase):
#TODO unauthorized request
def setUp(self):
'name',
'count',
'bytes',
- 'last_modified'),
+ 'last_modified',
+ 'x_container_policy_quota',
+ 'x_container_policy_versioning',),
'object':(
'name',
'hash',
def tearDown(self):
for c in self.client.list_containers():
- for o in self.client.list_objects(c):
- self.client.delete_object(c, o)
+ while True:
+ #list objects returns at most 10000 objects
+ #so repeat until there are no more objects
+ objects = self.client.list_objects(c)
+ if not objects:
+ break
+ for o in objects:
+ self.client.delete_object(c, o)
self.client.delete_container(c)
def assert_status(self, status, codes):
l.append(codes)
self.assertTrue(status in l)
- #def assert_list(self, path, entity, limit=10000, format='text', params=None, **headers):
- # status, headers, data = self.client.get(path, format=format,
- # headers=headers, params=params)
- #
- # self.assert_status(status, [200, 204, 304, 412])
- # if format == 'text':
- # data = data.strip().split('\n') if data else []
- # self.assertTrue(len(data) <= limit)
- # else:
- # exp_content_type = self.contentTypes[format]
- # self.assertEqual(headers['content_type'].find(exp_content_type), 0)
- # #self.assert_extended(data, format, entity, limit)
- # if format == 'json':
- # data = json.loads(data) if data else []
- # elif format == 'xml':
- # data = minidom.parseString(data)
- # return status, headers, data
-
#def assert_headers(self, headers, type, **exp_meta):
# prefix = 'x-%s-meta-' %type
# system_headers = [h for h in headers if not h.startswith(prefix)]
# k = k.split(prefix)[-1]
# self.assertEqual(v, exp_meta[k])
- def assert_extended(self, data, format, type, size):
+ def assert_extended(self, data, format, type, size=10000):
if format == 'xml':
self._assert_xml(data, type, size)
elif format == 'json':
self.assertTrue(len(entities) <= size)
for e in entities:
for item in info:
- self.assertTrue(e.hasAttribute(item))
+ self.assertTrue(e.getElementsByTagName(item))
def assert_raises_fault(self, status, callableObj, *args, **kwargs):
"""
when callableObj is called with the specific arguments
"""
try:
- callableObj(*args, **kwargs)
+ r = callableObj(*args, **kwargs)
self.fail('Should never reach here')
except Fault, f:
self.failUnless(f.status == status)
args = {}
args['etag'] = etag if etag else obj['hash']
- guess = mimetypes.guess_type(name)
- type = type if type else guess[0]
- enc = enc if enc else guess[1]
+ try:
+ guess = mimetypes.guess_type(name)
+ type = type if type else guess[0]
+ enc = enc if enc else guess[1]
+ except:
+ pass
args['content_type'] = type if type else 'plain/text'
args['content_encoding'] = enc if enc else None
return obj
except IOError:
return
+class TopLevel(BaseTestCase):
+ def test_list_shared_by_other(self):
+ pass
class AccountHead(BaseTestCase):
def setUp(self):
meta = self.client.retrieve_account_metadata(restricted=True,
until='kshfksfh')
self.assertTrue('premium' in meta)
-
+
class AccountGet(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
l = 2
m = 'oranges'
xml = self.client.list_containers(limit=l, marker=m, format='xml')
- #self.assert_extended(xml, 'xml', 'container', l)
+ self.assert_extended(xml, 'xml', 'container', l)
nodes = xml.getElementsByTagName('name')
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].childNodes[0].data, 'pears')
l.sort()
self.assertEqual(objects, l)
+ def test_list_objects_containing_slash(self):
+ self.client.create_container('test')
+ self.upload_random_data('test', '/objectname')
+
+ objects = self.client.list_objects('test')
+ self.assertEqual(objects, ['/objectname'])
+
+ objects = self.client.list_objects('test', format='json')
+ self.assertEqual(objects[0]['name'], '/objectname')
+
+ objects = self.client.list_objects('test', format='xml')
+ self.assert_extended(objects, 'xml', 'object')
+ node_name = objects.getElementsByTagName('name')[0]
+ self.assertEqual(node_name.firstChild.data, '/objectname')
+
+ #objects = self.client.list_objects('test', prefix='/')
+ #self.assertEqual(objects, ['/objectname'])
+ #
+ #objects = self.client.list_objects('test', path='/')
+ #self.assertEqual(objects, ['/objectname'])
+ #
+ #objects = self.client.list_objects('test', prefix='/', delimiter='n')
+ #self.assertEqual(objects, ['/object'])
+
def test_list_objects_with_limit_marker(self):
objects = self.client.list_objects(self.container[0], limit=2)
l = [elem['name'] for elem in self.obj[:8]]
end = len(l) >= end and end or len(l)
self.assertEqual(objects, l[start:end])
+ #takes too long
+ #def test_list_limit_exceeds(self):
+ # self.client.create_container('pithos')
+ #
+ # for i in range(10001):
+ # self.client.create_zero_length_object('pithos', i)
+ #
+ # self.assertEqual(10000, len(self.client.list_objects('pithos')))
+
+ def test_list_empty_params(self):
+ objects = self.client.get('/%s' % self.container[0])[2]
+ if objects:
+ objects = objects.strip().split('\n')
+ self.assertEqual(objects,
+ self.client.list_objects(self.container[0]))
+
def test_list_pseudo_hierarchical_folders(self):
objects = self.client.list_objects(self.container[1], prefix='photos',
delimiter='/')
def test_extended_list_xml(self):
xml = self.client.list_objects(self.container[1], format='xml', limit=4,
prefix='photos', delimiter='/')
+ self.assert_extended(xml, 'xml', 'object', size=4)
dirs = xml.getElementsByTagName('subdir')
self.assertEqual(len(dirs), 2)
self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
#assert content-type
self.assertEqual(h['content-type'], o['meta']['content_type'])
+ def test_maximum_upload_size_exceeds(self):
+ name = o_names[0]
+ meta = {'test':'test1'}
+ #upload 100MB
+ length=1024*1024*100
+ self.assert_raises_fault(400, self.upload_random_data, self.container,
+ name, length, **meta)
+
+ ##d = get_random_data(length=1024*1024*100)
+ ##self.client.create_object_using_chunks(self.container, name, StringIO(d))
+
+
def test_upload_with_name_containing_slash(self):
name = '/%s' % o_names[0]
meta = {'test':'test1'}
uploaded_data = self.client.retrieve_object(self.container, objname)
self.assertEqual(data, uploaded_data)
-
+
+ def test_manifestation(self):
+ prefix = 'myobject/'
+ data = ''
+ for i in range(5):
+ part = '%s%d' %(prefix, i)
+ o = self.upload_random_data(self.container, part)
+ data += o['data']
+
+ manifest = '%s/%s' %(self.container, prefix)
+ self.client.create_manifestation(self.container, 'large-object',
+ manifest)
+
+ self.assert_object_exists(self.container, 'large-object')
+ self.assertEqual(data, self.client.retrieve_object(self.container,
+ 'large-object'))
+
+ #wrong manifestation
+ self.client.create_manifestation(self.container, 'large-object',
+ 'invalid')
+
class ObjectCopy(BaseTestCase):
def setUp(self):
BaseTestCase.setUp(self)
self.client.create_container(c)
self.obj = self.upload_random_data(self.containers[0], o_names[0])
+ def tearDown(self):
+ pass
+
def test_copy(self):
with AssertMappingInvariant(self.client.retrieve_object_metadata,
self.containers[0], self.obj['name']):
self.assert_raises_fault(404, self.client.copy_object, self.containers[1],
self.obj['name'], self.containers[1],
'testcopy', meta)
-
class ObjectMove(BaseTestCase):
def setUp(self):
self.assert_raises_fault(404, self.client.delete_object, self.containers[1],
self.obj['name'])
+class ListSharing(BaseTestCase):
+ def setUp(self):
+ BaseTestCase.setUp(self)
+ self.client.create_container('c')
+ for i in range(2):
+ self.upload_random_data('c', 'o%s' %i)
+ accounts = OTHER_ACCOUNTS.copy()
+ self.o1_sharing_with = accounts.popitem()
+ self.o1_sharing = [self.o1_sharing_with[1]]
+ self.client.share_object('c', 'o1', self.o1_sharing, read=True)
+
+ l = []
+ for i in range(2):
+ l.append(accounts.popitem())
+ #self.client.set_account_groups({'pithos-dev':'chazapis,verigak,papagian'})
+ #self.o2_sharing = 'write=%s' %
+ #self.client.share_object('c', 'o2', self.o2_sharing)
+
+ def test_listing(self):
+ self.other = Pithos_Client(DEFAULT_HOST,
+ self.o1_sharing_with[0],
+ self.o1_sharing_with[1],
+ DEFAULT_API)
+ self.assertTrue('test' in self.other.list_shared_by_others())
+
+class TestGreek(BaseTestCase):
+ def tearDown(self):
+ pass
+
+ def test_create_container(self):
+ self.client.create_container('φάκελος')
+ self.assert_container_exists('φάκελος')
+
+ def test_create_object(self):
+ self.client.create_container('φάκελος')
+ self.upload_random_data('φάκελος', 'αντικείμενο')
+
+ self.assert_object_exists('φάκελος', 'αντικείμενο')
+
+ def test_copy_object(self):
+ self.client.create_container('φάκελος')
+ self.upload_random_data('φάκελος', 'αντικείμενο')
+
+ self.client.create_container('αντίγραφα')
+ self.client.copy_object('φάκελος', 'αντικείμενο', 'αντίγραφα',
+ 'αντικείμενο')
+
+ self.assert_object_exists('αντίγραφα', 'αντικείμενο')
+ self.assert_object_exists('φάκελος', 'αντικείμενο')
+
+ def test_move_object(self):
+ self.client.create_container('φάκελος')
+ self.upload_random_data('φάκελος', 'αντικείμενο')
+
+ self.client.create_container('αντίγραφα')
+ self.client.copy_object('φάκελος', 'αντικείμενο', 'αντίγραφα',
+ 'αντικείμενο')
+
+ self.assert_object_exists('αντίγραφα', 'αντικείμενο')
+ self.assert_object_not_exists('φάκελος', 'αντικείμενο')
+
+ def test_delete_object(self):
+ pass
+
+ def test_delete_container(self):
+ pass
+
+ def test_account_meta(self):
+ pass
+
+ def test_container_meta(self):
+ pass
+
+ def test_obejct_meta(self):
+ pass
+
+ def test_list_meta_filtering(self):
+ pass
+
+ def test_groups(self):
+ pass
+
+ def test_permissions(self):
+ pass
+
class AssertMappingInvariant(object):
def __init__(self, callable, *args, **kwargs):
self.callable = callable
import json
import types
import socket
+import urllib
import pithos.api.faults
ERROR_CODES = {304:'Not Modified',
self.token = token
def _req(self, method, path, body=None, headers={}, format='text',
- params={}):
- full_path = '/%s/%s%s?format=%s' % (self.api, self.account, path,
- format)
+ params={}, top_level_req=False):
+ path = urllib.quote(path)
+ account = '' if top_level_req else self.account
+ full_path = '/%s/%s%s?format=%s' % (self.api, account, path, format)
+
for k,v in params.items():
if v:
full_path = '%s&%s=%s' %(full_path, k, v)
kwargs['headers'].setdefault('content-type',
'application/octet-stream')
kwargs['headers'].setdefault('content-length', len(body) if body else 0)
- try:
- #print '*', method, full_path, kwargs
- conn.request(method, full_path, **kwargs)
- except socket.error, e:
- raise Fault(status=503)
+ kwargs['headers'] = _encode_headers(kwargs['headers'])
+ #print '#', method, full_path, kwargs
+ conn.request(method, full_path, **kwargs)
+
+ #try:
+ # conn.request(method, full_path, **kwargs)
+ #except socket.error, e:
+ # print '###', e[0], conn.auto_open
+ # raise Fault(status=503)
resp = conn.getresponse()
headers = dict(resp.getheaders())
def delete(self, path, format='text', params={}):
return self._req('DELETE', path, format=format, params=params)
- def get(self, path, format='text', headers=None, params={}):
+ def get(self, path, format='text', headers={}, params={},
+ top_level_req=False):
return self._req('GET', path, headers=headers, format=format,
- params=params)
+ params=params, top_level_req=top_level_req)
def head(self, path, format='text', params={}):
- return self._req('HEAD', path, format=format, params=params)
+ return self._req('HEAD', path, format=format, params=params)
def post(self, path, body=None, format='text', headers=None, params={}):
return self._req('POST', path, body, headers=headers, format=format,
def put(self, path, body=None, format='text', headers=None):
return self._req('PUT', path, body, headers=headers, format=format)
- def _list(self, path, format='text', params={}, **headers):
+ def _list(self, path, format='text', params={}, top_level_req=False,
+ **headers):
status, headers, data = self.get(path, format=format, headers=headers,
- params=params)
+ params=params,
+ top_level_req=top_level_req)
if format == 'json':
data = json.loads(data) if data else ''
elif format == 'xml':
# Storage Account Services
- def list_containers(self, format='text', limit=10000, marker=None, params={},
+ def list_containers(self, format='text', limit=None, marker=None, params={},
**headers):
"""lists containers"""
- if not params:
- params = {}
params.update({'limit':limit, 'marker':marker})
return self._list('', format, params, **headers)
def _filter_trashed(self, l):
return self._filter(l, {'trash':'true'})
- def list_objects(self, container, format='text', limit=10000, marker=None,
+ def list_objects(self, container, format='text', limit=None, marker=None,
prefix=None, delimiter=None, path=None,
include_trashed=False, params={}, **headers):
"""returns a list with the container objects"""
path = '/%s/%s' % (dst_container, dst_object)
headers = {} if not headers else headers
for k, v in meta.items():
- headers['x-object-meta-%s' % k] = v
+ headers['x-object-meta-%s' % k] = v
if remove:
headers['x-move-from'] = '/%s/%s' % (src_container, src_object)
else:
block = f.read(blocksize)
if block == '':
break
- data = '%s\r\n%s\r\n' % (hex(len(block)), block)
+ data = '%x\r\n%s\r\n' % (len(block), block)
try:
http.send(data)
except:
#retry
http.send(data)
- data = '0x0\r\n'
+ data = '0\r\n\r\n'
try:
http.send(data)
except:
if_unmodified_since=None, limit=1000, marker=None,
until=None):
"""returns a list with the account containers"""
- params = {'until':until} if until else None
+ params = {'until':until} if until else {}
headers = {'if-modified-since':if_modified_since,
'if-unmodified-since':if_unmodified_since}
return OOS_Client.list_containers(self, format=format, limit=limit,
def set_account_groups(self, **groups):
"""create account groups"""
headers = {}
- for key, val in groups.items():
- headers['x-account-group-%s' % key] = val
+ for k, v in groups.items():
+ headers['x-account-group-%s' % k] = v
params = {'update':None}
return self.post('', headers=headers, params=params)
def reset_account_groups(self, **groups):
"""overrides account groups"""
headers = {}
- for key, val in groups.items():
- headers['x-account-group-%s' % key] = val
+ for k, v in groups.items():
+ v = v.strip()
+ headers['x-account-group-%s' % k] = v
meta = self.retrieve_account_metadata()
headers.update(meta)
return self.post('', headers=headers)
# Storage Container Services
- def list_objects(self, container, format='text', limit=10000, marker=None,
+ def list_objects(self, container, format='text', limit=None, marker=None,
prefix=None, delimiter=None, path=None,
include_trashed=False, params={}, if_modified_since=None,
if_unmodified_since=None, meta={}, until=None):
headers.update({elem:eval(elem)})
for k,v in meta.items():
- headers['x-object-meta-%s' %k.strip()] = v.strip()
+ v = v.strip()
+ headers['x-object-meta-%s' %k.strip()] = v
return self._chunked_transfer(path, 'PUT', f, headers=headers,
blocksize=blocksize)
headers['content_range'] = 'bytes */*'
for k,v in meta.items():
- headers['x-object-meta-%s' %k.strip()] = v.strip()
+ v = v.strip()
+ headers['x-object-meta-%s' %k.strip()] = v
return self._chunked_transfer(path, 'POST', f, headers=headers,
blocksize=blocksize)
path = '/%s/%s' % (dst_container, dst_object)
headers = {} if not headers else headers
for k, v in meta.items():
- headers['x-object-meta-%s' % k] = v
+ headers['x-object-meta-%s' % k] = v
if remove:
headers['x-move-from'] = '/%s/%s' % (src_container, src_object)
else:
headers['x_object_version'] = version
return OOS_Client.move_object(self, src_container, src_object,
dst_container, dst_object, meta=meta,
- **headers)
\ No newline at end of file
+ **headers)
+
+ def list_shared_by_others(self, limit=None, marker=None, format='text'):
+ l = ['limit', 'marker']
+ params = {}
+ for elem in [elem for elem in l if eval(elem)]:
+ params[elem] = eval(elem)
+ return self._list('', format, params, top_level_req=True)
+
+ def share_object(self, container, object, l, read=True):
+ action = 'read' if read else 'write'
+ sharing = '%s=%s' % (action, ','.join(l))
+ self.update_object(container, object, f=None, x_object_sharing=sharing)
+
+def _encode_headers(headers):
+ h = {}
+ for k, v in headers.items():
+ k = urllib.quote(k)
+ if v and type(v) == types.StringType:
+ v = urllib.quote(v, '/=,-* :"')
+ h[k] = v
+ return h
\ No newline at end of file