_best_match = []
def _arg2syntax(arg):
    """Decode an encoded argument name into its command-syntax text.

    Runs of underscores are rewritten longest-first, so that a shorter
    rule never consumes part of a longer marker:
        '____' -> '[:'
        '___'  -> ':'
        '__'   -> ']'
        '_'    -> ' '

    :param arg: (str) argument name, e.g. 'source_container___path'

    :returns: (str) decoded text, e.g. 'source container:path'
    """
    # Order matters: every rule must be applied before any shorter one
    rules = (('____', '[:'), ('___', ':'), ('__', ']'), ('_', ' '))
    for encoded, decoded in rules:
        arg = arg.replace(encoded, decoded)
    return arg
+
+
def _construct_command_syntax(cls):
spec = getargspec(cls.main.im_func)
args = spec.args[1:]
n = len(args) - len(spec.defaults or ())
- required = ' '.join(
- '<%s>' % x.replace(
- '____', '[:').replace(
- '___', ':').replace(
- '__', ']').replace(
- '_', ' ') for x in args[:n])
- optional = ' '.join(
- '[%s]' % x.replace(
- '____', '[:').replace(
- '___', ':').replace(
- '__', ']').replace(
- '_', ' ') for x in args[n:])
+ required = ' '.join(['<%s>' % _arg2syntax(x) for x in args[:n]])
+ optional = ' '.join(['[%s]' % _arg2syntax(x) for x in args[n:]])
cls.syntax = ' '.join(x for x in [required, optional] if x)
if spec.varargs:
cls.syntax += ' <%s ...>' % spec.varargs
if pkg:
cmds = None
try:
- cmds = [
- cmd for cmd in getattr(pkg, '_commands')
- if arguments['config'].get(cmd.name, 'cli')
- ]
+ _cnf = arguments['config']
+ cmds = [cmd for cmd in getattr(pkg, '_commands') if _cnf.get(
+ cmd.name, 'cli')]
except AttributeError:
if _debug:
kloger.warning('No description for %s' % spec)
def _load_all_commands(cmd_tree, arguments):
- _config = arguments['config']
- for spec in [spec for spec in _config.get_groups()
- if _config.get(spec, 'cli')]:
+ _cnf = arguments['config']
+ specs = [spec for spec in _cnf.get_groups() if _cnf.get(spec, 'cli')]
+ for spec in specs:
try:
spec_module = _load_spec_module(spec, arguments, '_commands')
spec_commands = getattr(spec_module, '_commands')
if not sep:
raiseCLIError(
CLISyntaxError('Argument Syntax Error '),
- details=['%s is missing a "="',
- ' (usage: -o section.key=val)' % option]
- )
+ details=[
+ '%s is missing a "="',
+ ' (usage: -o section.key=val)' % option])
section, sep, key = keypath.partition('.')
if not sep:
key = section
try:
self._value = int(newvalue)
except ValueError:
- raiseCLIError(
- CLISyntaxError('IntArgument Error',
+ raiseCLIError(CLISyntaxError(
+ 'IntArgument Error',
details=['Value %s not an int' % newvalue]))
raiseCLIError(
None,
'Date Argument Error',
- details='%s not a valid date. correct formats:\n\t%s'\
- % (datestr, self.INPUT_FORMATS))
+ details='%s not a valid date. correct formats:\n\t%s' % (
+ datestr, self.INPUT_FORMATS))
class VersionArgument(FlagArgument):
try:
foo(self, *args, **kwargs)
except ClientError as ce:
+ ce_msg = ('%s' % ce).lower()
if ce.status == 401:
raiseCLIError(ce, 'Authorization failed', details=[
'Make sure a valid token is provided:',
'Check if service is up or set to url %s' % base_url,
' to get url: /config get %s' % base_url,
' to set url: /config set %s <URL>' % base_url])
- elif ce.status == 404\
- and 'kamakihttpresponse' in ('%s' % ce).lower():
+ elif ce.status == 404 and 'kamakihttpresponse' in ce_msg:
client = getattr(self, 'client', None)
if not client:
raise
except ClientError as ce:
if ce.status == 413:
msg = 'Cannot create another network',
- details = ['Maximum number of networks reached',
+ details = [
+ 'Maximum number of networks reached',
'* to get a list of networks: /network list',
'* to delete a network: /network delete <net id>']
raiseCLIError(ce, msg, details=details)
try:
foo(self, *args, **kwargs)
except ClientError as ce:
+ ce_msg = ('%s' % ce).lower()
if ce.status == 404 or (
- (ce.status == 400 and 'metadata' in ('%s' % ce).lower())):
+ ce.status == 400 and 'metadata' in ce_msg):
msg = 'No properties with key %s in this image' % key
raiseCLIError(ce, msg)
raise
class pithos(object):
container_howto = [
- "" 'To specify a container:',
+ 'To specify a container:',
' 1. Set store.container variable (permanent)',
' /config set store.container <container>',
' 2. --container=<container> (temporary, overrides 1)',
return foo(self, *args, **kwargs)
except ClientError as ce:
err_msg = ('%s' % ce).lower()
- if size and (ce.status == 416 or
- (ce.status == 400 and
- 'object length is smaller than range length' in err_msg)):
+ expected = 'object length is smaller than range length'
+ if size and (
+ ce.status == 416 or (
+ ce.status == 400 and expected in err_msg)):
raiseCLIError(ce, 'Remote object %s:%s <= %s %s' % (
- self.container,
- self.path,
- format_size(size),
- ('(%sB)' % size) if size >= 1024 else ''))
+ self.container, self.path, format_size(size),
+ ('(%sB)' % size) if size >= 1024 else ''))
raise
return _raise
try:
instance = cmd.get_class()(self.arguments)
instance.config = self.config
- prs = ArgumentParseManager(cmd.path.split(),
+ prs = ArgumentParseManager(
+ cmd.path.split(),
dict(instance.arguments))
prs.syntax = '%s %s' % (
cmd.path.replace('_', ' '),
@command(image_cmds)
class image_delete(_init_cyclades):
- """Delete an image (image file remains intact)"""
+ """Delete an image (WARNING: image file is also removed)"""
@errors.generic.all
@errors.cyclades.connection
try:
(key, val) = p.split('=')
except ValueError as err:
- raiseCLIError(err, 'Error in --sharing',
+ raiseCLIError(
+ err,
+ 'Error in --sharing',
details='Incorrect format',
importance=1)
if key.lower() not in ('read', 'write'):
return (dst[0], dst[1]) if len(dst) > 1 else (None, dst[0])
def extract_container_and_path(
- self,
- container_with_path,
- path_is_optional=True):
+ self,
+ container_with_path,
+ path_is_optional=True):
"""Contains all heuristics for deciding what should be used as
container or path. Options are:
* user string of the form container:path
def print_objects(self, object_list):
limit = int(self['limit']) if self['limit'] > 0 else len(object_list)
for index, obj in enumerate(object_list):
- if (self['exact_match'] and self.path and (
- obj['name'] != self.path) or 'content_type' not in obj):
- continue
+ if self['exact_match'] and self.path and not (
+ obj['name'] == self.path or 'content_type' in obj):
+ continue
pretty_obj = obj.copy()
index += 1
empty_space = ' ' * (len(str(len(object_list))) - len(str(index)))
if len(r.json) == 1:
obj = r.json[0]
return [(obj['name'], dst_path or obj['name'])]
- return [(
- obj['name'],
- '%s%s' % (
- dst_path,
- obj['name'][len(self.path) if self['replace'] else 0:])
- ) for obj in r.json]
+ start = len(self.path) if self['replace'] else 0
+ return [(obj['name'], '%s%s' % (
+ dst_path,
+ obj['name'][start:])) for obj in r.json]
@errors.generic.all
@errors.pithos.connection
self.container))
def main(
- self,
- source_container___path,
- destination_container___path=None):
+ self,
+ source_container___path,
+ destination_container___path=None):
super(self.__class__, self)._run(
source_container___path,
path_is_optional=False)
self.container))
def main(
- self,
- source_container___path,
- destination_container___path=None):
+ self,
+ source_container___path,
+ destination_container___path=None):
super(self.__class__, self)._run(
source_container___path,
path_is_optional=False)
default=False),
recursive=FlagArgument(
'Download a remote directory and all its contents',
- '-r, --resursive')
+ '-r, --recursive')
)
def _is_dir(self, remote_dict):
@errors.pithos.object_path
def _run(self):
if self.path:
- if self['yes'] or ask_user('Delete %s:%s ?' % (
- self.container,
- self.path)):
+ if self['yes'] or ask_user(
+ 'Delete %s:%s ?' % (self.container, self.path)):
self.client.del_object(
self.path,
until=self['until'],
else:
print('Aborted')
else:
- if self['resursive']:
+ if self['recursive']:
ask_msg = 'Delete container contents'
else:
ask_msg = 'Delete container'
for perms in permissions:
splstr = perms.split('=')
if 'read' == splstr[0]:
- read = [user_or_group.strip() \
- for user_or_group in splstr[1].split(',')]
+ read = [ug.strip() for ug in splstr[1].split(',')]
elif 'write' == splstr[0]:
- write = [user_or_group.strip() \
- for user_or_group in splstr[1].split(',')]
+ write = [ug.strip() for ug in splstr[1].split(',')]
else:
- read = False
- write = False
- if not (read or write):
- msg = 'Usage:\tread=<groups,users> write=<groups,users>'
- raiseCLIError(None, msg)
+ msg = 'Usage:\tread=<groups,users> write=<groups,users>'
+ raiseCLIError(None, msg)
return (read, write)
@errors.generic.all
@errors.pithos.connection
def _run(self):
accounts = self.client.get_sharing_accounts(marker=self['marker'])
- print_items(accounts if self['detail']
- else [acc['name'] for acc in accounts])
+ if self['detail']:
+ print_items(accounts)
+ else:
+ print_items([acc['name'] for acc in accounts])
def main(self):
super(self.__class__, self)._run()
@errors.pithos.object_path
def _run(self):
versions = self.client.get_object_versionlist(self.path)
- print_items([dict(
- id=vitem[0],
- created=strftime('%d-%m-%Y %H:%M:%S', localtime(float(vitem[1])))
- ) for vitem in versions])
+ print_items([dict(id=vitem[0], created=strftime(
+ '%d-%m-%Y %H:%M:%S',
+ localtime(float(vitem[1])))) for vitem in versions])
def main(self, container___path):
super(store_versions, self)._run(
{'id': '142', 'title': 'lalakis 2', 'content': self.d2}])
print('- - -')
print('\nTest print_items with id and enumeration:\n- - -')
- print_items([
+ print_items(
+ [
{'id': '42', 'title': 'lalakis 1', 'content': self.d1},
{'id': '142', 'title': 'lalakis 2', 'content': self.d2}],
with_enumeration=True)
print('- - -')
print('\nTest print_items with id, title and redundancy:\n- - -')
- print_items([
+ print_items(
+ [
{'id': '42', 'title': 'lalakis 1', 'content': self.d1},
{'id': '142', 'title': 'lalakis 2', 'content': self.d2}],
title=('id', 'title'),
class CLICmdSpecError(CLIError):
def __init__(
- self,
- message='Command Specification Error',
- details=[],
- importance=0):
+ self, message='Command Specification Error',
+ details=[], importance=0):
super(CLICmdSpecError, self).__init__(message, details, importance)
class CLICmdIncompleteError(CLICmdSpecError):
    """Command specification error with the default message
    'Incomplete Command Error' and a default importance of 1.
    """

    def __init__(
            self, message='Incomplete Command Error',
            details=[], importance=1):
        # NOTE: the shared default list is only forwarded, never mutated here
        # Name this class (not its parent) in super(), so that the parent
        # initializer is not skipped in the method resolution order
        super(CLICmdIncompleteError, self).__init__(
            message, details, importance)
def get(self, match_terms=None, limit=0):
    """Return the numbered lines of the file at self.filepath that match.

    Every line is numbered by its position in the file (starting at 1),
    so the numbers of the returned lines are not necessarily consecutive.

    :param match_terms: forwarded unchanged to self._match for every line

    :param limit: (int) if non-zero, keep only the last `limit` matches

    :returns: (list of str) items formatted as '<number>. <TAB><line>'
    """
    # The context manager closes the file even if self._match raises
    with open(self.filepath, 'r') as f:
        result = [
            '%s. \t%s' % (index + 1, line)
            for index, line in enumerate(f)
            if self._match(line, match_terms)]
    # Drop the oldest matches when more than `limit` of them were found
    offset = len(result) - limit if limit and len(result) > limit else 0
    return result[offset:]
def print_dict(
- d,
- exclude=(),
- ident=0,
- with_enumeration=False,
- recursive_enumeration=False):
+ d, exclude=(), ident=0,
+ with_enumeration=False, recursive_enumeration=False):
"""
Pretty-print a dictionary object
raiseCLIError(TypeError('Cannot dict_print a non-dict object'))
if d:
- margin = max(len(unicode(key).strip())
- for key in d.keys() if key not in exclude)
+ margin = max(len(('%s' % key).strip()) for key in d.keys() if (
+ key not in exclude))
counter = 1
for key, val in sorted(d.items()):
def print_list(
- l,
- exclude=(),
- ident=0,
- with_enumeration=False,
- recursive_enumeration=False):
+ l, exclude=(), ident=0,
+ with_enumeration=False, recursive_enumeration=False):
"""
Pretty-print a list object
if l:
try:
- margin = max(len(unicode(item).strip()) for item in l
- if not (isinstance(item, dict)
- or isinstance(item, list)
- or item in exclude))
+ margin = max(len(('%s' % item).strip()) for item in l if not (
+ isinstance(item, dict) or
+ isinstance(item, list) or
+ item in exclude))
except ValueError:
margin = (2 + len(unicode(len(l)))) if enumerate else 1
def print_items(
- items,
- title=('id', 'name'),
- with_enumeration=False,
- with_redundancy=False,
- page_size=0):
+ items, title=('id', 'name'),
+ with_enumeration=False, with_redundancy=False,
+ page_size=0):
"""print dict or list items in a list, using some values as title
Objects of next level don't inherit enumeration (default: off) or titles
def _watch_thread_limit(self, threadlist):
recvlog.debug('# running threads: %s' % len(threadlist))
- if (
- self._elapsed_old > self._elapsed_new) and (
+ if (self._elapsed_old > self._elapsed_new) and (
self._thread_limit < self.POOL_SIZE):
- self._thread_limit += 1
+ self._thread_limit += 1
elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
self._thread_limit -= 1
self.http_client.headers.setdefault(name, value)
def request(
- self,
- method,
- path,
- async_headers={},
- async_params={},
- **kwargs):
+ self,
+ method,
+ path,
+ async_headers={},
+ async_params={},
+ **kwargs):
"""In threaded/asynchronous requests, headers and params are not safe
Therefore, the standard self.set_header/param system can be used only
for headers and params that are common for all requests. All other
E.g. in most queries the 'X-Auth-Token' header might be the same for
all, but the 'Range' header might be different from request to request.
"""
-
try:
success = kwargs.pop('success', 200)
for call_name, call_doc in canonifier.call_docs():
if hasattr(self, call_name):
- # don't crash: wrap the function instead
- #m = ( "Method '%s' defined both in natively "
- # "in callpoint '%s' and in api spec '%s'" %
- # (call_name,
- # type(self).__name__,
- # type(canonifier).__name__) )
#raise ValueError(m)
call_func = getattr(self, call_name)
if not callable(call_func):
- m = ("api spec '%s', method '%s' is not a "\
- "callable attribute in callpoint '%s'" %\
- (type(canonifier).__name__,
- call_name,
- type(self).__name))
- raise ValueError(m)
+ raise ValueError(' '.join([
+ "api spec '%s'," % type(canonifier).__name__,
+ "method '%s' is not a callable" % call_name,
+ "attribute in callpoint '%s'" % type(self).__name]))
original_calls[call_name] = call_func
padchar = '\n'
args = [a.tostring(
- depth=depth,
- showopts=showopts,
- multiline=multiline) for a in self.args]
+ depth=depth,
+ showopts=showopts,
+ multiline=multiline) for a in self.args]
args += [("%s=%s" % (k, v.tostring(
- depth=depth,
- showopts=showopts,
- multiline=multiline)))
- for k, v in self.kw.items()]
+ depth=depth,
+ showopts=showopts,
+ multiline=multiline))) for k, v in self.kw.items()]
if showopts:
args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
self.pat = pat
if 'choices' in opts:
- opts['choices'] = dict((unicode(x), unicode(x))
- for x in opts['choices'])
+ opts['choices'] = dict((
+ unicode(x),
+ unicode(x)) for x in opts['choices'])
def _check(self, item):
if not isinstance(item, unicode):
if matcher is not None:
match = matcher.match(item)
if ((not match) or (match.start(), match.end()) != (0, itemlen)):
-
- m = ("%s: '%s' does not match '%s'"
- % (self, shorts(item), self.pat))
+ m = ("%s: '%s' does not match '%s'" % (
+ self,
+ shorts(item),
+ self.pat))
raise CanonifyException(m)
return item
self.pat = pat
if 'choices' in opts:
- opts['choices'] = dict((str(x), str(x))
- for x in opts['choices'])
+ opts['choices'] = dict((str(x), str(x)) for x in opts['choices'])
def _check(self, item):
if isinstance(item, unicode):
if matcher is not None:
match = matcher.match(item)
if ((not match) or (match.start(), match.end()) != (0, itemlen)):
- m = ("%s: '%s' does not match '%s'"
- % (self, shorts(item), self.pat))
+ m = ("%s: '%s' does not match '%s'" % (
+ self,
+ shorts(item),
+ self.pat))
raise CanonifyException(m)
return item
arglen = len(arglist)
if arglen != len(keys):
m = "inconsistent number of parameters: %s != %s" % (
- arglen, len(keys))
+ arglen,
+ len(keys))
raise CanonifyException(m)
position = 0
args = zip(args, defaults)
for a, c in args:
if not isinstance(c, Canonical):
- m = ("argument '%s=%s' is not an instance of 'Canonical'"
- % (a, repr(c)))
+ m = ("argument '%s=%s' not an instance of 'Canonical'" % (
+ a,
+ repr(c)))
raise SpecifyException(m)
canonical = Null() if len(args) == 0 else Args(*args)
self = object.__new__(cls)
canonical = f(self)
if not isinstance(canonical, Canonical):
- m = ("method '%s' does not return a Canonical, but a(n) %s "
- % (name, type(canonical)))
- raise SpecifyException(m)
+ raise SpecifyException(', '.join([
+ "method %s does not return a Canonical" % name,
+ "but a (n) %s" % type(canonical)]))
canonical_outputs[name] = canonical
return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
return repr(self.args) + '+' + repr(self.kw)
def __getitem__(self, key):
- if (isinstance(key, int)
- or isinstance(key, long)
- or isinstance(key, slice)):
+ if (isinstance(key, int)) or (
+ isinstance(key, long)) or (
+ isinstance(key, slice)):
return self.args[key]
else:
return self.kw[key]
def __setitem__(self, key, value):
- if (isinstance(key, int)
- or isinstance(key, long)
- or isinstance(key, slice)):
+ if (isinstance(key, int)) or (
+ isinstance(key, long)) or (
+ isinstance(key, slice)):
self.args[key] = value
else:
self.kw[key] = value
def __delitem__(self, key):
- if (isinstance(key, int)
- or isinstance(key, long)
- or isinstance(key, slice)):
+ if (isinstance(key, int)) or (
+ isinstance(key, long)) or (
+ isinstance(key, slice)):
del self.args[key]
else:
del self.kw[key]
return self.delete(path, success=success, **kwargs)
def servers_post(
- self,
- server_id='',
- command='',
- json_data=None,
- success=202,
- **kwargs):
+ self,
+ server_id='',
+ command='',
+ json_data=None,
+ success=202,
+ **kwargs):
"""POST base_url/servers[/server_id]/[command] request
:param server_id: integer (as int or str)
return self.post(path, data=data, success=success, **kwargs)
def servers_put(
- self,
- server_id='',
- command='',
- json_data=None,
- success=204,
- **kwargs):
+ self,
+ server_id='',
+ command='',
+ json_data=None,
+ success=204,
+ **kwargs):
"""PUT base_url/servers[/server_id]/[command] request
:param server_id: integer (as int or str)
return self.delete(path, success=success, **kwargs)
def images_post(
- self,
- image_id='',
- command='',
- json_data=None,
- success=201,
- **kwargs):
+ self,
+ image_id='',
+ command='',
+ json_data=None,
+ success=201,
+ **kwargs):
"""POST base_url/images[/image_id]/[command] request
:param image_id: string
return self.post(path, data=data, success=success, **kwargs)
def images_put(
- self,
- image_id='',
- command='',
- json_data=None,
- success=201,
- **kwargs):
+ self,
+ image_id='',
+ command='',
+ json_data=None,
+ success=201,
+ **kwargs):
"""PUT base_url/images[/image_id]/[command] request
:param image_id: string
def set_method(self, method):
self.method = method
- def perform_request(self,
- method=None,
- url=None,
- async_headers={},
- async_params={},
- data=None):
+ def perform_request(
+ self,
+ method=None, url=None, async_headers={}, async_params={}, data=None):
raise NotImplementedError
self.path += '?%s' % parsed.query if parsed.query else ''
return (parsed.scheme, parsed.netloc)
- def perform_request(self,
- method=None,
- data=None,
- async_headers={},
- async_params={}):
+ def perform_request(
+ self,
+ method=None, data=None, async_headers={}, async_params={}):
"""
:param method: (str) http method ('get', 'post', etc.)
errno=-1)
try:
# Be careful: none of the non-body variables may be unicode
- conn.request(method=str(method.upper()),
+ conn.request(
+ method=str(method.upper()),
url=str(self.path),
headers=http_headers,
body=data)
try:
return r['attachments']['values'][0]['firewallProfile']
except KeyError:
- raise ClientError('No Firewall Profile', 520,
+ raise ClientError(
+ 'No Firewall Profile', 520,
details='Server %s is missing a firewall profile' % server_id)
def set_firewall_profile(self, server_id, profile):
r = self.networks_get(network_id=network_id)
return r.json['network']['attachments']['values']
- def create_network(self,
- name, cidr=None, gateway=None, type=None, dhcp=None):
+ def create_network(
+ self, name,
+ cidr=None, gateway=None, type=None, dhcp=None):
"""
:param name: (str)
except ClientError as err:
if err.status == 421:
err.details = [
- 'Network may be still connected to at least one server']
+ 'Network may be still connected to at least one server']
raise err
r.release()
:param nic_id: (str)
"""
server_nets = self.list_server_nics(server_id)
- nets = [(net['id'], net['network_id']) for net in server_nets\
- if nic_id == net['id']]
num_of_disconnections = 0
- for (nic_id, network_id) in nets:
+ for (nic_id, network_id) in [(
+ net['id'],
+ net['network_id']) for net in server_nets if nic_id == net['id']]:
req = {'remove': {'attachment': unicode(nic_id)}}
r = self.networks_post(network_id, 'action', json_data=req)
r.release()
r.release()
def wait_server(
- self,
- server_id,
- current_status='BUILD',
- delay=0.5,
- max_wait=128,
- wait_cb=None):
+ self,
+ server_id,
+ current_status='BUILD',
+ delay=0.5,
+ max_wait=128,
+ wait_cb=None):
"""Wait for server while its status is current_status
:param server_id: integer (str or int)
"""GRNet Cyclades REST API Client"""
def servers_get(
- self,
- server_id='',
- command='',
- success=200,
- changes_since=None,
- **kwargs):
+ self,
+ server_id='',
+ command='',
+ success=200,
+ changes_since=None,
+ **kwargs):
"""GET base_url/servers[/server_id][/command] request
:param server_id: integer (as int or str)
return self.get(path, success=success, **kwargs)
def networks_get(
- self,
- network_id='',
- command='',
- success=(200, 203),
- **kwargs):
+ self,
+ network_id='',
+ command='',
+ success=(200, 203),
+ **kwargs):
"""GET base_url/networks[/network_id][/command] request
:param network_id: integer (str or int)
return self.get(path, success=success, **kwargs)
def networks_delete(
- self,
- network_id='',
- command='',
- success=204,
- **kwargs):
+ self,
+ network_id='',
+ command='',
+ success=204,
+ **kwargs):
"""DELETE base_url/networks[/network_id][/command] request
:param network_id: integer (str or int)
return self.delete(path, success=success, **kwargs)
def networks_post(
- self,
- network_id='',
- command='',
- json_data=None,
- success=202,
- **kwargs):
+ self,
+ network_id='',
+ command='',
+ json_data=None,
+ success=202,
+ **kwargs):
"""POST base_url/networks[/network_id]/[command] request
:param network_id: integer (str or int)
return self.post(path, data=data, success=success, **kwargs)
def networks_put(
- self,
- network_id='',
- command='',
- json_data=None,
- success=204,
- **kwargs):
+ self,
+ network_id='',
+ command='',
+ json_data=None,
+ success=204,
+ **kwargs):
"""PUT base_url/networks[/network_id]/[command] request
:param network_id: integer (str or int)
r = self.container_delete(until=unicode(time()))
r.release()
- def upload_object_unchunked(self, obj, f,
- withHashFile=False,
- size=None,
- etag=None,
- content_encoding=None,
- content_disposition=None,
- content_type=None,
- sharing=None,
- public=None):
+ def upload_object_unchunked(
+ self, obj, f,
+ withHashFile=False,
+ size=None,
+ etag=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None):
"""
:param obj: (str) remote object path
import json
data = json.dumps(json.loads(data))
except ValueError:
- raise ClientError(message='"%s" is not json-formated' % f.name,
- status=1)
+ raise ClientError('"%s" is not json-formated' % f.name, 1)
except SyntaxError:
- raise ClientError(message='"%s" is not a valid hashmap file'\
- % f.name, status=1)
+ msg = '"%s" is not a valid hashmap file' % f.name
+ raise ClientError(msg, 1)
f = StringIO(data)
data = f.read(size) if size is not None else f.read()
- r = self.object_put(obj,
+ r = self.object_put(
+ obj,
data=data,
etag=etag,
content_encoding=content_encoding,
success=201)
r.release()
- def create_object_by_manifestation(self, obj,
- etag=None,
- content_encoding=None,
- content_disposition=None,
- content_type=None,
- sharing=None,
- public=None):
+ def create_object_by_manifestation(
+ self, obj,
+ etag=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None):
"""
:param obj: (str) remote object path
:param public: (bool)
"""
self._assert_container()
- r = self.object_put(obj,
+ r = self.object_put(
+ obj,
content_length=0,
etag=etag,
content_encoding=content_encoding,
from random import randint
if not randint(0, 7):
raise ClientError('BAD GATEWAY STUFF', 503)
- r = self.container_post(update=True,
+ r = self.container_post(
+ update=True,
content_type='application/octet-stream',
content_length=len(data),
data=data,
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
- def _get_missing_hashes(self, obj, json,
- size=None,
- format='json',
- hashmap=True,
- content_type=None,
- etag=None,
- content_encoding=None,
- content_disposition=None,
- permissions=None,
- public=None,
- success=(201, 409)):
- r = self.object_put(obj,
+ def _get_missing_hashes(
+ self, obj, json,
+ size=None,
+ format='json',
+ hashmap=True,
+ content_type=None,
+ etag=None,
+ content_encoding=None,
+ content_disposition=None,
+ permissions=None,
+ public=None,
+ success=(201, 409)):
+ r = self.object_put(
+ obj,
format='json',
hashmap=True,
content_type=content_type,
return None
return r.json
- def _caclulate_uploaded_blocks(self,
- blocksize,
- blockhash,
- size,
- nblocks,
- hashes,
- hmap,
- fileobj,
+ def _caclulate_uploaded_blocks(
+ self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
hash_cb=None):
offset = 0
if hash_cb:
if hash_cb:
hash_gen.next()
if offset != size:
- assert offset == size, \
- "Failed to calculate uploaded blocks: " \
- "Offset and object size do not match"
+ msg = 'Failed to calculate uploaded blocks:'
+ msg += ' Offset and object size do not match'
+ assert offset == size, msg
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
"""upload missing blocks asynchronously"""
for thread in set(flying).difference(unfinished):
if thread.exception:
failures.append(thread)
- if isinstance(thread.exception, ClientError)\
- and thread.exception.status == 502:
- self.POOLSIZE = self._thread_limit
+ if isinstance(
+ thread.exception,
+ ClientError) and thread.exception.status == 502:
+ self.POOLSIZE = self._thread_limit
elif thread.isAlive():
flying.append(thread)
elif upload_gen:
return [failure.kwargs['hash'] for failure in failures]
- def upload_object(self, obj, f,
- size=None,
- hash_cb=None,
- upload_cb=None,
- etag=None,
- content_encoding=None,
- content_disposition=None,
- content_type=None,
- sharing=None,
- public=None):
+ def upload_object(
+ self, obj, f,
+ size=None,
+ hash_cb=None,
+ upload_cb=None,
+ etag=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
if content_type is None:
content_type = 'application/octet-stream'
- self._caclulate_uploaded_blocks(*block_info,
+ self._caclulate_uploaded_blocks(
+ *block_info,
hashes=hashes,
hmap=hmap,
fileobj=f,
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
- missing = self._get_missing_hashes(obj, hashmap,
+ missing = self._get_missing_hashes(
+ obj, hashmap,
content_type=content_type,
size=size,
etag=etag,
map_dict[h] = i
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
- def _dump_blocks_sync(self,
- obj,
- remote_hashes,
- blocksize,
- total_size,
- dst,
- range,
- **restargs):
+ def _dump_blocks_sync(
+ self, obj, remote_hashes, blocksize, total_size, dst, range,
+ **args):
for blockid, blockhash in enumerate(remote_hashes):
- if blockhash == None:
- continue
- start = blocksize * blockid
- end = total_size - 1 if start + blocksize > total_size\
- else start + blocksize - 1
- (start, end) = _range_up(start, end, range)
- restargs['data_range'] = 'bytes=%s-%s' % (start, end)
- r = self.object_get(obj, success=(200, 206), **restargs)
- self._cb_next()
- dst.write(r.content)
- dst.flush()
+ if blockhash:
+ start = blocksize * blockid
+ is_last = start + blocksize > total_size
+ end = (total_size - 1) if is_last else (start + blocksize - 1)
+ (start, end) = _range_up(start, end, range)
+ args['data_range'] = 'bytes=%s-%s' % (start, end)
+ r = self.object_get(obj, success=(200, 206), **args)
+ self._cb_next()
+ dst.write(r.content)
+ dst.flush()
- def _get_block_async(self, obj, **restargs):
- event = SilentEvent(self.object_get,
- obj,
- success=(200, 206),
- **restargs)
+ def _get_block_async(self, obj, **args):
+ event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
event.start()
return event
h.update(block.strip('\x00'))
return hexlify(h.digest())
- def _thread2file(self,
- flying,
- local_file,
- offset=0,
- **restargs):
+ def _thread2file(self, flying, local_file, offset=0, **restargs):
"""write the results of a greenleted rest call to a file
@offset: the offset of the file up to blocksize
- e.g. if the range is 10-100, all
local_file.flush()
return finished
- def _dump_blocks_async(self,
- obj,
- remote_hashes,
- blocksize,
- total_size,
- local_file,
- blockhash=None,
- resume=False,
- filerange=None,
- **restargs):
-
+ def _dump_blocks_async(
+ self, obj, remote_hashes, blocksize, total_size, local_file,
+ blockhash=None, resume=False, filerange=None, **restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying = {}
finished = []
self._init_thread_limit()
for block_hash, blockid in remote_hashes.items():
start = blocksize * blockid
- if start < file_size\
- and block_hash == self._hash_from_file(
- local_file,
- start,
- blocksize,
- blockhash):
+ if start < file_size and block_hash == self._hash_from_file(
+ local_file, start, blocksize, blockhash):
self._cb_next()
continue
self._watch_thread_limit(flying.values())
thread.join()
finished += self._thread2file(flying, local_file, offset, **restargs)
- def download_object(self,
- obj,
- dst,
- download_cb=None,
- version=None,
- resume=False,
- range=None,
- if_match=None,
- if_none_match=None,
- if_modified_since=None,
- if_unmodified_since=None):
+ def download_object(
+ self, obj, dst,
+ download_cb=None,
+ version=None,
+ resume=False,
+ range_str=None,
+ if_match=None,
+ if_none_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None):
"""Download an object using multiple connections (threads) and
writing to random parts of the file
:param resume: (bool) if set, preserve already downloaded file parts
- :param range: (str) from-to where from and to are integers denoting
- file positions in bytes
+ :param range_str: (str) from-to where from and to are integers
+ denoting file positions in bytes
:param if_match: (str)
:param if_unmodified_since: (str) formated date
"""
- restargs = dict(version=version,
- data_range=None if range is None else 'bytes=%s' % range,
+ restargs = dict(
+ version=version,
+ data_range=None if range_str is None else 'bytes=%s' % range_str,
if_match=if_match,
if_none_match=if_none_match,
if_modified_since=if_modified_since,
if_unmodified_since=if_unmodified_since)
- (blocksize,
+ (
+ blocksize,
blockhash,
total_size,
hash_list,
self._cb_next()
if dst.isatty():
- self._dump_blocks_sync(obj,
+ self._dump_blocks_sync(
+ obj,
hash_list,
blocksize,
total_size,
dst,
- range,
+ range_str,
**restargs)
else:
- self._dump_blocks_async(obj,
+ self._dump_blocks_async(
+ obj,
remote_hashes,
blocksize,
total_size,
dst,
blockhash,
resume,
- range,
+ range_str,
**restargs)
- if range is None:
+ if not range_str:
dst.truncate(total_size)
self._complete_cb()
except:
break
- def get_object_hashmap(self, obj,
- version=None,
- if_match=None,
- if_none_match=None,
- if_modified_since=None,
- if_unmodified_since=None,
- data_range=None):
+ def get_object_hashmap(
+ self, obj,
+ version=None,
+ if_match=None,
+ if_none_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ data_range=None):
"""
:param obj: (str) remote object path
:returns: (list)
"""
try:
- r = self.object_get(obj,
+ r = self.object_get(
+ obj,
hashmap=True,
version=version,
if_etag_match=if_match,
"""
:returns: (dict)
"""
- return filter_in(self.get_account_info(),
+ return filter_in(
+ self.get_account_info(),
'X-Account-Policy-Quota',
exactMatch=True)
"""
:returns: (dict)
"""
- return filter_in(self.get_account_info(),
+ return filter_in(
+ self.get_account_info(),
'X-Account-Policy-Versioning',
exactMatch=True)
success=(204, 404, 409))
r.release()
if r.status_code == 404:
- raise ClientError('Container "%s" does not exist' % self.container,
+ raise ClientError(
+ 'Container "%s" does not exist' % self.container,
r.status_code)
elif r.status_code == 409:
- raise ClientError('Container "%s" is not empty' % self.container,
+ raise ClientError(
+ 'Container "%s" is not empty' % self.container,
r.status_code)
def get_container_versioning(self, container):
:returns: (dict)
"""
self.container = container
- return filter_in(self.get_container_info(),
+ return filter_in(
+ self.get_container_info(),
'X-Container-Policy-Versioning')
def get_container_quota(self, container):
:returns: (dict)
"""
- return filter_in(self.get_container_info(until=until),
+ return filter_in(
+ self.get_container_info(until=until),
'X-Container-Meta')
def get_container_object_meta(self, until=None):
:returns: (dict)
"""
- return filter_in(self.get_container_info(until=until),
+ return filter_in(
+ self.get_container_info(until=until),
'X-Container-Object-Meta')
def set_container_meta(self, metapairs):
info = self.get_object_info(obj)
pref, sep, rest = self.base_url.partition('//')
base = rest.split('/')[0]
- newurl = path4url('%s%s%s' % (pref, sep, base),
+ newurl = path4url(
+ '%s%s%s' % (pref, sep, base),
info['x-object-public'])
return newurl[1:]
:returns: (dict)
"""
- return filter_in(self.get_object_info(obj, version=version),
+ return filter_in(
+ self.get_object_info(obj, version=version),
'X-Object-Meta')
def get_object_sharing(self, obj):
:returns: (dict)
"""
- r = filter_in(self.get_object_info(obj),
+ r = filter_in(
+ self.get_object_info(obj),
'X-Object-Sharing',
exactMatch=True)
reply = {}
reply[key] = val
return reply
- def set_object_sharing(self, obj,
- read_permition=False,
- write_permition=False):
+ def set_object_sharing(
+ self, obj,
+ read_permition=False, write_permition=False):
"""Give read/write permisions to an object.
:param obj: (str) remote object path
permissions will be removed
"""
- perms = dict(read='' if not read_permition else read_permition,
+ perms = dict(
+ read='' if not read_permition else read_permition,
write='' if not write_permition else write_permition)
r = self.object_post(obj, update=True, permissions=perms)
r.release()
for i in range(nblocks):
block = source_file.read(min(blocksize, filesize - offset))
offset += len(block)
- r = self.object_post(obj,
+ r = self.object_post(
+ obj,
update=True,
content_range='bytes */*',
content_type='application/octet-stream',
:param upto_bytes: max number of bytes to leave on file
"""
- r = self.object_post(obj,
+ r = self.object_post(
+ obj,
update=True,
content_range='bytes 0-%s/*' % upto_bytes,
content_type='application/octet-stream',
source_object=path4url(self.container, obj))
r.release()
- def overwrite_object(self,
- obj,
- start,
- end,
- source_file,
+ def overwrite_object(
+ self, obj, start, end, source_file,
upload_cb=None):
"""Overwrite a part of an object from local source file
upload_gen = upload_cb(nblocks)
upload_gen.next()
for i in range(nblocks):
- block = source_file.read(min(blocksize,
- filesize - offset,
- datasize - offset))
- r = self.object_post(obj,
+ read_size = min(blocksize, filesize - offset, datasize - offset)
+ block = source_file.read(read_size)
+ r = self.object_post(
+ obj,
update=True,
content_type='application/octet-stream',
content_length=len(block),
if upload_cb:
upload_gen.next()
- def copy_object(self, src_container, src_object, dst_container,
+ def copy_object(
+ self, src_container, src_object, dst_container,
dst_object=False,
source_version=None,
public=False,
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
- r = self.object_put(dst_object,
+ r = self.object_put(
+ dst_object,
success=201,
copy_from=src_path,
content_length=0,
delimiter=delimiter)
r.release()
- def move_object(self, src_container, src_object, dst_container,
- dst_object=False,
- source_version=None,
- public=False,
- content_type=None,
- delimiter=None):
+ def move_object(
+ self, src_container, src_object, dst_container,
+ dst_object=False,
+ source_version=None,
+ public=False,
+ content_type=None,
+ delimiter=None):
"""
:param src_container: (str) source container
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
- r = self.object_put(dst_object,
+ r = self.object_put(
+ dst_object,
success=201,
move_from=src_path,
content_length=0,
class PithosRestAPI(StorageClient):
- def account_head(self,
- until=None,
- if_modified_since=None,
- if_unmodified_since=None,
- *args,
- **kwargs):
+ def account_head(
+ self,
+ until=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ HEAD at account level
--- request parameters ---
self._assert_account()
path = path4url(self.account)
- self.set_param('until', until, iff=until is not None)
+ self.set_param('until', until, iff=until)
self.set_header('If-Modified-Since', if_modified_since)
self.set_header('If-Unmodified-Since', if_unmodified_since)
success = kwargs.pop('success', 204)
return self.head(path, *args, success=success, **kwargs)
- def account_get(self,
- limit=None,
- marker=None,
- format='json',
- show_only_shared=False,
- until=None,
- if_modified_since=None,
- if_unmodified_since=None,
- *args,
- **kwargs):
+ def account_get(
+ self,
+ limit=None,
+ marker=None,
+ format='json',
+ show_only_shared=False,
+ until=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ GET at account level
--- request parameters ---
self._assert_account()
- self.set_param('format', format, iff=format is not None)
- self.set_param('limit', limit, iff=limit is not None)
- self.set_param('marker', marker, iff=marker is not None)
+ self.set_param('format', format, iff=format)
+ self.set_param('limit', limit, iff=limit)
+ self.set_param('marker', marker, iff=marker)
self.set_param('shared', iff=show_only_shared)
- self.set_param('until', until, iff=until is not None)
+ self.set_param('until', until, iff=until)
self.set_header('If-Modified-Since', if_modified_since)
self.set_header('If-Unmodified-Since', if_unmodified_since)
success = kwargs.pop('success', (200, 204))
return self.get(path, *args, success=success, **kwargs)
- def account_post(self,
- update=True,
- groups={},
- metadata=None,
- quota=None,
- versioning=None,
- *args,
- **kwargs):
+ def account_post(
+ self,
+ update=True,
+ groups={},
+ metadata=None,
+ quota=None,
+ versioning=None,
+ *args, **kwargs):
""" Full Pithos+ POST at account level
--- request parameters ---
userstr = userstr + dlm + user
dlm = ','
self.set_header('X-Account-Group-' + group, userstr)
- if metadata is not None:
+ if metadata:
for metaname, metaval in metadata.items():
self.set_header('X-Account-Meta-' + metaname, metaval)
self.set_header('X-Account-Policy-Quota', quota)
success = kwargs.pop('success', 202)
return self.post(path, *args, success=success, **kwargs)
- def container_head(self, until=None,
- if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
+ def container_head(
+ self,
+ until=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ HEAD at container level
--- request params ---
self._assert_container()
- self.set_param('until', until, iff=until is not None)
+ self.set_param('until', until, iff=until)
self.set_header('If-Modified-Since', if_modified_since)
self.set_header('If-Unmodified-Since', if_unmodified_since)
success = kwargs.pop('success', 204)
return self.head(path, *args, success=success, **kwargs)
- def container_get(self,
- limit=None,
- marker=None,
- prefix=None,
- delimiter=None,
- path=None,
- format='json',
- meta=[],
- show_only_shared=False,
- until=None,
- if_modified_since=None,
- if_unmodified_since=None,
- *args,
- **kwargs):
+ def container_get(
+ self,
+ limit=None,
+ marker=None,
+ prefix=None,
+ delimiter=None,
+ path=None,
+ format='json',
+ meta=[],
+ show_only_shared=False,
+ until=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ GET at container level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
- self.set_param('limit', limit, iff=limit is not None)
- self.set_param('marker', marker, iff=marker is not None)
- if path is None:
- self.set_param('prefix', prefix, iff=prefix is not None)
- self.set_param('delimiter', delimiter, iff=delimiter is not None)
+ self.set_param('format', format, iff=format)
+ self.set_param('limit', limit, iff=limit)
+ self.set_param('marker', marker, iff=marker)
+ if not path:
+ self.set_param('prefix', prefix, iff=prefix)
+ self.set_param('delimiter', delimiter, iff=delimiter)
else:
self.set_param('path', path)
self.set_param('shared', iff=show_only_shared)
- self.set_param('meta',
- list2str(meta),
- iff=meta is not None and len(meta) > 0)
- self.set_param('until', until, iff=until is not None)
+ self.set_param('meta', list2str(meta), iff=meta)
+ self.set_param('until', until, iff=until)
self.set_header('If-Modified-Since', if_modified_since)
self.set_header('If-Unmodified-Since', if_unmodified_since)
success = kwargs.pop('success', 200)
return self.get(path, *args, success=success, **kwargs)
- def container_put(self,
- quota=None,
- versioning=None,
- metadata=None,
- *args,
- **kwargs):
+ def container_put(
+ self,
+ quota=None, versioning=None, metadata=None,
+ *args, **kwargs):
""" Full Pithos+ PUT at container level
--- request headers ---
"""
self._assert_container()
- if metadata is not None:
+ if metadata:
for metaname, metaval in metadata.items():
self.set_header('X-Container-Meta-' + metaname, metaval)
self.set_header('X-Container-Policy-Quota', quota)
success = kwargs.pop('success', (201, 202))
return self.put(path, *args, success=success, **kwargs)
- def container_post(self,
- update=True,
- format='json',
- quota=None,
- versioning=None,
- metadata=None,
- content_type=None,
- content_length=None,
- transfer_encoding=None,
- *args,
- **kwargs):
+ def container_post(
+ self,
+ update=True,
+ format='json',
+ quota=None,
+ versioning=None,
+ metadata=None,
+ content_type=None,
+ content_length=None,
+ transfer_encoding=None,
+ *args, **kwargs):
""" Full Pithos+ POST at container level
--- request params ---
"""
self._assert_container()
- self.set_param('format', format, iff=format is not None)
+ self.set_param('format', format, iff=format)
self.set_param('update', iff=update)
- if metadata is not None:
+ if metadata:
for metaname, metaval in metadata.items():
self.set_header('X-Container-Meta-' + metaname, metaval)
self.set_header('X-Container-Policy-Quota', quota)
self._assert_container()
- self.set_param('until', until, iff=until is not None)
- self.set_param('delimiter', delimiter, iff=delimiter is not None)
+ self.set_param('until', until, iff=until)
+ self.set_param('delimiter', delimiter, iff=delimiter)
path = path4url(self.account, self.container)
success = kwargs.pop('success', 204)
return self.delete(path, success=success)
- def object_head(self, object,
- version=None,
- if_etag_match=None,
- if_etag_not_match=None,
- if_modified_since=None,
- if_unmodified_since=None,
- *args,
- **kwargs):
+ def object_head(
+ self, object,
+ version=None,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ HEAD at object level
--- request parameters ---
self._assert_container()
- self.set_param('version', version, iff=version is not None)
+ self.set_param('version', version, iff=version)
self.set_header('If-Match', if_etag_match)
self.set_header('If-None-Match', if_etag_not_match)
success = kwargs.pop('success', 200)
return self.head(path, *args, success=success, **kwargs)
- def object_get(self, object,
- format='json',
- hashmap=False,
- version=None,
- data_range=None,
- if_range=False,
- if_etag_match=None,
- if_etag_not_match=None,
- if_modified_since=None,
- if_unmodified_since=None,
- *args,
- **kwargs):
+ def object_get(
+ self, object,
+ format='json',
+ hashmap=False,
+ version=None,
+ data_range=None,
+ if_range=False,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None,
+ *args, **kwargs):
""" Full Pithos+ GET at object level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
- self.set_param('version', version, iff=version is not None)
+ self.set_param('format', format, iff=format)
+ self.set_param('version', version, iff=version)
self.set_param('hashmap', hashmap, iff=hashmap)
self.set_header('Range', data_range)
self.set_header('If-Range', '',
- if_range is True and data_range is not None)
+ if_range is True and data_range)
self.set_header('If-Match', if_etag_match, )
self.set_header('If-None-Match', if_etag_not_match)
self.set_header('If-Modified-Since', if_modified_since)
success = kwargs.pop('success', 200)
return self.get(path, *args, success=success, **kwargs)
- def object_put(self, object,
- format='json',
- hashmap=False,
- delimiter=None,
- if_etag_match=None,
- if_etag_not_match=None,
- etag=None,
- content_length=None,
- content_type=None,
- transfer_encoding=None,
- copy_from=None,
- move_from=None,
- source_account=None,
- source_version=None,
- content_encoding=None,
- content_disposition=None,
- manifest=None,
- permissions=None,
- public=None,
- metadata=None,
- *args,
- **kwargs):
+ def object_put(
+ self, object,
+ format='json',
+ hashmap=False,
+ delimiter=None,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ etag=None,
+ content_length=None,
+ content_type=None,
+ transfer_encoding=None,
+ copy_from=None,
+ move_from=None,
+ source_account=None,
+ source_version=None,
+ content_encoding=None,
+ content_disposition=None,
+ manifest=None,
+ permissions=None,
+ public=None,
+ metadata=None,
+ *args, **kwargs):
""" Full Pithos+ PUT at object level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
+ self.set_param('format', format, iff=format)
self.set_param('hashmap', hashmap, iff=hashmap)
- self.set_param('delimiter', delimiter, iff=delimiter is not None)
+ self.set_param('delimiter', delimiter, iff=delimiter)
self.set_header('If-Match', if_etag_match)
self.set_header('If-None-Match', if_etag_not_match)
perms = None
if permissions:
for permission_type, permission_list in permissions.items():
- if perms is None:
+ if not perms:
perms = '' # Remove permissions
if len(permission_list) == 0:
continue
if len(perms):
perms += ';'
- perms += '%s=%s'\
- % (permission_type, list2str(permission_list, separator=','))
+ perms += '%s=%s' % (
+ permission_type,
+ list2str(permission_list, separator=','))
self.set_header('X-Object-Sharing', perms)
self.set_header('X-Object-Public', public)
- if metadata is not None:
+ if metadata:
for key, val in metadata.items():
self.set_header('X-Object-Meta-' + key, val)
success = kwargs.pop('success', 201)
return self.put(path, *args, success=success, **kwargs)
- def object_copy(self, object, destination,
- format='json',
- ignore_content_type=False,
- if_etag_match=None,
- if_etag_not_match=None,
- destination_account=None,
- content_type=None,
- content_encoding=None,
- content_disposition=None,
- source_version=None,
- permissions=None,
- public=False,
- metadata=None,
- *args,
- **kwargs):
+ def object_copy(
+ self, object, destination,
+ format='json',
+ ignore_content_type=False,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ destination_account=None,
+ content_type=None,
+ content_encoding=None,
+ content_disposition=None,
+ source_version=None,
+ permissions=None,
+ public=False,
+ metadata=None,
+ *args, **kwargs):
""" Full Pithos+ COPY at object level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
+ self.set_param('format', format, iff=format)
self.set_param('ignore_content_type', iff=ignore_content_type)
self.set_header('If-Match', if_etag_match)
perms = None
if permissions:
for permission_type, permission_list in permissions.items():
- if perms is None:
+ if not perms:
perms = '' # Remove permissions
if len(permission_list) == 0:
continue
if len(perms):
perms += ';'
- perms += '%s=%s'\
- % (permission_type, list2str(permission_list, separator=','))
+ perms += '%s=%s' % (
+ permission_type,
+ list2str(permission_list, separator=','))
self.set_header('X-Object-Sharing', perms)
self.set_header('X-Object-Public', public)
- if metadata is not None:
+ if metadata:
for key, val in metadata.items():
self.set_header('X-Object-Meta-' + key, val)
success = kwargs.pop('success', 201)
return self.copy(path, *args, success=success, **kwargs)
- def object_move(self, object,
- format='json',
- ignore_content_type=False,
- if_etag_match=None,
- if_etag_not_match=None,
- destination=None,
- destination_account=None,
- content_type=None,
- content_encoding=None,
- content_disposition=None,
- permissions={},
- public=False,
- metadata={},
- *args,
- **kwargs):
+ def object_move(
+ self, object,
+ format='json',
+ ignore_content_type=False,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ destination=None,
+ destination_account=None,
+ content_type=None,
+ content_encoding=None,
+ content_disposition=None,
+ permissions={},
+ public=False,
+ metadata={},
+ *args, **kwargs):
""" Full Pithos+ COPY at object level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
+ self.set_param('format', format, iff=format)
self.set_param('ignore_content_type', iff=ignore_content_type)
self.set_header('If-Match', if_etag_match)
self.set_header('Content-Disposition', content_disposition)
perms = None
for permission_type, permission_list in permissions.items():
- if perms is None:
+ if not perms:
perms = '' # Remove permissions
if len(permission_list) == 0:
continue
if len(perms):
perms += ';'
- perms += '%s=%s'\
- % (permission_type, list2str(permission_list, separator=','))
+ perms += '%s=%s' % (
+ permission_type,
+ list2str(permission_list, separator=','))
self.set_header('X-Object-Sharing', perms)
self.set_header('X-Object-Public', public)
for key, val in metadata.items():
success = kwargs.pop('success', 201)
return self.move(path, *args, success=success, **kwargs)
- def object_post(self, object,
- format='json',
- update=True,
- if_etag_match=None,
- if_etag_not_match=None,
- content_length=None,
- content_type=None,
- content_range=None,
- transfer_encoding=None,
- content_encoding=None,
- content_disposition=None,
- source_object=None,
- source_account=None,
- source_version=None,
- object_bytes=None,
- manifest=None,
- permissions={},
- public=False,
- metadata={},
- *args,
- **kwargs):
+ def object_post(
+ self, object,
+ format='json',
+ update=True,
+ if_etag_match=None,
+ if_etag_not_match=None,
+ content_length=None,
+ content_type=None,
+ content_range=None,
+ transfer_encoding=None,
+ content_encoding=None,
+ content_disposition=None,
+ source_object=None,
+ source_account=None,
+ source_version=None,
+ object_bytes=None,
+ manifest=None,
+ permissions={},
+ public=False,
+ metadata={},
+ *args, **kwargs):
""" Full Pithos+ POST at object level
--- request parameters ---
self._assert_container()
- self.set_param('format', format, iff=format is not None)
+ self.set_param('format', format, iff=format)
self.set_param('update', iff=update)
self.set_header('If-Match', if_etag_match)
self.set_header('If-None-Match', if_etag_not_match)
- self.set_header('Content-Length',
+ self.set_header(
+ 'Content-Length',
content_length,
- iff=transfer_encoding is None)
+ iff=not transfer_encoding)
self.set_header('Content-Type', content_type)
self.set_header('Content-Range', content_range)
self.set_header('Transfer-Encoding', transfer_encoding)
self.set_header('X-Object-Manifest', manifest)
perms = None
for permission_type, permission_list in permissions.items():
- if perms is None:
+ if not perms:
perms = '' # Remove permissions
if len(permission_list) == 0:
continue
if len(perms):
perms += ';'
- perms += '%s=%s'\
- % (permission_type, list2str(permission_list, separator=','))
+ perms += '%s=%s' % (
+ permission_type,
+ list2str(permission_list, separator=','))
self.set_header('X-Object-Sharing', perms)
self.set_header('X-Object-Public', public)
for key, val in metadata.items():
success = kwargs.pop('success', (202, 204))
return self.post(path, *args, success=success, **kwargs)
- def object_delete(self, object,
- until=None,
- delimiter=None,
- *args,
- **kwargs):
+ def object_delete(
+ self, object,
+ until=None, delimiter=None,
+ *args, **kwargs):
""" Full Pithos+ DELETE at object level
--- request parameters ---
"""
self._assert_container()
- self.set_param('until', until, iff=until is not None)
- self.set_param('delimiter', delimiter, iff=delimiter is not None)
+ self.set_param('until', until, iff=until)
+ self.set_param('delimiter', delimiter, iff=delimiter)
path = path4url(self.account, self.container, object)
success = kwargs.pop('success', 204)
class QuotaholderAPI(Specificator):
- def create_entity(
- self,
- context=Context,
- create_entity=ListOf(Entity, Owner, Key, OwnerKey,
- nonempty=1)
- ):
+ def create_entity(self, context=Context, create_entity=ListOf(
+ Entity, Owner, Key, OwnerKey, nonempty=1)):
rejected = ListOf(Index)
return rejected
:param metakey: (str) metadatum key
"""
headers = self.get_account_info()
- self.headers = filter_out(headers,
+ self.headers = filter_out(
+ headers,
'X-Account-Meta-' + metakey,
exactMatch=True)
if len(self.headers) == len(headers):
r = self.put(path, data=data, success=201)
r.release()
- def create_object(self,
- obj,
- content_type='application/octet-stream',
- content_length=0):
+ def create_object(
+ self, obj,
+ content_type='application/octet-stream', content_length=0):
"""
:param obj: (str) directory-object name
:param content_type: (str) explicitly set content_type
cnt = r.content
return cnt, size
- def copy_object(self, src_container, src_object, dst_container,
- dst_object=False):
+ def copy_object(
+ self, src_container, src_object, dst_container,
+ dst_object=False):
"""Copy an objects from src_contaier:src_object to
dst_container[:dst_object]
r = self.put(dst_path, success=201)
r.release()
- def move_object(self, src_container, src_object, dst_container,
- dst_object=False):
+ def move_object(
+ self, src_container, src_object, dst_container,
+ dst_object=False):
"""Move an objects from src_contaier:src_object to
dst_container[:dst_object]
self.set_param('format', 'json')
r = self.get(path, success=(200, 204, 304, 404), )
if r.status_code == 404:
- raise ClientError(\
+ raise ClientError(
"Invalid account (%s) for that container" % self.account,
r.status_code)
elif r.status_code == 304:
self.set_param('path', 'path_prefix')
r = self.get(path, success=(200, 204, 404))
if r.status_code == 404:
- raise ClientError(\
+ raise ClientError(
"Invalid account (%s) for that container" % self.account,
r.status_code)
reply = r.json
self._safe_progress_bar_finish(action_bar)
def assert_dicts_are_deeply_equal(self, d1, d2):
+ (st1, st2) = (set(d1.keys()), set(d2.keys()))
+ diff1 = st1.difference(st2)
+ diff2 = st2.difference(st1)
+ self.assertTrue(
+ not (diff1 or diff2),
+ 'Key differences:\n\td1-d2=%s\n\td2-d1=%s' % (diff1, diff2))
for k, v in d1.items():
- self.assertTrue(k in d2)
if isinstance(v, dict):
self.assert_dicts_are_deeply_equal(v, d2[k])
else:
print('')
methods = [method for method in inspect.getmembers(
self,
- predicate=inspect.ismethod)\
- if method[0].startswith('_test_')]
+ predicate=inspect.ismethod) if method[0].startswith('_test_')]
failures = 0
for method in methods:
stdout.write('Test %s ' % method[0][6:])
def init_parser():
parser = ArgumentParser(add_help=False)
- parser.add_argument('-h', '--help',
+ parser.add_argument(
+ '-h', '--help',
dest='help',
action='store_true',
default=False,
'uuid',
'id',
'email'):
- self.assertTrue(term in r)
+ self.assertTrue(term in r)
def test_info(self):
self._test_0020_info()
self._test_0020_get()
def _test_0020_get(self):
- for term in (
- 'uuid',
- 'name',
- 'username'):
+ for term in ('uuid', 'name', 'username'):
self.assertEqual(
self.client.term(term, self['astakos', 'token']),
self['astakos', term])
def test_000(self):
"Prepare a full Cyclades test scenario"
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
- self.server2 = self._create_server(self.servname2,
+ self.server2 = self._create_server(
+ self.servname2,
self.flavorid + 2,
self.img)
super(self.__class__, self).test_000()
def _create_server(self, servername, flavorid, imageid, personality=None):
- server = self.client.create_server(servername,
+ server = self.client.create_server(
+ servername,
flavorid,
imageid,
personality)
nics = self.client.list_server_nics(servid)
for net in nics:
found_nic = net['network_id'] == netid
- if (in_creation and found_nic)\
- or not (in_creation or found_nic):
- return
+ if (in_creation and found_nic) or not (
+ in_creation or found_nic):
+ return
time.sleep(wait)
self.do_with_progress_bar(
nicwait,
Do not use this in regular tests
"""
from kamaki.clients import SilentEvent
- c1 = SilentEvent(self._create_server,
+ c1 = SilentEvent(
+ self._create_server,
self.servname1,
self.flavorid,
self.img)
- c2 = SilentEvent(self._create_server,
+ c2 = SilentEvent(
+ self._create_server,
self.servname2,
self.flavorid + 2,
self.img)
- c3 = SilentEvent(self._create_server,
+ c3 = SilentEvent(
+ self._create_server,
self.servname1,
self.flavorid,
self.img)
- c4 = SilentEvent(self._create_server,
+ c4 = SilentEvent(
+ self._create_server,
self.servname2,
self.flavorid + 2,
self.img)
- c5 = SilentEvent(self._create_server,
+ c5 = SilentEvent(
+ self._create_server,
self.servname1,
self.flavorid,
self.img)
- c6 = SilentEvent(self._create_server,
+ c6 = SilentEvent(
+ self._create_server,
self.servname2,
self.flavorid + 2,
self.img)
- c7 = SilentEvent(self._create_server,
+ c7 = SilentEvent(
+ self._create_server,
self.servname1,
self.flavorid,
self.img)
- c8 = SilentEvent(self._create_server,
+ c8 = SilentEvent(
+ self._create_server,
self.servname2,
self.flavorid + 2,
self.img)
def test_create_server(self):
"""Test create_server"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._wait_for_status(self.server1['id'], 'BUILD')
def test_get_server_details(self):
"""Test get_server_details"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._wait_for_status(self.server1['id'], 'BUILD')
def test_update_server_name(self):
"""Test update_server_name"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0050_update_server_name()
def test_reboot_server(self):
"""Test reboot server"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._wait_for_status(self.server1['id'], 'BUILD')
- self.server2 = self._create_server(self.servname2,
+ self.server2 = self._create_server(
+ self.servname2,
self.flavorid + 1,
self.img)
self._wait_for_status(self.server2['id'], 'BUILD')
self._test_0080_create_server_metadata()
def _test_0080_create_server_metadata(self):
- r1 = self.client.create_server_metadata(self.server1['id'],
+ r1 = self.client.create_server_metadata(
+ self.server1['id'],
'mymeta',
'mymeta val')
self.assertTrue('mymeta' in r1)
def test_get_server_metadata(self):
"""Test get server_metadata"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0090_get_server_metadata()
def _test_0090_get_server_metadata(self):
- self.client.create_server_metadata(self.server1['id'],
+ self.client.create_server_metadata(
+ self.server1['id'],
'mymeta_0',
'val_0')
r = self.client.get_server_metadata(self.server1['id'], 'mymeta_0')
def test_update_server_metadata(self):
"""Test update_server_metadata"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0100_update_server_metadata()
def _test_0100_update_server_metadata(self):
- r1 = self.client.create_server_metadata(self.server1['id'],
+ r1 = self.client.create_server_metadata(
+ self.server1['id'],
'mymeta3',
'val2')
self.assertTrue('mymeta3'in r1)
- r2 = self.client.update_server_metadata(self.server1['id'],
+ r2 = self.client.update_server_metadata(
+ self.server1['id'],
mymeta3='val3')
self.assertTrue(r2['mymeta3'], 'val3')
def test_delete_server_metadata(self):
"""Test delete_server_metadata"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0110_delete_server_metadata()
def _test_0110_delete_server_metadata(self):
- r1 = self.client.create_server_metadata(self.server1['id'],
+ r1 = self.client.create_server_metadata(
+ self.server1['id'],
'mymeta',
'val')
self.assertTrue('mymeta' in r1)
def test_shutdown_server(self):
"""Test shutdown_server"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._wait_for_status(self.server1['id'], 'BUILD')
def test_start_server(self):
"""Test start_server"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._wait_for_status(self.server1['id'], 'BUILD')
def test_get_server_console(self):
"""Test get_server_console"""
- self.server2 = self._create_server(self.servname2,
+ self.server2 = self._create_server(
+ self.servname2,
self.flavorid + 2,
self.img)
self._wait_for_status(self.server2['id'], 'BUILD')
def test_get_firewall_profile(self):
"""Test get_firewall_profile"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0200_get_firewall_profile()
def test_set_firewall_profile(self):
"""Test set_firewall_profile"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0210_set_firewall_profile()
count_success += 1
def test_get_server_stats(self):
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self._test_0220_get_server_stats()
def _test_0220_get_server_stats(self):
r = self.client.get_server_stats(self.server1['id'])
- for term in ('cpuBar',
- 'cpuTimeSeries',
- 'netBar',
- 'netTimeSeries',
- 'refresh'):
+ it = ('cpuBar', 'cpuTimeSeries', 'netBar', 'netTimeSeries', 'refresh')
+ for term in it:
self.assertTrue(term in r)
def test_create_network(self):
def test_connect_server(self):
"""Test connect_server"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self.network1 = self._create_network(self.netname1)
def _test_0250_disconnect_server(self):
self.client.disconnect_server(self.server1['id'], self.network1['id'])
- self.assertTrue(self._wait_for_nic(self.network1['id'],
+ self.assertTrue(self._wait_for_nic(
+ self.network1['id'],
self.server1['id'],
in_creation=False))
def test_list_server_nics(self):
"""Test list_server_nics"""
- self.server1 = self._create_server(self.servname1,
+ self.server1 = self._create_server(
+ self.servname1,
self.flavorid,
self.img)
self.network2 = self._create_network(self.netname2)
print('\t- ok')
f.close()
- self.client.register(self.imgname,
+ self.client.register(
+ self.imgname,
self.location,
params=dict(is_public=True))
img = self._get_img_by_name(self.imgname)
r0 = self.client.list_public(order='-')
self.assertTrue(len(r) > 0)
for img in r:
- for term in ('status',
- 'name',
- 'container_format',
- 'disk_format',
- 'id',
- 'size'):
+ for term in (
+ 'status',
+ 'name',
+ 'container_format',
+ 'disk_format',
+ 'id',
+ 'size'):
self.assertTrue(term in img)
self.assertTrue(r, r0)
r0.reverse()
self.assert_dicts_are_deeply_equal(img, r0[i])
r1 = self.client.list_public(detail=True)
for img in r1:
- for term in ('status',
- 'name',
- 'checksum',
- 'created_at',
- 'disk_format',
- 'updated_at',
- 'id',
- 'location',
- 'container_format',
- 'owner',
- 'is_public',
- 'deleted_at',
- 'properties',
- 'size'):
+ for term in (
+ 'status',
+ 'name',
+ 'checksum',
+ 'created_at',
+ 'disk_format',
+ 'updated_at',
+ 'id',
+ 'location',
+ 'container_format',
+ 'owner',
+ 'is_public',
+ 'deleted_at',
+ 'properties',
+ 'size'):
self.assertTrue(term in img)
if img['properties']:
for interm in (
- 'osfamily',
- 'users',
- 'os',
- 'root_partition',
- 'description'):
+ 'osfamily',
+ 'users',
+ 'os',
+ 'root_partition',
+ 'description'):
self.assertTrue(interm in img['properties'])
size_max = 1000000000
r2 = self.client.list_public(filters=dict(size_max=size_max))
def _test_get_meta(self):
r = self.client.get_meta(self['image', 'id'])
self.assertEqual(r['id'], self['image', 'id'])
- for term in ('status',
- 'name',
- 'checksum',
- 'updated-at',
- 'created-at',
- 'deleted-at',
- 'location',
- 'is-public',
- 'owner',
- 'disk-format',
- 'size',
- 'container-format'):
+ for term in (
+ 'status',
+ 'name',
+ 'checksum',
+ 'updated-at',
+ 'created-at',
+ 'deleted-at',
+ 'location',
+ 'is-public',
+ 'owner',
+ 'disk-format',
+ 'size',
+ 'container-format'):
self.assertTrue(term in r)
- for interm in ('kernel',
- 'osfamily',
- 'users',
- 'gui', 'sortorder',
- 'root-partition',
- 'os',
- 'description'):
+ for interm in (
+ 'kernel',
+ 'osfamily',
+ 'users',
+ 'gui', 'sortorder',
+ 'root-partition',
+ 'os',
+ 'description'):
self.assertTrue(interm in r['properties'])
def test_register(self):
def _test_register(self):
self.assertTrue(self._imglist)
for img in self._imglist.values():
- self.assertTrue(img != None)
+ self.assertTrue(img is not None)
def test_reregister(self):
"""Test reregister"""
def create_remote_object(self, container, obj):
self.client.container = container
- self.client.object_put(obj,
+ self.client.object_put(
+ obj,
content_type='application/octet-stream',
data='file %s that lives in %s' % (obj, container),
metadata={'incontainer': container})
"""Check if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.account_head(if_modified_since=now_formated,
+ r1 = self.client.account_head(
+ if_modified_since=now_formated,
success=(204, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.account_head(if_unmodified_since=now_formated,
+ r2 = self.client.account_head(
+ if_unmodified_since=now_formated,
success=(204, 304, 412))
sc2 = r2.status_code
r2.release()
temp_c2 = r.json[2]['name']
r = self.client.account_get(limit=2, marker='c2_')
- conames = [container['name'] for container in r.json \
- if container['name'].lower().startswith('c2_')]
+ conames = [container['name'] for container in r.json if (
+ container['name'].lower().startswith('c2_'))]
self.assertTrue(temp_c0 in conames)
self.assertFalse(temp_c2 in conames)
"""Check if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.account_get(if_modified_since=now_formated,
+ r1 = self.client.account_get(
+ if_modified_since=now_formated,
success=(200, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.account_get(if_unmodified_since=now_formated,
+ r2 = self.client.account_get(
+ if_unmodified_since=now_formated,
success=(200, 304, 412))
sc2 = r2.status_code
r2.release()
self.assertTrue('x-account-group-' + grpName not in r)
mprefix = 'meta' + unicode(self.now)
- self.client.set_account_meta({mprefix + '1': 'v1',
+ self.client.set_account_meta({
+ mprefix + '1': 'v1',
mprefix + '2': 'v2'})
r = self.client.get_account_meta()
self.assertEqual(r['x-account-meta-' + mprefix + '1'], 'v1')
"""Check and if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.container_head(if_modified_since=now_formated,
+ r1 = self.client.container_head(
+ if_modified_since=now_formated,
success=(204, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.container_head(if_unmodified_since=now_formated,
+ r2 = self.client.container_head(
+ if_unmodified_since=now_formated,
success=(204, 304, 412))
sc2 = r2.status_code
r2.release()
"""Check and if un/modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.container_get(if_modified_since=now_formated,
+ r1 = self.client.container_get(
+ if_modified_since=now_formated,
success=(200, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.container_get(if_unmodified_since=now_formated,
+ r2 = self.client.container_get(
+ if_unmodified_since=now_formated,
success=(200, 304, 412))
sc2 = r2.status_code
r2.release()
#TODO
"""Check update=False"""
- r = self.client.object_post('test',
+ r = self.client.object_post(
+ 'test',
update=False,
metadata={'newmeta': 'newval'})
self.assertTrue(len(r) > 0)
self.assertTrue(len(r[0]) > 1)
self.client.purge_container()
- self.assertRaises(ClientError,
+ self.assertRaises(
+ ClientError,
self.client.get_object_versionlist,
'test')
r = self.client.object_head(obj, if_etag_match=etag)
self.assertEqual(r.status_code, 200)
- r = self.client.object_head(obj,
+ r = self.client.object_head(
+ obj,
if_etag_not_match=etag,
success=(200, 412, 304))
self.assertNotEqual(r.status_code, 200)
- r = self.client.object_head(obj,
+ r = self.client.object_head(
+ obj,
version=40,
if_etag_match=etag,
success=412)
"""Check and if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.object_head(obj, if_modified_since=now_formated,
+ r1 = self.client.object_head(
+ obj,
+ if_modified_since=now_formated,
success=(200, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.object_head(obj, if_unmodified_since=now_formated,
+ r2 = self.client.object_head(
+ obj,
+ if_unmodified_since=now_formated,
success=(200, 304, 412))
sc2 = r2.status_code
r2.release()
etag = r.headers['etag']
r = self.client.object_get(obj, hashmap=True)
- self.assertTrue('hashes' in r.json\
- and 'block_hash' in r.json\
- and 'block_size' in r.json\
- and 'bytes' in r.json)
+ for term in ('hashes', 'block_hash', 'block_size', 'bytes'):
+ self.assertTrue(term in r.json)
r = self.client.object_get(obj, format='xml', hashmap=True)
self.assertEqual(len(r.text.split('hash>')), 3)
rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2)
- r = self.client.object_get(obj,
+ r = self.client.object_get(
+ obj,
data_range=rangestr,
success=(200, 206))
partsize = int(r.headers['content-length'])
self.assertTrue(0 < partsize and partsize <= 1 + osize / 3)
rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2)
- r = self.client.object_get(obj,
+ r = self.client.object_get(
+ obj,
data_range=rangestr,
if_range=True,
success=(200, 206))
"""Check and if(un)modified_since"""
for format in self.client.DATE_FORMATS:
now_formated = self.now_unformated.strftime(format)
- r1 = self.client.object_get(obj, if_modified_since=now_formated,
+ r1 = self.client.object_get(
+ obj,
+ if_modified_since=now_formated,
success=(200, 304, 412))
sc1 = r1.status_code
r1.release()
- r2 = self.client.object_get(obj,
+ r2 = self.client.object_get(
+ obj,
if_unmodified_since=now_formated,
success=(200, 304, 412))
sc2 = r2.status_code
r = self.client.object_put(obj,
data='a',
content_type='application/octer-stream',
- permissions={
- 'read': ['accX:groupA', 'u1', 'u2'],
- 'write': ['u2', 'u3']},
- metadata={'key1': 'val1', 'key2': 'val2'},
+ permissions=dict(
+ read=['accX:groupA', 'u1', 'u2'],
+ write=['u2', 'u3']),
+ metadata=dict(key1='val1', key2='val2'),
content_encoding='UTF-8',
content_disposition='attachment; filename="fname.ext"')
self.assertEqual(r.status_code, 201)
self.assertEqual(r['x-object-meta-key2'], 'val2')
"""Check public and if_etag_match"""
- r = self.client.object_put(obj, if_etag_match=etag, data='b',
- content_type='application/octet-stream', public=True)
+ r = self.client.object_put(
+ obj,
+ if_etag_match=etag,
+ data='b',
+ content_type='application/octet-stream',
+ public=True)
r = self.client.object_get(obj)
self.assertTrue('x-object-public' in r.headers)
self.assertEqual(r.text, 'b')
"""Check if_etag_not_match"""
- r = self.client.object_put(obj, if_etag_not_match=etag, data='c',
- content_type='application/octet-stream', success=(201, 412))
+ r = self.client.object_put(
+ obj,
+ if_etag_not_match=etag,
+ data='c',
+ content_type='application/octet-stream',
+ success=(201, 412))
self.assertEqual(r.status_code, 412)
"""Check content_type and content_length"""
tmpdir = 'dir' + unicode(self.now)
- r = self.client.object_put(tmpdir,
+ r = self.client.object_put(
+ tmpdir,
content_type='application/directory',
content_length=0)
self.assertEqual(r['content-type'], 'application/directory')
"""Check copy_from, content_encoding"""
- r = self.client.object_put('%s/%s' % (tmpdir, obj),
+ r = self.client.object_put(
+ '%s/%s' % (tmpdir, obj),
format=None,
copy_from='/%s/%s' % (self.client.container, obj),
content_encoding='application/octet-stream',
source_account=self.client.account,
- content_length=0, success=201)
+ content_length=0,
+ success=201)
self.assertEqual(r.status_code, 201)
"""Test copy_object for cross-conctainer copy"""
- self.client.copy_object(src_container=self.c2,
+ self.client.copy_object(
+ src_container=self.c2,
src_object='%s/%s' % (tmpdir, obj),
dst_container=self.c1,
dst_object=obj)
"""Check cross-container copy_from, content_encoding"""
self.client.container = self.c1
fromstr = '/%s/%s/%s' % (self.c2, tmpdir, obj)
- r = self.client.object_put(obj,
+ r = self.client.object_put(
+ obj,
format=None,
copy_from=fromstr,
content_encoding='application/octet-stream',
source_account=self.client.account,
- content_length=0, success=201)
+ content_length=0,
+ success=201)
self.assertEqual(r.status_code, 201)
r = self.client.get_object_info(obj)
"""Check source_account"""
self.client.container = self.c2
fromstr = '/%s/%s' % (self.c1, obj)
- r = self.client.object_put(obj + 'v2',
+ r = self.client.object_put(
+ '%sv2' % obj,
format=None,
move_from=fromstr,
content_encoding='application/octet-stream',
self.client.container = self.c1
r1 = self.client.get_object_info(obj)
self.client.container = self.c2
- self.client.move_object(src_container=self.c1,
+ self.client.move_object(
+ src_container=self.c1,
src_object=obj,
dst_container=self.c2,
dst_object=obj + 'v0')
self.assertEqual(r1['x-object-hash'], r0['x-object-hash'])
"""Check move_from"""
- r = self.client.object_put(obj + 'v1',
+ r = self.client.object_put(
+ '%sv1' % obj,
format=None,
move_from='/%s/%s' % (self.c2, obj),
source_version=vers2,
txt = ''
for i in range(10):
txt += '%s' % i
- r = self.client.object_put('%s/%s' % (mobj, i),
+ r = self.client.object_put(
+ '%s/%s' % (mobj, i),
data='%s' % i,
content_length=1,
success=201,
content_type='application/octet-stream',
content_encoding='application/octet-stream')
- r = self.client.object_put(mobj,
+ r = self.client.object_put(
+ mobj,
content_length=0,
content_type='application/octet-stream',
manifest='%s/%s' % (self.client.container, mobj))
r = self.client.object_put(obj + 'orig',
content_type='application/octet-stream',
data=data,
- metadata={'mkey1': 'mval1', 'mkey2': 'mval2'},
- permissions={
- 'read': ['accX:groupA', 'u1', 'u2'],
- 'write': ['u2', 'u3']},
+ metadata=dict(mkey1='mval1', mkey2='mval2'),
+ permissions=dict(
+ read=['accX:groupA', 'u1', 'u2'],
+ write=['u2', 'u3']),
content_disposition='attachment; filename="fname.ext"')
- r = self.client.object_copy(obj + 'orig',
+ r = self.client.object_copy(
+ '%sorig' % obj,
destination='/%s/%s' % (self.client.container, obj),
ignore_content_type=False, content_type='application/json',
metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'},
self.assertTrue('accx:groupb' in r['write'])
"""Check destination account"""
- r = self.client.object_copy(obj,
+ r = self.client.object_copy(
+ obj,
destination='/%s/%s' % (self.c1, obj),
content_encoding='utf8',
content_type='application/json',
"""Check destination being another container
and also content_type and content encoding"""
- r = self.client.object_copy(obj,
+ r = self.client.object_copy(
+ obj,
destination='/%s/%s' % (self.c1, obj),
content_encoding='utf8',
content_type='application/json')
self.assertEqual(r.status_code, 201)
- self.assertEqual(r.headers['content-type'],
+ self.assertEqual(
+ r.headers['content-type'],
'application/json; charset=UTF-8')
"""Check ignore_content_type and content_type"""
ctype = r.headers['content-type']
self.assertEqual(ctype, 'application/json')
- r = self.client.object_copy(obj + 'orig',
+ r = self.client.object_copy(
+ '%sorig' % obj,
destination='/%s/%s0' % (self.client.container, obj),
ignore_content_type=True,
content_type='application/json')
self.assertNotEqual(r.headers['content-type'], 'application/json')
"""Check if_etag_(not_)match"""
- r = self.client.object_copy(obj,
+ r = self.client.object_copy(
+ obj,
destination='/%s/%s1' % (self.client.container, obj),
if_etag_match=etag)
self.assertEqual(r.status_code, 201)
- r = self.client.object_copy(obj,
+ r = self.client.object_copy(
+ obj,
destination='/%s/%s2' % (self.client.container, obj),
if_etag_not_match='lalala')
self.assertEqual(r.status_code, 201)
vers2 = r.headers['x-object-version']
"""Check source_version, public and format """
- r = self.client.object_copy(obj + '2',
+ r = self.client.object_copy(
+ '%s2' % obj,
destination='/%s/%s3' % (self.client.container, obj),
source_version=vers2,
format='xml',
obj = 'test2'
data = '{"key1": "val1", "key2": "val2"}'
- r = self.client.object_put(obj + 'orig',
+ r = self.client.object_put(
+ '%sorig' % obj,
content_type='application/octet-stream',
data=data,
- metadata={'mkey1': 'mval1', 'mkey2': 'mval2'},
- permissions={'read': ['accX:groupA', 'u1', 'u2'],
- 'write': ['u2', 'u3']})
+ metadata=dict(mkey1='mval1', mkey2='mval2'),
+ permissions=dict(
+ read=['accX:groupA', 'u1', 'u2'],
+ write=['u2', 'u3']))
- r = self.client.object_move(obj + 'orig',
+ r = self.client.object_move(
+ '%sorig' % obj,
destination='/%s/%s' % (self.client.container, obj),
ignore_content_type=False,
content_type='application/json',
- metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'},
- permissions={'write': ['u5', 'accX:groupB']})
+ metadata=dict(mkey2='mval2a', mkey3='mval3'),
+ permissions=dict(write=['u5', 'accX:groupB']))
self.assertEqual(r.status_code, 201)
"""Check Metadata"""
self.assertTrue('accx:groupb' in r['write'])
"""Check destination account"""
- r = self.client.object_move(obj,
+ r = self.client.object_move(
+ obj,
destination='/%s/%s' % (self.c1, obj),
content_encoding='utf8',
content_type='application/json',
"""Check destination being another container and also
content_type, content_disposition and content encoding"""
- r = self.client.object_move(obj,
+ r = self.client.object_move(
+ obj,
destination='/%s/%s' % (self.c1, obj),
content_encoding='utf8',
content_type='application/json',
content_disposition='attachment; filename="fname.ext"')
self.assertEqual(r.status_code, 201)
- self.assertEqual(r.headers['content-type'],
+ self.assertEqual(
+ r.headers['content-type'],
'application/json; charset=UTF-8')
self.client.container = self.c1
r = self.client.get_object_info(obj)
- self.assertTrue('content-disposition' in r\
- and 'fname.ext' in r['content-disposition'])
+ self.assertTrue('content-disposition' in r)
+ self.assertTrue('fname.ext' in r['content-disposition'])
etag = r['etag']
ctype = r['content-type']
self.assertEqual(ctype, 'application/json')
"""Check ignore_content_type and content_type"""
- r = self.client.object_move(obj,
+ r = self.client.object_move(
+ obj,
destination='/%s/%s' % (self.c2, obj),
ignore_content_type=True,
content_type='application/json')
"""Check if_etag_(not_)match"""
self.client.container = self.c2
- r = self.client.object_move(obj,
+ r = self.client.object_move(
+ obj,
destination='/%s/%s0' % (self.client.container, obj),
if_etag_match=etag)
self.assertEqual(r.status_code, 201)
- r = self.client.object_move(obj + '0',
+ r = self.client.object_move(
+ '%s0' % obj,
destination='/%s/%s1' % (self.client.container, obj),
if_etag_not_match='lalala')
self.assertEqual(r.status_code, 201)
"""Check public and format """
- r = self.client.object_move(obj + '1',
+ r = self.client.object_move(
+ '%s1' % obj,
destination='/%s/%s2' % (self.client.container, obj),
- format='xml', public=True)
+ format='xml',
+ public=True)
self.assertEqual(r.status_code, 201)
self.assertTrue(r.headers['content-type'].index('xml') > 0)
"""create a filesystem file"""
self.files.append(NamedTemporaryFile())
newf = self.files[-1]
- newf.writelines(['ello!\n',
+ newf.writelines([
+ 'ello!\n',
'This is a test line\n',
'inside a test file\n'])
"""create a file on container"""
- r = self.client.object_put(obj,
+ r = self.client.object_put(
+ obj,
content_type='application/octet-stream',
data='H',
- metadata={'mkey1': 'mval1', 'mkey2': 'mval2'},
- permissions={'read': ['accX:groupA', 'u1', 'u2'],
- 'write': ['u2', 'u3']})
+ metadata=dict(mkey1='mval1', mkey2='mval2'),
+ permissions=dict(
+ read=['accX:groupA', 'u1', 'u2'],
+ write=['u2', 'u3']))
"""Append tests update, content_range, content_type, content_length"""
newf.seek(0)
self.assertFalse('x-object-meta-mkey1' in r)
"""Check permissions"""
- self.client.set_object_sharing(obj,
- read_permition=['u4', 'u5'], write_permition=['u4'])
+ self.client.set_object_sharing(
+ obj,
+ read_permition=['u4', 'u5'],
+ write_permition=['u4'])
r = self.client.get_object_sharing(obj)
self.assertTrue('read' in r)
self.assertTrue('u5' in r['read'])
"""Check if_etag_(not)match"""
etag = r['etag']
"""
- r = self.client.object_post(obj,
+ r = self.client.object_post(
+ obj,
update=True,
public=True,
if_etag_not_match=etag,
self.assertEqual(r.status_code, 412)
"""
- r = self.client.object_post(obj, update=True, public=True,
- if_etag_match=etag, content_encoding='application/json')
+ r = self.client.object_post(
+ obj,
+ update=True,
+ public=True,
+ if_etag_match=etag,
+ content_encoding='application/json')
r = self.client.get_object_info(obj)
helloVersion = r['x-object-version']
self.assertEqual(r['content-encoding'], 'application/json')
"""Check source_version and source_account and content_disposition"""
- r = self.client.object_post(obj,
+ r = self.client.object_post(
+ obj,
update=True,
content_type='application/octet-srteam',
content_length=5,
success=(403, 202, 204))
self.assertEqual(r.status_code, 403)
- r = self.client.object_post(obj,
+ r = self.client.object_post(
+ obj,
update=True,
content_type='application/octet-srteam',
content_length=5,
r = self.client.object_get(obj)
self.assertEqual(r.text, 'eello!')
- self.assertTrue('content-disposition' in r.headers\
- and 'fname.ext' in r.headers['content-disposition'])
+ self.assertTrue('content-disposition' in r.headers)
+ self.assertTrue('fname.ext' in r.headers['content-disposition'])
"""Check manifest"""
mobj = 'manifest.test'
txt = ''
for i in range(10):
txt += '%s' % i
- r = self.client.object_put('%s/%s' % (mobj, i),
- data='%s' % i,
- content_length=1,
- success=201,
- content_encoding='application/octet-stream',
- content_type='application/octet-stream')
+ r = self.client.object_put(
+ '%s/%s' % (mobj, i),
+ data='%s' % i,
+ content_length=1,
+ success=201,
+ content_encoding='application/octet-stream',
+ content_type='application/octet-stream')
- self.client.create_object_by_manifestation(mobj,
+ self.client.create_object_by_manifestation(
+ mobj,
content_type='application/octet-stream')
- r = self.client.object_post(mobj,
+ r = self.client.object_post(
+ mobj,
manifest='%s/%s' % (self.client.container, mobj))
r = self.client.object_get(mobj)
self.client.container = self.c2
obj = 'test2'
"""create a file on container"""
- r = self.client.object_put(obj,
+ r = self.client.object_put(
+ obj,
content_type='application/octet-stream',
data='H',
- metadata={'mkey1': 'mval1', 'mkey2': 'mval2'},
- permissions={'read': ['accX:groupA', 'u1', 'u2'],
- 'write': ['u2', 'u3']})
+ metadata=dict(mkey1='mval1', mkey2='mval2'),
+ permissions=dict(
+ read=['accX:groupA', 'u1', 'u2'],
+ write=['u2', 'u3']))
"""Check with false until"""
r = self.client.object_delete(obj, until=1000000)