Merge branch 'master' of https://code.grnet.gr/git/kamaki
author Stavros Sachtouris <saxtouri@admin.grnet.gr>
Wed, 31 Oct 2012 09:50:40 +0000 (11:50 +0200)
committer Stavros Sachtouris <saxtouri@admin.grnet.gr>
Wed, 31 Oct 2012 09:50:40 +0000 (11:50 +0200)
Conflicts:
kamaki/__init__.py
kamaki/cli.py
kamaki/clients/__init__.py
kamaki/clients/astakos.py
kamaki/clients/compute.py
kamaki/clients/cyclades.py
kamaki/clients/image.py
kamaki/clients/pithos.py
kamaki/clients/storage.py
kamaki/config.py
kamaki/utils.py
setup.py

kamaki/clients/__init__.py
kamaki/clients/compute.py
kamaki/clients/image.py
kamaki/clients/pithos.py
kamaki/clients/storage.py
setup.py

@@@ -158,9 -124,10 +158,8 @@@ class Client(object)
      def put(self, path, **kwargs):
          return self.request('put', path, **kwargs)
  
 +    def copy(self, path, **kwargs):
 +        return self.request('copy', path, **kwargs)
  
 -from .compute import ComputeClient as compute
 -from .image import ImageClient as image
 -from .storage import StorageClient as storage
 -from .cyclades import CycladesClient as cyclades
 -from .pithos import PithosClient as pithos
 -from .astakos import AstakosClient as astakos
 +    def move(self, path, **kwargs):
 +        return self.request('move', path, **kwargs)
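
The merged Client now exposes copy() and move() beside the standard HTTP verbs;
both dispatch WebDAV-style COPY/MOVE requests through the same request() path as
get/put. A minimal usage sketch, assuming a Client built from a base URL and
token and a Destination header as used by Pithos-style APIs (the constructor
arguments and the header are assumptions, not part of this diff):

    from kamaki.clients import Client

    client = Client('https://example.grnet.gr/v1', 'my-token')  # assumed constructor
    # request() sends whatever verb name it is given, so COPY works like get/put
    r = client.copy('/account/container/obj',
        headers={'Destination': '/container/obj-copy'})  # hypothetical header
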
diff --cc kamaki/clients/compute.py
  # interpreted as representing official policies, either expressed
  # or implied, of GRNET S.A.
  
 -from . import Client, ClientError
 -
 +from kamaki.clients import Client, ClientError
 +from kamaki.clients.utils import path4url
 +import json
  
  class ComputeClient(Client):
      """OpenStack Compute API 1.1 client"""
  
diff --cc kamaki/clients/image.py
@@@ -121,17 -112,14 +121,18 @@@ class ImageClient(Client)
          return r.json['shared_images']
  
      def add_member(self, image_id, member):
 -        path = '/images/%s/members/%s' % (image_id, member)
 -        self.put(path, success=204)
 +        path = path4url('images', image_id, 'members', member)
 +        r = self.put(path, success=204)
 +        r.release()
  
      def remove_member(self, image_id, member):
 -        path = '/images/%s/members/%s' % (image_id, member)
 -        self.delete(path, success=204)
 +        path = path4url('images', image_id, 'members', member)
 +        r = self.delete(path, success=204)
 +        r.release()
  
      def set_members(self, image_id, members):
 -        path = '/images/%s/members' % image_id
 +        path = path4url('images', image_id, 'members')
          req = {'memberships': [{'member_id': member} for member in members]}
 -        self.put(path, json=req, success=204)
 +        r = self.put(path, json=req, success=204)
 +        r.release()
++
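
For reference, path4url (imported from kamaki.clients.utils in the compute hunk
above) is assumed to join its arguments into a slash-separated URL path. A
minimal sketch of that assumed behavior, not the actual helper:

    def path4url(*args):
        """Join arguments into a '/'-prefixed URL path."""
        return '/' + '/'.join(str(arg) for arg in args)

    # e.g. the add_member() call above would build:
    assert path4url('images', 'img-1', 'members', 'me') == '/images/img-1/members/me'
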
diff --cc kamaki/clients/pithos.py
@@@ -169,472 -101,32 +169,473 @@@ class PithosClient(PithosRestAPI)
              offset += bytes
              if hash_cb:
                  hash_gen.next()
 -
          assert offset == size
  
 -        path = '/%s/%s/%s' % (self.account, self.container, object)
 -        params = dict(format='json', hashmap='')
 +    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
 +        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
 +        """
 +        if upload_cb:
 +            upload_gen = upload_cb(len(missing))
 +            upload_gen.next()
 +
 +        flying = []
 +        for hash in missing:
 +            offset, bytes = hmap[hash]
 +            fileobj.seek(offset)
 +            data = fileobj.read(bytes)
 +            r = self.put_block_async(data, hash)
 +            flying.append(r)
 +            for r in flying:
 +                if r.ready():
 +                    if r.exception:
 +                        raise r.exception
 +                    if upload_cb:
 +                        upload_gen.next()
 +            flying = [r for r in flying if not r.ready()]
 +        while upload_cb:
 +            try:
 +                upload_gen.next()
 +            except StopIteration:
 +                break
 +        gevent.joinall(flying)
 +
 +        failures = [r for r in flying if r.exception]
 +        if len(failures):
 +            details = ', '.join(['(%s).%s'%(i,r.exception) for i,r in enumerate(failures)])
 +            raise ClientError(message="Block uploading failed", status=505, details=details)
 +
 +    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
 +        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
 +        public=None):
 +        self.assert_container()
 +
 +        #init
 +        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
 +        (hashes, hmap, offset) = ([], {}, 0)
 +        content_type = 'application/octet-stream' if content_type is None else content_type
 +
 +        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
 +            hash_cb=hash_cb)
 +
          hashmap = dict(bytes=size, hashes=hashes)
 -        headers = {'Content-Type': 'application/octet-stream'}
 -        r = self.put(path, params=params, headers=headers, json=hashmap,
 -                     success=(201, 409))
 +        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
 +            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
 +            permitions=sharing, public=public)
  
 -        if r.status_code == 201:
 +        if missing is None:
              return
 +        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
  
 -        missing = r.json
 +        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type, 
 +            json=hashmap, success=201)
 +        r.release()
 +    
 +    #download_* auxiliary methods
 +    #All untested
 +    def _get_remote_blocks_info(self, obj, **restargs):
 +        #retrieve object hashmap
 +        myrange = restargs.pop('data_range', None)
 +        hashmap = self.get_object_hashmap(obj, **restargs)
 +        restargs['data_range'] = myrange
 +        blocksize = int(hashmap['block_size'])
 +        blockhash = hashmap['block_hash']
 +        total_size = hashmap['bytes']
 +        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
 +        map_dict = {}
 +        for i, h in enumerate(hashmap['hashes']):
 +            map_dict[h] = i
 +        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
  
 -        if upload_cb:
 -            upload_gen = upload_cb(len(missing))
 -            upload_gen.next()
 +    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
 +        for blockid, blockhash in enumerate(remote_hashes):
 +            if blockhash is None:
 +                continue
 +            start = blocksize*blockid
 +            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
 +            (start, end) = _range_up(start, end, range)
 +            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
 +            r = self.object_get(obj, success=(200, 206), **restargs)
 +            self._cb_next()
 +            dst.write(r.content)
 +            dst.flush()
  
 -        for hash in missing:
 -            offset, bytes = map[hash]
 -            f.seek(offset)
 -            data = f.read(bytes)
 -            self.put_block(data, hash)
 -            if upload_cb:
 +    def _get_block_async(self, obj, **restargs):
 +        class SilentGreenlet(gevent.Greenlet):
 +            def _report_error(self, exc_info):
 +                try:
 +                    sys.stderr = StringIO()
 +                    gevent.Greenlet._report_error(self, exc_info)
 +                finally:
 +                    if hasattr(sys, '_stderr'):
 +                        sys.stderr = sys._stderr
 +        if not hasattr(self, 'POOL_SIZE'):
 +            self.POOL_SIZE = 5
 +        if self.async_pool is None:
 +            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
 +        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
 +        self.async_pool.start(g)
 +        return g
 +
 +    def _hash_from_file(self, fp, start, size, blockhash):
 +        fp.seek(start)
 +        block = fp.read(size)
 +        h = newhashlib(blockhash)
 +        h.update(block.strip('\x00'))
 +        return hexlify(h.digest())
 +
 +    def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
 +        """write the results of a greenleted rest call to a file
 +        @offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
 +        blocks will be written to normal_position - 10"""
 +        finished = []
 +        for start, g in flying_greenlets.items():
 +            if g.ready():
 +                if g.exception:
 +                    raise g.exception
 +                block = g.value.content
 +                local_file.seek(start - offset)
 +                local_file.write(block)
 +                self._cb_next()
 +                finished.append(flying_greenlets.pop(start))
 +        local_file.flush()
 +        return finished
 +
 +    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
 +        blockhash=None, resume=False, filerange = None, **restargs):
 +
 +        file_size = fstat(local_file.fileno()).st_size if resume else 0
 +        flying_greenlets = {}
 +        finished_greenlets = []
 +        offset = 0
 +        if filerange is not None:
 +            rstart = int(filerange.split('-')[0])
 +            offset = rstart if blocksize > rstart else rstart%blocksize
 +        for block_hash, blockid in remote_hashes.items():
 +            start = blocksize*blockid
 +            if start < file_size and block_hash == self._hash_from_file(local_file, 
 +                start, blocksize, blockhash):
 +                    self._cb_next()
 +                    continue
 +            if len(flying_greenlets) >= self.POOL_SIZE:
 +                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
 +                    **restargs)
 +            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
 +            (start, end) = _range_up(start, end, filerange)
 +            if start == end:
 +                self._cb_next()
 +                continue
 +            restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
 +            flying_greenlets[start] = self._get_block_async(obj, **restargs)
 +
 +        #check the greenlets
 +        while len(flying_greenlets) > 0:
 +            sleep(0.001)
 +            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
 +                **restargs)
 +
 +        gevent.joinall(finished_greenlets)
 +
 +    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
 +        range=None, if_match=None, if_none_match=None, if_modified_since=None,
 +        if_unmodified_since=None):
 +
 +        restargs=dict(version=version,
 +            data_range = None if range is None else 'bytes=%s'%range,
 +            if_match=if_match,
 +            if_none_match=if_none_match,
 +            if_modified_since=if_modified_since,
 +            if_unmodified_since=if_unmodified_since)
 +
 +        (   blocksize,
 +            blockhash,
 +            total_size,
 +            hash_list, 
 +            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
 +        assert total_size >= 0
 +        self.POOL_SIZE = 5
 +
 +        if download_cb:
 +            self.progress_bar_gen = download_cb(len(remote_hashes))
 +            self._cb_next()
 +
 +        if dst.isatty():
 +            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
 +        else:
 +            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
 +                resume, range, **restargs)
 +            if range is None:
 +                dst.truncate(total_size)
 +
 +        self._complete_cb()
 +
 +    #Command Progress Bar methods
 +    def _cb_next(self):
 +        if hasattr(self, 'progress_bar_gen'):
 +            try:
 +                self.progress_bar_gen.next()
 +            except:
 +                pass
 +    def _complete_cb(self):
 +        while True:
 +            try:
 +                self.progress_bar_gen.next()
 +            except:
 +                break
 +
 +    #Untested - except if download_object is tested first
 +    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
 +        if_modified_since=None, if_unmodified_since=None, data_range=None):
 +        try:
 +            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
 +                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
 +                if_unmodified_since=if_unmodified_since, data_range=data_range)
 +        except ClientError as err:
 +            if err.status == 304 or err.status == 412:
 +                return {}
 +            raise
 +        return r.json
 +
 +    def set_account_group(self, group, usernames):
 +        r = self.account_post(update=True, groups = {group:usernames})
 +        r.release()
 +
 +    def del_account_group(self, group):
 +        r = self.account_post(update=True, groups={group:[]})
 +        r.release()
 +
 +    def get_account_info(self, until=None):
 +        r = self.account_head(until=until)
 +        if r.status_code == 401:
 +            raise ClientError("No authorization")
 +        return r.headers
 +
 +    def get_account_quota(self):
 +        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
 +
 +    def get_account_versioning(self):
 +        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
 +
 +    def get_account_meta(self, until=None):
 +        return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
 +
 +    def get_account_group(self):
 +        return filter_in(self.get_account_info(), 'X-Account-Group-')
 +
 +    def set_account_meta(self, metapairs):
 +        assert(type(metapairs) is dict)
 +        r = self.account_post(update=True, metadata=metapairs)
 +        r.release()
 +
 +    def del_account_meta(self, metakey):
 +        r = self.account_post(update=True, metadata={metakey:''})
 +        r.release()
 +
 +    def set_account_quota(self, quota):
 +        r = self.account_post(update=True, quota=quota)
 +        r.release()
 +
 +    def set_account_versioning(self, versioning):
 +        r = self.account_post(update=True, versioning = versioning)
 +        r.release()
 +
 +    def list_containers(self):
 +        r = self.account_get()
 +        return r.json
 +
 +    def del_container(self, until=None, delimiter=None):
 +        self.assert_container()
 +        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
 +        r.release()
 +        if r.status_code == 404:
 +            raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
 +        elif r.status_code == 409:
 +            raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
 +
 +    def get_container_versioning(self, container):
 +        self.container = container
 +        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
 +
 +    def get_container_quota(self, container):
 +        self.container = container
 +        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
 +
 +    def get_container_info(self, until = None):
 +        r = self.container_head(until=until)
 +        return r.headers
 +
 +    def get_container_meta(self, until = None):
 +        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
 +
 +    def get_container_object_meta(self, until = None):
 +        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
 +
 +    def set_container_meta(self, metapairs):
 +        assert(type(metapairs) is dict)
 +        r = self.container_post(update=True, metadata=metapairs)
 +        r.release()
 +        
 +    def del_container_meta(self, metakey):
 +        r = self.container_post(update=True, metadata={metakey:''})
 +        r.release()
 +
 +    def set_container_quota(self, quota):
 +        r = self.container_post(update=True, quota=quota)
 +        r.release()
 +
 +    def set_container_versioning(self, versioning):
 +        r = self.container_post(update=True, versioning=versioning)
 +        r.release()
 +
 +    def del_object(self, obj, until=None, delimiter=None):
 +        self.assert_container()
 +        r = self.object_delete(obj, until=until, delimiter=delimiter)
 +        r.release()
 +
 +    def set_object_meta(self, object, metapairs):
 +        assert(type(metapairs) is dict)
 +        r = self.object_post(object, update=True, metadata=metapairs)
 +        r.release()
 +
 +    def del_object_meta(self, metakey, object):
 +        r = self.object_post(object, update=True, metadata={metakey:''})
 +        r.release()
 +
 +    def publish_object(self, object):
 +        r = self.object_post(object, update=True, public=True)
 +        r.release()
 +
 +    def unpublish_object(self, object):
 +        r = self.object_post(object, update=True, public=False)
 +        r.release()
 +
 +    def get_object_info(self, obj, version=None):
 +        r = self.object_head(obj, version=version)
 +        return r.headers
 +
 +    def get_object_meta(self, obj, version=None):
 +        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
 +
 +    def get_object_sharing(self, object):
 +        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
 +        reply = {}
 +        if len(r) > 0:
 +            perms = r['x-object-sharing'].split(';')
 +            for perm in perms:
 +                try:
 +                    perm.index('=')
 +                except ValueError:
 +                    raise ClientError('Incorrect reply format')
 +                (key, val) = perm.strip().split('=')
 +                reply[key] = val
 +        return reply
 +
 +    def set_object_sharing(self, object, read_permition = False, write_permition = False):
 +        """Give read/write permisions to an object.
 +           @param object is the object to change sharing permitions onto
 +           @param read_permition is a list of users and user groups that get read permition for this object
 +                False means all previous read permitions will be removed
 +           @param write_perimition is a list of users and user groups to get write permition for this object
 +                False means all previous read permitions will be removed
 +        """
 +        perms = {}
 +        perms['read'] = read_permition if isinstance(read_permition, list) else ''
 +        perms['write'] = write_permition if isinstance(write_permition, list) else ''
 +        r = self.object_post(object, update=True, permitions=perms)
 +        r.release()
 +
 +    def del_object_sharing(self, object):
 +        self.set_object_sharing(object)
 +
 +    def append_object(self, object, source_file, upload_cb = None):
 +        """@param upload_db is a generator for showing progress of upload
 +            to caller application, e.g. a progress bar. Its next is called
 +            whenever a block is uploaded
 +        """
 +        self.assert_container()
 +        meta = self.get_container_info()
 +        blocksize = int(meta['x-container-block-size'])
 +        filesize = fstat(source_file.fileno()).st_size
 +        nblocks = 1 + (filesize - 1)//blocksize
 +        offset = 0
 +        if upload_cb is not None:
 +            upload_gen = upload_cb(nblocks)
 +        for i in range(nblocks):
 +            block = source_file.read(min(blocksize, filesize - offset))
 +            offset += len(block)
 +            r = self.object_post(object, update=True, content_range='bytes */*',
 +                content_type='application/octet-stream', content_length=len(block), data=block)
 +            r.release()
 +            
 +            if upload_cb is not None:
 +                upload_gen.next()
 +
 +    def truncate_object(self, object, upto_bytes):
 +        r = self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
 +            content_type='application/octet-stream', object_bytes=upto_bytes,
 +            source_object=path4url(self.container, object))
 +        r.release()
 +
 +    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
 +        """Overwrite a part of an object with given source file
 +           @start the part of the remote object to start overwriting from, in bytes
 +           @end the part of the remote object to stop overwriting to, in bytes
 +        """
 +        self.assert_container()
 +        meta = self.get_container_info()
 +        blocksize = int(meta['x-container-block-size'])
 +        filesize = fstat(source_file.fileno()).st_size
 +        datasize = int(end) - int(start) + 1
 +        nblocks = 1 + (datasize - 1)//blocksize
 +        offset = 0
 +        if upload_cb is not None:
 +            upload_gen = upload_cb(nblocks)
 +        for i in range(nblocks):
 +            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
 +            offset += len(block)
 +            r = self.object_post(object, update=True, content_type='application/octet-stream', 
 +                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
 +            r.release()
 +            
 +            if upload_cb is not None:
                  upload_gen.next()
  
 -        self.put(path, params=params, headers=headers, json=hashmap,
 -                 success=201)
 +    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
 +        source_version = None, public=False, content_type=None, delimiter=None):
 +        self.assert_account()
 +        self.container = dst_container
 +        dst_object = dst_object or src_object
 +        src_path = path4url(src_container, src_object)
 +        r = self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
 +            source_version=source_version, public=public, content_type=content_type,
 +            delimiter=delimiter)
 +        r.release()
 +
 +    def move_object(self, src_container, src_object, dst_container, dst_object=False,
 +        source_version = None, public=False, content_type=None, delimiter=None):
 +        self.assert_account()
 +        self.container = dst_container
 +        dst_object = dst_object or src_object
 +        src_path = path4url(src_container, src_object)
 +        r = self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
 +            source_version=source_version, public=public, content_type=content_type,
 +            delimiter=delimiter)
 +        r.release()
 +
 +    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
 +        """Get accounts that share with self.account"""
 +        self.assert_account()
 +
 +        self.set_param('format','json')
 +        self.set_param('limit',limit, iff = limit is not None)
 +        self.set_param('marker',marker, iff = marker is not None)
 +
 +        path = ''
 +        success = kwargs.pop('success', (200, 204))
 +        r = self.get(path, *args, success = success, **kwargs)
 +        return r.json
 +
 +    def get_object_versionlist(self, path):
 +        self.assert_container()
 +        r = self.object_get(path, format='json', version='list')
 +        return r.json['versions']
++
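
Usage sketch for the new upload interface: upload_object() treats upload_cb as a
generator factory - it is called once with the number of missing blocks, and the
returned generator is advanced once up front, once per uploaded block, then
drained. A sketch assuming a PithosClient already configured with account and
container (file names are illustrative):

    def upload_progress(nblocks):
        # upload_object() calls this with the count of blocks left to upload
        for done in range(nblocks + 1):
            print '%s of %s blocks uploaded' % (done, nblocks)
            yield

    with open('local.iso', 'rb') as f:
        pithos.upload_object('remote.iso', f, upload_cb=upload_progress)
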
diff --cc kamaki/clients/storage.py
@@@ -179,31 -91,12 +179,32 @@@ class StorageClient(Client)
  
      def delete_object(self, object):
          self.assert_container()
 -        path = '/%s/%s/%s' % (self.account, self.container, object)
 -        self.delete(path, success=204)
 +        path = path4url(self.account, self.container, object)
 +        r = self.delete(path, success=(204, 404))
 +        if r.status_code == 404:
 +            raise ClientError("Object %s not found" %object, r.status_code)
 +       
 +    def list_objects(self):
 +        self.assert_container()
 +        path = path4url(self.account, self.container)
 +        self.set_param('format', 'json')
 +        r = self.get(path, success=(200, 204, 304, 404), )
 +        if r.status_code == 404:
 +            raise ClientError("Incorrect account (%s) for that container"%self.account, r.status_code)
 +        elif r.status_code == 304:
 +            return []
 +        reply = r.json
 +        return reply
  
 -    def list_objects(self, path=''):
 +    def list_objects_in_path(self, path_prefix):
          self.assert_container()
 -        path = '/%s/%s' % (self.account, self.container)
 -        params = dict(format='json')
 -        r = self.get(path, params=params, success=(200, 204))
 -        return r.json
 +        path = path4url(self.account, self.container)
 +        self.set_param('format', 'json')
 +        self.set_param('path', path_prefix)
 +        r = self.get(path, success=(200, 204, 404))
 +        if r.status_code == 404:
 +            raise ClientError("Incorrect account (%s) for that container"%self.account, r.status_code)
 +        reply = r.json
 +        return reply
 +
++
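
With this change delete_object() raises a ClientError carrying the HTTP status
(the same err.status attribute the pithos hunk relies on) instead of failing the
success check on a missing object. A handling sketch, assuming storage is a
configured StorageClient:

    from kamaki.clients import ClientError

    try:
        storage.delete_object('no-such-file.txt')
    except ClientError as err:
        if err.status == 404:
            print 'object not found'
        else:
            raise
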
diff --cc setup.py
+++ b/setup.py
  # or implied, of GRNET S.A.
  
  from setuptools import setup
++<<<<<<< HEAD
 +#from sys import version_info
 +
 +import kamaki
 +
 +optional = ['ansicolors==1.0.2', 'progress==1.0.1']
 +required = ['gevent==0.13.6', 'snf-common>=0.10', 'argparse']
++=======
+ from sys import version_info
+ import kamaki
+ required = ['ansicolors==1.0.2', 'progress==1.0.1', 'requests==0.12.1']
+ if version_info[0:2] < (2, 7):
+     required.extend(['argparse', 'ordereddict'])
++>>>>>>> a8f25c942b7d547f1622c92291ff9037719f454d
  
  setup(
      name='kamaki',