# conditions are met:
#
# 1. Redistributions of source code must retain the above
-# copyright notice, this list of conditions and the following
+# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following
+# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
import json
import logging
-
-import requests
-
-from requests.auth import AuthBase
-
+from kamaki.clients.connection import HTTPConnectionError
+from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
sendlog = logging.getLogger('clients.send')
recvlog = logging.getLogger('clients.recv')
-
-# Add a convenience status property to the responses
-def _status(self):
- return requests.status_codes._codes[self.status_code][0].upper()
-requests.Response.status = property(_status)
-
-
class ClientError(Exception):
def __init__(self, message, status=0, details=''):
super(ClientError, self).__init__(message, status, details)
self.status = status
self.details = details
-
class Client(object):
- def __init__(self, base_url, token):
+
+ def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
self.base_url = base_url
self.token = token
-
- def raise_for_status(self, r):
- message = "%d %s" % (r.status_code, r.status)
- details = r.text
- raise ClientError(message, r.status_code, details)
-
- def request(self, method, path, **kwargs):
- raw = kwargs.pop('raw', False)
- success = kwargs.pop('success', 200)
-
- data = kwargs.pop('data', None)
- headers = kwargs.pop('headers', {})
- headers.setdefault('X-Auth-Token', self.token)
-
- if 'json' in kwargs:
- data = json.dumps(kwargs.pop('json'))
- headers.setdefault('Content-Type', 'application/json')
-
- if data:
- headers.setdefault('Content-Length', str(len(data)))
-
- url = self.base_url + path
- kwargs.setdefault('verify', False) # Disable certificate verification
- r = requests.request(method, url, headers=headers, data=data, **kwargs)
-
- req = r.request
- sendlog.info('%s %s', req.method, req.url)
- for key, val in req.headers.items():
- sendlog.info('%s: %s', key, val)
- sendlog.info('')
- if req.data:
- sendlog.info('%s', req.data)
-
- recvlog.info('%d %s', r.status_code, r.status)
- for key, val in r.headers.items():
- recvlog.info('%s: %s', key, val)
- recvlog.info('')
- if not raw and r.content:
- recvlog.debug(r.content)
-
- if success is not None:
- # Success can either be an in or a collection
- success = (success,) if isinstance(success, int) else success
- if r.status_code not in success:
- self.raise_for_status(r)
-
+ self.headers = {}
+ self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
+ "%A, %d-%b-%y %H:%M:%S GMT",
+ "%a, %d %b %Y %H:%M:%S GMT"]
+ self.http_client = http_client
+
+ def _raise_for_status(self, r):
+ message = "%s" % r.status
+ try:
+ details = r.text
+ except:
+ details = ''
+ raise ClientError(message=message, status=r.status_code, details=details)
+
+ def set_header(self, name, value, iff=True):
+ """Set a header 'name':'value' provided value is not None and iff is True"""
+ if value is not None and iff:
+ self.http_client.set_header(name, value)
+
+ def set_param(self, name, value=None, iff=True):
+ if iff:
+ self.http_client.set_param(name, value)
+
+ def set_default_header(self, name, value):
+ self.http_client.headers.setdefault(name, value)
+
+ def request(self,
+ method,
+ path,
+ async_headers={},
+ async_params={},
+ **kwargs):
+ """In threaded/asynchronous requests, headers and params are not safe
+ Therefore, the standard self.set_header/param system can be used only for
+ headers and params that are common for all requests. All other params and
+ headers should be passed as
+ @param async_headers
+ @async_params
+ E.g. in most queries the 'X-Auth-Token' header might be the same for all, but the
+ 'Range' header might be different from request to request.
+ """
+
+ try:
+ success = kwargs.pop('success', 200)
+
+ data = kwargs.pop('data', None)
+ self.set_default_header('X-Auth-Token', self.token)
+
+ if 'json' in kwargs:
+ data = json.dumps(kwargs.pop('json'))
+ self.set_default_header('Content-Type', 'application/json')
+ if data:
+ self.set_default_header('Content-Length', unicode(len(data)))
+
+ self.http_client.url = self.base_url + path
+ r = self.http_client.perform_request(method, data, async_headers, async_params)
+
+ req = self.http_client
+ sendlog.info('%s %s', method, req.url)
+ headers = dict(req.headers)
+ headers.update(async_headers)
+
+ for key, val in headers.items():
+ sendlog.info('\t%s: %s', key, val)
+ sendlog.info('')
+ if data:
+ sendlog.info('%s', data)
+
+ recvlog.info('%d %s', r.status_code, r.status)
+ for key, val in r.headers.items():
+ recvlog.info('%s: %s', key, val)
+ #if r.content:
+ # recvlog.debug(r.content)
+
+ if success is not None:
+ # Success can either be an int or a collection
+ success = (success,) if isinstance(success, int) else success
+ if r.status_code not in success:
+ r.release()
+ self._raise_for_status(r)
+ except Exception as err:
+ self.http_client.reset_headers()
+ self.http_client.reset_params()
+ errmsg = getattr(err, 'message', unicode(err))
+ errdetails = getattr(err, 'details', '')+' (%s)'%type(err)
+ errstatus = getattr(err, 'status', 0)
+ raise ClientError(message=errmsg,status=errstatus,details=errdetails)
+
+ self.http_client.reset_headers()
+ self.http_client.reset_params()
return r
def delete(self, path, **kwargs):
def put(self, path, **kwargs):
return self.request('put', path, **kwargs)
+ def copy(self, path, **kwargs):
+ return self.request('copy', path, **kwargs)
-from .compute import ComputeClient as compute
-from .image import ImageClient as image
-from .storage import StorageClient as storage
-from .cyclades import CycladesClient as cyclades
-from .pithos import PithosClient as pithos
-from .astakos import AstakosClient as astakos
+ def move(self, path, **kwargs):
+ return self.request('move', path, **kwargs)
-
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-from . import Client, ClientError
-
+from kamaki.clients import Client, ClientError
+from kamaki.clients.utils import path4url
+import json
-
class ComputeClient(Client):
"""OpenStack Compute API 1.1 client"""
def raise_for_status(self, r):
- d = r.json
- if not d:
- return super(ComputeClient, self).raise_for_status(r)
- key = d.keys()[0]
- val = d[key]
- message = '%s: %s' % (key, val.get('message', ''))
- details = val.get('details', '')
+ try:
+ d = r.json
+ key = d.keys()[0]
+ val = d[key]
+ message = '%s: %s' % (key, val.get('message', ''))
+ details = val.get('details', '')
+ except AttributeError:
+ message = 'Request responded with error code '+unicode(r.status_code)
+ details = unicode(r.request.method)+' '+unicode(r.request.url)
raise ClientError(message, r.status_code, details)
+
+ def servers_get(self, server_id='', command='', **kwargs):
+ """GET base_url/servers[/server_id][/command] request
+ @param server_id or ''
+ @param command: can be 'ips', 'stats', or ''
+ """
+ path = path4url('servers', server_id, command)
+ success = kwargs.pop('success', 200)
+ return self.get(path, success=success, **kwargs)
+
+ def servers_delete(self, server_id='', command='', **kwargs):
+ """DEL ETE base_url/servers[/server_id][/command] request
+ @param server_id or ''
+ @param command: can be 'ips', 'stats', or ''
+ """
+ path = path4url('servers', server_id, command)
+ success = kwargs.pop('success', 204)
+ return self.delete(path, success=success, **kwargs)
+
+ def servers_post(self, server_id='', command='', json_data=None, **kwargs):
+ """POST base_url/servers[/server_id]/[command] request
+ @param server_id or ''
+ @param command: can be 'action' or ''
+ @param json_data: a json valid dict that will be send as data
+ """
+ data = json_data
+ if json_data is not None:
+ data = json.dumps(json_data)
+ self.set_header('Content-Type', 'application/json')
+ self.set_header('Content-Length', len(data))
+
+ path = path4url('servers', server_id, command)
+ success = kwargs.pop('success', 202)
+ return self.post(path, data=data, success=success, **kwargs)
+
+ def servers_put(self, server_id='', command='', json_data=None, **kwargs):
+ """PUT base_url/servers[/server_id]/[command] request
+ @param server_id or ''
+ @param command: can be 'action' or ''
+ @param json_data: a json valid dict that will be send as data
+ """
+ data = json_data
+ if json_data is not None:
+ data = json.dumps(json_data)
+ self.set_header('Content-Type', 'application/json')
+ self.set_header('Content-Length', len(data))
+
+ path = path4url('servers', server_id, command)
+ success = kwargs.pop('success', 204)
+ return self.put(path, data=data, success=success, **kwargs)
def list_servers(self, detail=False):
"""List servers, returned detailed output if detailed is True"""
-
- path = '/servers/detail' if detail else '/servers'
- r = self.get(path, success=200)
+ detail = 'detail' if detail else ''
+ r = self.servers_get(command=detail)
return r.json['servers']['values']
-
- def get_server_details(self, server_id):
+
+ def get_server_details(self, server_id, **kwargs):
"""Return detailed output on a server specified by its id"""
-
- path = '/servers/%s' % (server_id,)
- r = self.get(path, success=200)
+ r = self.servers_get(server_id, **kwargs)
return r.json['server']
-
+
def create_server(self, name, flavor_id, image_id, personality=None):
"""Submit request to create a new server
'imageRef': image_id}}
if personality:
req['server']['personality'] = personality
-
- r = self.post('/servers', json=req, success=202)
+
+ try:
+ r = self.servers_post(json_data=req)
+ except ClientError as err:
+ try:
+ err.message = err.details.split(',')[0].split(':')[2].split('"')[1]
+ finally:
+ raise err
return r.json['server']
-
+
def update_server_name(self, server_id, new_name):
"""Update the name of the server as reported by the API.
This call does not modify the hostname actually used by the server
internally.
"""
- path = '/servers/%s' % (server_id,)
req = {'server': {'name': new_name}}
- self.put(path, json=req, success=204)
-
+ r = self.servers_put(server_id, json_data=req)
+ r.release()
+
def delete_server(self, server_id):
"""Submit a deletion request for a server specified by id"""
-
- path = '/servers/%s' % (server_id,)
- self.delete(path, success=204)
+ r = self.servers_delete(server_id)
+ r.release()
def reboot_server(self, server_id, hard=False):
"""Submit a reboot request for a server specified by id"""
-
- path = '/servers/%s/action' % (server_id,)
type = 'HARD' if hard else 'SOFT'
req = {'reboot': {'type': type}}
- self.post(path, json=req, success=202)
-
- def get_server_metadata(self, server_id, key=None):
- path = '/servers/%s/meta' % (server_id,)
- if key:
- path += '/%s' % key
- r = self.get(path, success=200)
- return r.json['meta'] if key else r.json['metadata']['values']
-
+ r = self.servers_post(server_id, 'action', json_data=req)
+ r.release()
+
+ def get_server_metadata(self, server_id, key=''):
+ command = path4url('meta', key)
+ r = self.servers_get(server_id, command)
+ return r.json['meta'] if key != '' else r.json['metadata']['values']
+
def create_server_metadata(self, server_id, key, val):
- path = '/servers/%d/meta/%s' % (server_id, key)
req = {'meta': {key: val}}
- r = self.put(path, json=req, success=201)
+ r = self.servers_put(server_id, 'meta/'+key, json_data=req, success=201)
return r.json['meta']
-
+
def update_server_metadata(self, server_id, **metadata):
- path = '/servers/%d/meta' % (server_id,)
req = {'metadata': metadata}
- r = self.post(path, json=req, success=201)
+ r = self.servers_post(server_id, 'meta', json_data=req, success=201)
return r.json['metadata']
-
+
def delete_server_metadata(self, server_id, key):
- path = '/servers/%d/meta/%s' % (server_id, key)
- self.delete(path, success=204)
+ r = self.servers_delete(server_id, 'meta/'+key)
+ r.release()
+
+
+ def flavors_get(self, flavor_id='', command='', **kwargs):
+ """GET base_url[/flavor_id][/command]
+ @param flavor_id
+ @param command
+ """
+ path = path4url('flavors', flavor_id, command)
+ success=kwargs.pop('success', 200)
+ return self.get(path, success=success, **kwargs)
def list_flavors(self, detail=False):
- path = '/flavors/detail' if detail else '/flavors'
- r = self.get(path, success=200)
+ detail = 'detail' if detail else ''
+ r = self.flavors_get(command=detail)
return r.json['flavors']['values']
def get_flavor_details(self, flavor_id):
- path = '/flavors/%d' % flavor_id
- r = self.get(path, success=200)
+ r = self.flavors_get(flavor_id)
return r.json['flavor']
+
- def list_images(self, detail=False):
- path = '/images/detail' if detail else '/images'
- r = self.get(path, success=200)
- return r.json['images']['values']
+ def images_get(self, image_id='', command='', **kwargs):
+ """GET base_url[/image_id][/command]
+ @param image_id
+ @param command
+ """
+ path = path4url('images', image_id, command)
+ success=kwargs.pop('success', 200)
+ return self.get(path, success=success, **kwargs)
+
+ def images_delete(self, image_id='', command='', **kwargs):
+ """DEL ETE base_url[/image_id][/command]
+ @param image_id
+ @param command
+ """
+ path = path4url('images', image_id, command)
+ success=kwargs.pop('success', 204)
+ return self.delete(path, success=success, **kwargs)
+
+ def images_post(self, image_id='', command='', json_data=None, **kwargs):
+ """POST base_url/images[/image_id]/[command] request
+ @param image_id or ''
+ @param command: can be 'action' or ''
+ @param json_data: a json valid dict that will be send as data
+ """
+ data = json_data
+ if json_data is not None:
+ data = json.dumps(json_data)
+ self.set_header('Content-Type', 'application/json')
+ self.set_header('Content-Length', len(data))
+
+ path = path4url('images', image_id, command)
+ success = kwargs.pop('success', 201)
+ return self.post(path, data=data, success=success, **kwargs)
+
+ def images_put(self, image_id='', command='', json_data=None, **kwargs):
+ """PUT base_url/images[/image_id]/[command] request
+ @param image_id or ''
+ @param command: can be 'action' or ''
+ @param json_data: a json valid dict that will be send as data
+ """
+ data = json_data
+ if json_data is not None:
+ data = json.dumps(json_data)
+ self.set_header('Content-Type', 'application/json')
+ self.set_header('Content-Length', len(data))
- def get_image_details(self, image_id):
- path = '/images/%s' % (image_id,)
- r = self.get(path, success=200)
- return r.json['image']
+ path = path4url('images', image_id, command)
+ success = kwargs.pop('success', 201)
+ return self.put(path, data=data, success=success, **kwargs)
+ def list_images(self, detail=False):
+ detail = 'detail' if detail else ''
+ r = self.images_get(command=detail)
+ return r.json['images']['values']
+
+ def get_image_details(self, image_id, **kwargs):
+ r = self.images_get(image_id, **kwargs)
+ try:
+ return r.json['image']
+ except KeyError:
+ raise ClientError('Image not available', 404,
+ details='Image %s not found or not accessible' % image_id)
+
def delete_image(self, image_id):
- path = '/images/%s' % (image_id,)
- self.delete(path, success=204)
-
- def get_image_metadata(self, image_id, key=None):
- path = '/images/%s/meta' % (image_id,)
- if key:
- path += '/%s' % key
- r = self.get(path, success=200)
- return r.json['meta'] if key else r.json['metadata']['values']
-
+ r = self.images_delete(image_id)
+ r.release()
+
+ def get_image_metadata(self, image_id, key=''):
+ command = path4url('meta', key)
+ r = self.images_get(image_id, command)
+ return r.json['meta'] if key != '' else r.json['metadata']['values']
+
def create_image_metadata(self, image_id, key, val):
- path = '/images/%s/meta/%s' % (image_id, key)
req = {'meta': {key: val}}
- r = self.put(path, json=req, success=201)
+ r = self.images_put(image_id, 'meta/'+key, json_data=req)
return r.json['meta']
def update_image_metadata(self, image_id, **metadata):
- path = '/images/%s/meta' % (image_id,)
req = {'metadata': metadata}
- r = self.post(path, json=req, success=201)
+ r = self.images_post(image_id, 'meta', json_data=req)
return r.json['metadata']
def delete_image_metadata(self, image_id, key):
- path = '/images/%s/meta/%s' % (image_id, key)
- self.delete(path, success=204)
+ command = path4url('meta', key)
+ r = self.images_delete(image_id, command)
+ r.release()
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-
-
-from . import Client, ClientError
-
+from kamaki.clients import Client, ClientError
+from kamaki.clients.utils import path4url
class ImageClient(Client):
"""OpenStack Image Service API 1.0 and GRNET Plankton client"""
+ def __init__(self, base_url, token):
+ super(ImageClient, self).__init__(base_url, token)
+
def raise_for_status(self, r):
if r.status_code == 404:
raise ClientError("Image not found", r.status_code)
super(ImageClient, self).raise_for_status(r)
def list_public(self, detail=False, filters={}, order=''):
- path = '/images/detail' if detail else '/images/'
- params = {}
- params.update(filters)
+ path = path4url('images','detail') if detail else path4url('images/')
+ if isinstance(filters, dict):
+ self.http_client.params.update(filters)
if order.startswith('-'):
- params['sort_dir'] = 'desc'
+ self.set_param('sort_dir', 'desc')
order = order[1:]
else:
- params['sort_dir'] = 'asc'
+ self.set_param('sort_dir', 'asc')
+ self.set_param('sort_key', order, iff=order)
- if order:
- params['sort_key'] = order
-
- r = self.get(path, params=params, success=200)
+ r = self.get(path, success=200)
return r.json
def get_meta(self, image_id):
- path = '/images/%s' % (image_id,)
+ path=path4url('images', image_id)
r = self.head(path, success=200)
reply = {}
return reply
def register(self, name, location, params={}, properties={}):
- path = '/images/'
- headers = {}
- headers['x-image-meta-name'] = name
- headers['x-image-meta-location'] = location
+ path = path4url('images/')
+ self.set_header('X-Image-Meta-Name', name)
+ self.set_header('X-Image-Meta-Location', location)
for key, val in params.items():
if key in ('id', 'store', 'disk_format', 'container_format',
'size', 'checksum', 'is_public', 'owner'):
key = 'x-image-meta-' + key.replace('_', '-')
- headers[key] = val
+ self.set_header(key, val)
for key, val in properties.items():
- headers['x-image-meta-property-' + key] = val
-
- self.post(path, headers=headers, success=200)
+ self.set_header('X-Image-Meta-Property-'+key, val)
+
+ try:
+ r = self.post(path, success=200)
+ except ClientError as err:
+ try:
+ prefix, suffix = err.details.split('File not found')
+ details = '%s Location %s not found %s'%(prefix, location, suffix)
+ raise ClientError(err.message, err.status, details)
+ except ValueError:
+ pass
+ raise
+ r.release()
def list_members(self, image_id):
- path = '/images/%s/members' % (image_id,)
+ path = path4url('images',image_id,'members')
r = self.get(path, success=200)
return r.json['members']
def list_shared(self, member):
- path = '/shared-images/%s' % (member,)
+ path = path4url('shared-images', member)
+ #self.set_param('format', 'json')
r = self.get(path, success=200)
return r.json['shared_images']
def add_member(self, image_id, member):
- path = '/images/%s/members/%s' % (image_id, member)
- self.put(path, success=204)
+ path = path4url('images', image_id, 'members', member)
+ r = self.put(path, success=204)
+ r.release()
def remove_member(self, image_id, member):
- path = '/images/%s/members/%s' % (image_id, member)
- self.delete(path, success=204)
+ path = path4url('images', image_id, 'members', member)
+ r = self.delete(path, success=204)
+ r.release()
def set_members(self, image_id, members):
- path = '/images/%s/members' % image_id
+ path = path4url('images', image_id, 'members')
req = {'memberships': [{'member_id': member} for member in members]}
- self.put(path, json=req, success=204)
+ r = self.put(path, json=req, success=204)
+ r.release()
++
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-import hashlib
-import os
+import gevent
+#import gevent.monkey
+# Monkey-patch everything for gevent early on
+#gevent.monkey.patch_all()
+import gevent.pool
-from time import time
+from os import fstat, path
+from hashlib import new as newhashlib
+from time import time, sleep
+from datetime import datetime
+import sys
-from .storage import StorageClient
+from binascii import hexlify
+from kamaki.clients.pithos_rest_api import PithosRestAPI
+from kamaki.clients.storage import ClientError
+from kamaki.clients.utils import path4url, filter_in
+from StringIO import StringIO
def pithos_hash(block, blockhash):
- h = hashlib.new(blockhash)
+ h = newhashlib(blockhash)
h.update(block.rstrip('\x00'))
return h.hexdigest()
+def _range_up(start, end, a_range):
+ if a_range:
+ (rstart, rend) = a_range.split('-')
+ (rstart, rend) = (int(rstart), int(rend))
+ if rstart > end or rend < start:
+ return (0,0)
+ if rstart > start:
+ start = rstart
+ if rend < end:
+ end = rend
+ return (start, end)
-class PithosClient(StorageClient):
+class PithosClient(PithosRestAPI):
"""GRNet Pithos API client"""
- def purge_container(self, container):
- self.assert_account()
-
- path = '/%s/%s' % (self.account, container)
- params = {'until': int(time())}
- self.delete(path, params=params, success=204)
+ def __init__(self, base_url, token, account=None, container = None):
+ super(PithosClient, self).__init__(base_url, token, account = account,
+ container = container)
+ self.async_pool = None
- def put_block(self, data, hash):
- path = '/%s/%s' % (self.account, self.container)
- params = {'update': ''}
- headers = {'Content-Type': 'application/octet-stream',
- 'Content-Length': str(len(data))}
- r = self.post(path, params=params, data=data, headers=headers,
- success=202)
- assert r.text.strip() == hash, 'Local hash does not match server'
-
- def create_object(self, object, f, size=None, hash_cb=None,
- upload_cb=None):
- """Create an object by uploading only the missing blocks
-
- hash_cb is a generator function taking the total number of blocks to
- be hashed as an argument. Its next() will be called every time a block
- is hashed.
-
- upload_cb is a generator function with the same properties that is
- called every time a block is uploaded.
- """
+ def purge_container(self):
+ r = self.container_delete(until=unicode(time()))
+ r.release()
+
+ def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
+ content_encoding=None, content_disposition=None, content_type=None, sharing=None,
+ public=None):
+ # This is a naive implementation, it loads the whole file in memory
+ #Look in pithos for a nice implementation
self.assert_container()
- meta = self.get_container_meta(self.container)
- blocksize = int(meta['block-size'])
- blockhash = meta['block-hash']
+ if withHashFile:
+ data = f.read()
+ try:
+ import json
+ data = json.dumps(json.loads(data))
+ except ValueError:
+ raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
+ except SyntaxError:
+ raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
+ f = StringIO(data)
+ data = f.read(size) if size is not None else f.read()
+ r = self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
+ content_disposition=content_disposition, content_type=content_type, permitions=sharing,
+ public=public, success=201)
+ r.release()
+
+ #upload_* auxiliary methods
+ def put_block_async(self, data, hash):
+ class SilentGreenlet(gevent.Greenlet):
+ def _report_error(self, exc_info):
+ try:
+ sys.stderr = StringIO()
+ gevent.Greenlet._report_error(self, exc_info)
+ finally:
+ if hasattr(sys, '_stderr'):
+ sys.stderr = sys._stderr
+ POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
+ if self.async_pool is None:
+ self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
+ g = SilentGreenlet(self.put_block, data, hash)
+ self.async_pool.start(g)
+ return g
- size = size if size is not None else os.fstat(f.fileno()).st_size
+ def put_block(self, data, hash):
+ r = self.container_post(update=True, content_type='application/octet-stream',
+ content_length=len(data), data=data, format='json')
+ assert r.json[0] == hash, 'Local hash does not match server'
+
+ def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
+ content_disposition=None, content_type=None, sharing=None, public=None):
+ self.assert_container()
+ obj_content_type = 'application/octet-stream' if content_type is None else content_type
+ r = self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
+ content_disposition=content_disposition, content_type=content_type, permitions=sharing,
+ public=public, manifest='%s/%s'%(self.container,obj))
+ r.release()
+
+ def _get_file_block_info(self, fileobj, size=None):
+ meta = self.get_container_info()
+ blocksize = int(meta['x-container-block-size'])
+ blockhash = meta['x-container-block-hash']
+ size = size if size is not None else fstat(fileobj.fileno()).st_size
nblocks = 1 + (size - 1) // blocksize
- hashes = []
- map = {}
+ return (blocksize, blockhash, size, nblocks)
- offset = 0
+ def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
+ content_type=None, etag=None, content_encoding=None, content_disposition=None,
+ permitions=None, public=None, success=(201, 409)):
+ r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
+ json=json, etag=etag, content_encoding=content_encoding,
+ content_disposition=content_disposition, permitions=permitions, public=public,
+ success=success)
+ if r.status_code == 201:
+ r.release()
+ return None
+ return r.json
+ def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
+ hash_cb=None):
+ offset=0
if hash_cb:
hash_gen = hash_cb(nblocks)
hash_gen.next()
for i in range(nblocks):
- block = f.read(min(blocksize, size - offset))
+ block = fileobj.read(min(blocksize, size - offset))
bytes = len(block)
hash = pithos_hash(block, blockhash)
hashes.append(hash)
- map[hash] = (offset, bytes)
+ hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
-
assert offset == size
- path = '/%s/%s/%s' % (self.account, self.container, object)
- params = dict(format='json', hashmap='')
+ def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
+ """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
+ """
+ if upload_cb:
+ upload_gen = upload_cb(len(missing))
+ upload_gen.next()
+
+ flying = []
+ for hash in missing:
+ offset, bytes = hmap[hash]
+ fileobj.seek(offset)
+ data = fileobj.read(bytes)
+ r = self.put_block_async(data, hash)
+ flying.append(r)
+ for r in flying:
+ if r.ready():
+ if r.exception:
+ raise r.exception
+ if upload_cb:
+ upload_gen.next()
+ flying = [r for r in flying if not r.ready()]
+ while upload_cb:
+ try:
+ upload_gen.next()
+ except StopIteration:
+ break
+ gevent.joinall(flying)
+
+ failures = [r for r in flying if r.exception]
+ if len(failures):
+ details = ', '.join(['(%s).%s'%(i,r.exception) for i,r in enumerate(failures)])
+ raise ClientError(message="Block uploading failed", status=505, details=details)
+
+ def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
+ content_encoding=None, content_disposition=None, content_type=None, sharing=None,
+ public=None):
+ self.assert_container()
+
+ #init
+ block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
+ (hashes, hmap, offset) = ([], {}, 0)
+ content_type = 'application/octet-stream' if content_type is None else content_type
+
+ self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
+ hash_cb=hash_cb)
+
hashmap = dict(bytes=size, hashes=hashes)
- headers = {'Content-Type': 'application/octet-stream'}
- r = self.put(path, params=params, headers=headers, json=hashmap,
- success=(201, 409))
+ missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
+ etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
+ permitions=sharing, public=public)
- if r.status_code == 201:
+ if missing is None:
return
+ self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
- missing = r.json
+ r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
+ json=hashmap, success=201)
+ r.release()
+
+ #download_* auxiliary methods
+ #All untested
+ def _get_remote_blocks_info(self, obj, **restargs):
+ #retrieve object hashmap
+ myrange = restargs.pop('data_range') if 'data_range' in restargs.keys() else None
+ hashmap = self.get_object_hashmap(obj, **restargs)
+ restargs['data_range'] = myrange
+ blocksize = int(hashmap['block_size'])
+ blockhash = hashmap['block_hash']
+ total_size = hashmap['bytes']
+ #assert total_size/blocksize + 1 == len(hashmap['hashes'])
+ map_dict = {}
+ for i, h in enumerate(hashmap['hashes']):
+ map_dict[h] = i
+ return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
- if upload_cb:
- upload_gen = upload_cb(len(missing))
- upload_gen.next()
+ def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
+ for blockid, blockhash in enumerate(remote_hashes):
+ if blockhash == None:
+ continue
+ start = blocksize*blockid
+ end = total_size-1 if start+blocksize > total_size else start+blocksize-1
+ (start, end) = _range_up(start, end, range)
+ restargs['data_range'] = 'bytes=%s-%s'%(start, end)
+ r = self.object_get(obj, success=(200, 206), **restargs)
+ self._cb_next()
+ dst.write(r.content)
+ dst.flush()
- for hash in missing:
- offset, bytes = map[hash]
- f.seek(offset)
- data = f.read(bytes)
- self.put_block(data, hash)
- if upload_cb:
+ def _get_block_async(self, obj, **restargs):
+ class SilentGreenlet(gevent.Greenlet):
+ def _report_error(self, exc_info):
+ try:
+ sys.stderr = StringIO()
+ gevent.Greenlet._report_error(self, exc_info)
+ finally:
+ if hasattr(sys, '_stderr'):
+ sys.stderr = sys._stderr
+ if not hasattr(self, 'POOL_SIZE'):
+ self.POOL_SIZE = 5
+ if self.async_pool is None:
+ self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
+ g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
+ self.async_pool.start(g)
+ return g
+
+ def _hash_from_file(self, fp, start, size, blockhash):
+ fp.seek(start)
+ block = fp.read(size)
+ h = newhashlib(blockhash)
+ h.update(block.strip('\x00'))
+ return hexlify(h.digest())
+
+ def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
+ """write the results of a greenleted rest call to a file
+ @offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
+ blocks will be written to normal_position - 10"""
+ finished = []
+ for start, g in flying_greenlets.items():
+ if g.ready():
+ if g.exception:
+ raise g.exception
+ block = g.value.content
+ local_file.seek(start - offset)
+ local_file.write(block)
+ self._cb_next()
+ finished.append(flying_greenlets.pop(start))
+ local_file.flush()
+ return finished
+
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange = None, **restargs):
        """Download the blocks of a remote object into local_file concurrently.

        At most self.POOL_SIZE greenlets are kept in flight; completed blocks
        are written at their proper offsets as they finish.

        :param obj: remote object name
        :param remote_hashes: dict mapping block hash -> block index
        :param blocksize: remote container block size in bytes
        :param total_size: total remote object size in bytes
        :param local_file: open, seekable file object to write into
        :param blockhash: hash algorithm name (used only when resuming)
        :param resume: if True, local blocks whose hash already matches are skipped
        :param filerange: 'start-end' string limiting the download, or None
        """
        # When resuming, the existing file size tells us which blocks may
        # already be present and hash-comparable
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart%blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize*blockid
            if start < file_size and block_hash == self._hash_from_file(local_file,
                start, blocksize, blockhash):
                # Local block already matches the remote hash - skip download
                self._cb_next()
                continue
            if len(flying_greenlets) >= self.POOL_SIZE:
                # Pool is full: drain finished greenlets before launching more
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                    **restargs)
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                # Block falls entirely outside the requested range
                self._cb_next()
                continue
            restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
            flying_greenlets[start] = self._get_block_async(obj, **restargs)

        #check the greenlets
        while len(flying_greenlets) > 0:
            sleep(0.001)
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                **restargs)

        gevent.joinall(finished_greenlets)
+
    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
        range=None, if_match=None, if_none_match=None, if_modified_since=None,
        if_unmodified_since=None):
        """Download a remote object into the open file object dst.

        :param obj: remote object name
        :param dst: open file object; a tty is written sequentially, a
            regular file gets concurrent (async) block downloads
        :param download_cb: optional progress callback; called with the
            number of blocks, must return a step generator
        :param version: download a specific object version
        :param overide: unused here  # NOTE(review): typo of 'override'; kept for interface compatibility
        :param resume: skip blocks already present in dst (same hash)
        :param range: 'start-end' byte range string, or None for whole object
        """
        restargs=dict(version=version,
            data_range = None if range is None else 'bytes=%s'%range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        ( blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.POOL_SIZE = 5

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            # A tty cannot seek, so blocks must be dumped in order
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
        else:
            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
                resume, range, **restargs)
            if range is None:
                # Drop trailing bytes left over from a previously larger file
                dst.truncate(total_size)

        self._complete_cb()
+
+ #Command Progress Bar method
+ def _cb_next(self):
+ if hasattr(self, 'progress_bar_gen'):
+ try:
+ self.progress_bar_gen.next()
+ except:
+ pass
+ def _complete_cb(self):
+ while True:
+ try:
+ self.progress_bar_gen.next()
+ except:
+ break
+
+ #Untested - except is download_object is tested first
+ def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
+ if_modified_since=None, if_unmodified_since=None, data_range=None):
+ try:
+ r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
+ if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
+ if_unmodified_since=if_unmodified_since, data_range=data_range)
+ except ClientError as err:
+ if err.status == 304 or err.status == 412:
+ return {}
+ raise
+ return r.json
+
+ def set_account_group(self, group, usernames):
+ r = self.account_post(update=True, groups = {group:usernames})
+ r.release()
+
+ def del_account_group(self, group):
+ r = self.account_post(update=True, groups={group:[]})
+ r.release()
+
+ def get_account_info(self, until=None):
+ r = self.account_head(until=until)
+ if r.status_code == 401:
+ raise ClientError("No authorization")
+ return r.headers
+
+ def get_account_quota(self):
+ return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
+
+ def get_account_versioning(self):
+ return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
+
+ def get_account_meta(self, until=None):
+ return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
+
+ def get_account_group(self):
+ return filter_in(self.get_account_info(), 'X-Account-Group-')
+
+ def set_account_meta(self, metapairs):
+ assert(type(metapairs) is dict)
+ r = self.account_post(update=True, metadata=metapairs)
+ r.release()
+
+ def del_account_meta(self, metakey):
+ r = self.account_post(update=True, metadata={metakey:''})
+ r.release()
+
+ def set_account_quota(self, quota):
+ r = self.account_post(update=True, quota=quota)
+ r.release()
+
+ def set_account_versioning(self, versioning):
+ r = self.account_post(update=True, versioning = versioning)
+ r.release()
+
+ def list_containers(self):
+ r = self.account_get()
+ return r.json
+
+ def del_container(self, until=None, delimiter=None):
+ self.assert_container()
+ r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
+ r.release()
+ if r.status_code == 404:
+ raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
+ elif r.status_code == 409:
+ raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
+
+ def get_container_versioning(self, container):
+ self.container = container
+ return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
+
+ def get_container_quota(self, container):
+ self.container = container
+ return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
+
+ def get_container_info(self, until = None):
+ r = self.container_head(until=until)
+ return r.headers
+
+ def get_container_meta(self, until = None):
+ return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
+
+ def get_container_object_meta(self, until = None):
+ return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
+
+ def set_container_meta(self, metapairs):
+ assert(type(metapairs) is dict)
+ r = self.container_post(update=True, metadata=metapairs)
+ r.release()
+
+ def del_container_meta(self, metakey):
+ r = self.container_post(update=True, metadata={metakey:''})
+ r.release()
+
+ def set_container_quota(self, quota):
+ r = self.container_post(update=True, quota=quota)
+ r.release()
+
+ def set_container_versioning(self, versioning):
+ r = self.container_post(update=True, versioning=versioning)
+ r.release()
+
+ def del_object(self, obj, until=None, delimiter=None):
+ self.assert_container()
+ r = self.object_delete(obj, until=until, delimiter=delimiter)
+ r.release()
+
+ def set_object_meta(self, object, metapairs):
+ assert(type(metapairs) is dict)
+ r = self.object_post(object, update=True, metadata=metapairs)
+ r.release()
+
+ def del_object_meta(self, metakey, object):
+ r = self.object_post(object, update=True, metadata={metakey:''})
+ r.release()
+
+ def publish_object(self, object):
+ r = self.object_post(object, update=True, public=True)
+ r.release()
+
+ def unpublish_object(self, object):
+ r = self.object_post(object, update=True, public=False)
+ r.release()
+
+ def get_object_info(self, obj, version=None):
+ r = self.object_head(obj, version=version)
+ return r.headers
+
+ def get_object_meta(self, obj, version=None):
+ return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
+
+ def get_object_sharing(self, object):
+ r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
+ reply = {}
+ if len(r) > 0:
+ perms = r['x-object-sharing'].split(';')
+ for perm in perms:
+ try:
+ perm.index('=')
+ except ValueError:
+ raise ClientError('Incorrect reply format')
+ (key, val) = perm.strip().split('=')
+ reply[key] = val
+ return reply
+
+ def set_object_sharing(self, object, read_permition = False, write_permition = False):
+ """Give read/write permisions to an object.
+ @param object is the object to change sharing permitions onto
+ @param read_permition is a list of users and user groups that get read permition for this object
+ False means all previous read permitions will be removed
+ @param write_perimition is a list of users and user groups to get write permition for this object
+ False means all previous read permitions will be removed
+ """
+ perms = {}
+ perms['read'] = read_permition if isinstance(read_permition, list) else ''
+ perms['write'] = write_permition if isinstance(write_permition, list) else ''
+ r = self.object_post(object, update=True, permitions=perms)
+ r.release()
+
+ def del_object_sharing(self, object):
+ self.set_object_sharing(object)
+
    def append_object(self, object, source_file, upload_cb = None):
        """Append the contents of source_file to a remote object, block by block.

        :param object: remote object name
        :param source_file: open file object to read data from
        :param upload_cb: a generator factory for showing progress of upload
            to caller application, e.g. a progress bar. Its next is called
            whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1)//blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # content_range 'bytes */*' means: append at the end of the object
            r = self.object_post(object, update=True, content_range='bytes */*',
                content_type='application/octet-stream', content_length=len(block), data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()
+
+ def truncate_object(self, object, upto_bytes):
+ r = self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
+ content_type='application/octet-stream', object_bytes=upto_bytes,
+ source_object=path4url(self.container, object))
+ r.release()
+
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with data from the given source file.

        :param object: remote object name
        :param start: the part of the remote object to start overwriting from, in bytes
        :param end: the part of the remote object to stop overwriting to, in bytes
        :param source_file: open file object providing the replacement data
        :param upload_cb: optional progress callback (generator factory);
            its next is called after each uploaded block
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1)//blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # Never read past the end of the source file or the target range
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
            offset += len(block)
            r = self.object_post(object, update=True, content_type='application/octet-stream',
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()
- self.put(path, params=params, headers=headers, json=hashmap,
- success=201)
+ def copy_object(self, src_container, src_object, dst_container, dst_object=False,
+ source_version = None, public=False, content_type=None, delimiter=None):
+ self.assert_account()
+ self.container = dst_container
+ dst_object = dst_object or src_object
+ src_path = path4url(src_container, src_object)
+ r = self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
+ source_version=source_version, public=public, content_type=content_type,
+ delimiter=delimiter)
+ r.release()
+
+ def move_object(self, src_container, src_object, dst_container, dst_object=False,
+ source_version = None, public=False, content_type=None, delimiter=None):
+ self.assert_account()
+ self.container = dst_container
+ dst_object = dst_object or src_object
+ src_path = path4url(src_container, src_object)
+ r = self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
+ source_version=source_version, public=public, content_type=content_type,
+ delimiter=delimiter)
+ r.release()
+
+ def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
+ """Get accounts that share with self.account"""
+ self.assert_account()
+
+ self.set_param('format','json')
+ self.set_param('limit',limit, iff = limit is not None)
+ self.set_param('marker',marker, iff = marker is not None)
+
+ path = ''
+ success = kwargs.pop('success', (200, 204))
+ r = self.get(path, *args, success = success, **kwargs)
+ return r.json
+
+ def get_object_versionlist(self, path):
+ self.assert_container()
+ r = self.object_get(path, format='json', version='list')
+ return r.json['versions']
++
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
-from . import Client, ClientError
-
+from kamaki.clients import Client, ClientError
+from kamaki.clients.utils import filter_in, filter_out, prefix_keys, path4url
+#from .connection.kamakicon import KamakiHTTPConnection
class StorageClient(Client):
"""OpenStack Object Storage API 1.0 client"""
def __init__(self, base_url, token, account=None, container=None):
super(StorageClient, self).__init__(base_url, token)
+ #super(StorageClient, self).__init__(base_url, token, http_client=KamakiHTTPConnection())
self.account = account
self.container = container
def assert_account(self):
if not self.account:
- raise ClientError("Please provide an account")
+ raise ClientError("No account provided")
def assert_container(self):
self.assert_account()
if not self.container:
- raise ClientError("Please provide a container")
+ raise ClientError("No container provided")
+
+ def get_account_info(self):
+ self.assert_account()
+ path = path4url(self.account)
+ r = self.head(path, success=(204, 401))
+ if r.status_code == 401:
+ raise ClientError("No authorization")
+ reply = r.headers
+ return reply
+
+ def replace_account_meta(self, metapairs):
+ self.assert_account()
+ path = path4url(self.account)
+ for key, val in metapairs:
+ self.set_header('X-Account-Meta-'+key, val)
+ r = self.post(path, success=202)
+
+ def del_account_meta(self, metakey):
+ headers = self.get_account_info()
+ self.headers = filter_out(headers, 'X-Account-Meta-'+metakey, exactMatch = True)
+ if len(self.headers) == len(headers):
+ raise ClientError('X-Account-Meta-%s not found' % metakey, 404)
+ path = path4url(self.account)
+ r = self.post(path, success = 202)
def create_container(self, container):
self.assert_account()
- path = '/%s/%s' % (self.account, container)
+ path = path4url(self.account, container)
r = self.put(path, success=(201, 202))
if r.status_code == 202:
raise ClientError("Container already exists", r.status_code)
- def get_container_meta(self, container):
+ def get_container_info(self, container):
self.assert_account()
- path = '/%s/%s' % (self.account, container)
+ path = path4url(self.account, container)
r = self.head(path, success=(204, 404))
if r.status_code == 404:
raise ClientError("Container does not exist", r.status_code)
+ reply = r.headers
+ return reply
- reply = {}
- prefix = 'x-container-'
- for key, val in r.headers.items():
- key = key.lower()
- if key.startswith(prefix):
- reply[key[len(prefix):]] = val
+ def delete_container(self, container):
+ self.assert_account()
+ path = path4url(self.account, container)
+ r = self.delete(path, success=(204, 404, 409))
+ if r.status_code == 404:
+ raise ClientError("Container does not exist", r.status_code)
+ elif r.status_code == 409:
+ raise ClientError("Container is not empty", r.status_code)
+ def list_containers(self):
+ self.assert_account()
+ self.set_param('format', 'json')
+ path = path4url(self.account)
+ r = self.get(path, success = (200, 204))
+ reply = r.json
return reply
- def create_object(self, object, f, size=None, hash_cb=None,
- upload_cb=None):
+ def upload_object(self, object, f, size=None):
# This is a naive implementation, it loads the whole file in memory
+ #Look in pithos for a nice implementation
self.assert_container()
- path = '/%s/%s/%s' % (self.account, self.container, object)
+ path = path4url(self.account, self.container, object)
data = f.read(size) if size is not None else f.read()
- self.put(path, data=data, success=201)
+ r = self.put(path, data=data, success=201)
+
+ def create_directory(self, object):
+ self.assert_container()
+ path = path4url(self.account, self.container, object)
+ self.set_header('Content-Type', 'application/directory')
+ self.set_header('Content-length', '0')
+ r = self.put(path, success=201)
+
+ def get_object_info(self, object):
+ self.assert_container()
+ path = path4url(self.account, self.container, object)
+ r = self.head(path, success=200)
+ reply = r.headers
+ return reply
+
+ def get_object_meta(self, object):
+ r = filter_in(self.get_object_info(object), 'X-Object-Meta-')
+ reply = {}
+ for (key, val) in r.items():
+ metakey = key.split('-')[-1]
+ reply[metakey] = val
+ return reply
+
+ def del_object_meta(self, metakey, object):
+ self.assert_container()
+ self.set_header('X-Object-Meta-'+metakey, '')
+ path = path4url(self.account, self.container, object)
+ r = self.post(path, success = 202)
+
+ def replace_object_meta(self, metapairs):
+ self.assert_container()
+ path=path4url(self.account, self.container)
+ for key, val in metapairs:
+ self.set_header('X-Object-Meta-'+key, val)
+ r = self.post(path, success=202)
def get_object(self, object):
self.assert_container()
- path = '/%s/%s/%s' % (self.account, self.container, object)
- r = self.get(path, raw=True, success=200)
+ path = path4url(self.account, self.container, object)
+ r = self.get(path, success=200)
size = int(r.headers['content-length'])
- return r.raw, size
+ cnt = r.content
+ return cnt, size
+
+ def copy_object(self, src_container, src_object, dst_container, dst_object=False):
+ self.assert_account()
+ dst_object = dst_object or src_object
+ dst_path = path4url(self.account, dst_container, dst_object)
+ self.set_header('X-Copy-From', path4url(src_container, src_object))
+ self.set_header('Content-Length', 0)
+ r = self.put(dst_path, success=201)
+
+ def move_object(self, src_container, src_object, dst_container, dst_object=False):
+ self.assert_account()
+ dst_object = dst_object or src_object
+ dst_path = path4url(self.account, dst_container, dst_object)
+ self.set_header('X-Move-From', path4url(src_container, src_object))
+ self.set_header('Content-Length', 0)
+ r = self.put(dst_path, success=201)
def delete_object(self, object):
self.assert_container()
- path = '/%s/%s/%s' % (self.account, self.container, object)
- self.delete(path, success=204)
+ path = path4url(self.account, self.container, object)
+ r = self.delete(path, success=(204, 404))
+ if r.status_code == 404:
+ raise ClientError("Object %s not found" %object, r.status_code)
+
+ def list_objects(self):
+ self.assert_container()
+ path = path4url(self.account, self.container)
+ self.set_param('format', 'json')
+ r = self.get(path, success=(200, 204, 304, 404), )
+ if r.status_code == 404:
+ raise ClientError("Incorrect account (%s) for that container"%self.account, r.status_code)
+ elif r.status_code == 304:
+ return []
+ reply = r.json
+ return reply
- def list_objects(self, path=''):
+ def list_objects_in_path(self, path_prefix):
self.assert_container()
- path = '/%s/%s' % (self.account, self.container)
- params = dict(format='json')
- r = self.get(path, params=params, success=(200, 204))
- return r.json
+ path = path4url(self.account, self.container)
+ self.set_param('format', 'json')
+ self.set_param('path', 'path_prefix')
+ r = self.get(path, success=(200, 204, 404))
+ if r.status_code == 404:
+ raise ClientError("Incorrect account (%s) for that container"%self.account, r.status_code)
+ reply = r.json
+ return reply
+
++
# or implied, of GRNET S.A.
from setuptools import setup
#from sys import version_info

import kamaki

# Resolved merge conflict (the <<<<<<</=======/>>>>>>> markers had been
# committed): this branch replaced the 'requests' backend with the
# gevent-based kamaki connection package, so the HEAD dependency set is
# the correct one to keep.
optional = ['ansicolors==1.0.2', 'progress==1.0.1']
required = ['gevent==0.13.6', 'snf-common>=0.10', 'argparse']
setup(
name='kamaki',
version=kamaki.__version__,
- description='A command-line tool for managing clouds',
+ description='A command-line tool for poking clouds',
long_description=open('README.rst').read(),
url='http://code.grnet.gr/projects/kamaki',
license='BSD',
- packages=['kamaki', 'kamaki.clients'],
+ packages=['kamaki', 'kamaki.cli', 'kamaki.clients', 'kamaki.clients.connection', 'kamaki.cli.commands'],
include_package_data=True,
entry_points={
'console_scripts': ['kamaki = kamaki.cli:main']