Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 9a7efb0d

History | View | Annotate | Download (24.3 kB)

1 0c6d7489 Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 d2cea1e2 Giorgos Verigakis
#
3 d2cea1e2 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 d2cea1e2 Giorgos Verigakis
# without modification, are permitted provided that the following
5 d2cea1e2 Giorgos Verigakis
# conditions are met:
6 d2cea1e2 Giorgos Verigakis
#
7 d2cea1e2 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 d2cea1e2 Giorgos Verigakis
#      copyright notice, this list of conditions and the following
9 d2cea1e2 Giorgos Verigakis
#      disclaimer.
10 d2cea1e2 Giorgos Verigakis
#
11 d2cea1e2 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 d2cea1e2 Giorgos Verigakis
#      copyright notice, this list of conditions and the following
13 d2cea1e2 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 d2cea1e2 Giorgos Verigakis
#      provided with the distribution.
15 d2cea1e2 Giorgos Verigakis
#
16 d2cea1e2 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 d2cea1e2 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 d2cea1e2 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 d2cea1e2 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 d2cea1e2 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 d2cea1e2 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 d2cea1e2 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 d2cea1e2 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 d2cea1e2 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 d2cea1e2 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 d2cea1e2 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 d2cea1e2 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 d2cea1e2 Giorgos Verigakis
#
29 d2cea1e2 Giorgos Verigakis
# The views and conclusions contained in the software and
30 d2cea1e2 Giorgos Verigakis
# documentation are those of the authors and should not be
31 d2cea1e2 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 d2cea1e2 Giorgos Verigakis
# or implied, of GRNET S.A.
33 d2cea1e2 Giorgos Verigakis
34 435008b6 Stavros Sachtouris
import gevent
35 435008b6 Stavros Sachtouris
import gevent.monkey
36 435008b6 Stavros Sachtouris
# Monkey-patch everything for gevent early on
37 435008b6 Stavros Sachtouris
gevent.monkey.patch_all()
38 64ab4c13 Stavros Sachtouris
import gevent.pool
39 435008b6 Stavros Sachtouris
40 64ab4c13 Stavros Sachtouris
from os import fstat, path
41 64ab4c13 Stavros Sachtouris
from hashlib import new as newhashlib
42 699d3bb1 Stavros Sachtouris
from time import time, sleep
43 5b263ba2 Stavros Sachtouris
from datetime import datetime
44 f0598cb2 Stavros Sachtouris
import sys
45 a91e0293 Giorgos Verigakis
46 64ab4c13 Stavros Sachtouris
from binascii import hexlify
47 64ab4c13 Stavros Sachtouris
from .pithos_sh_lib.hashmap import HashMap
48 6a0b1658 Giorgos Verigakis
49 64ab4c13 Stavros Sachtouris
from .pithos_rest_api import PithosRestAPI
50 64ab4c13 Stavros Sachtouris
from .storage import ClientError
51 64ab4c13 Stavros Sachtouris
from .utils import path4url, filter_in
52 6a0b1658 Giorgos Verigakis
53 6a0b1658 Giorgos Verigakis
def pithos_hash(block, blockhash):
54 64ab4c13 Stavros Sachtouris
    h = newhashlib(blockhash)
55 6a0b1658 Giorgos Verigakis
    h.update(block.rstrip('\x00'))
56 6a0b1658 Giorgos Verigakis
    return h.hexdigest()
57 6a0b1658 Giorgos Verigakis
58 64ab4c13 Stavros Sachtouris
class PithosClient(PithosRestAPI):
59 d2cea1e2 Giorgos Verigakis
    """GRNet Pithos API client"""
60 a91e0293 Giorgos Verigakis
61 435008b6 Stavros Sachtouris
    def __init__(self, base_url, token, account=None, container = None):
62 64ab4c13 Stavros Sachtouris
        super(PithosClient, self).__init__(base_url, token, account = account,
63 64ab4c13 Stavros Sachtouris
            container = container)
64 435008b6 Stavros Sachtouris
        self.async_pool = None
65 435008b6 Stavros Sachtouris
66 17edd3f4 Stavros Sachtouris
    def purge_container(self):
67 64ab4c13 Stavros Sachtouris
        self.container_delete(until=unicode(time()))
68 f0598cb2 Stavros Sachtouris
        
69 65a45524 Stavros Sachtouris
    def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
70 65a45524 Stavros Sachtouris
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
71 65a45524 Stavros Sachtouris
        public=None):
72 65a45524 Stavros Sachtouris
        # This is a naive implementation, it loads the whole file in memory
73 65a45524 Stavros Sachtouris
        #Look in pithos for a nice implementation
74 65a45524 Stavros Sachtouris
        self.assert_container()
75 65a45524 Stavros Sachtouris
76 65a45524 Stavros Sachtouris
        if withHashFile:
77 65a45524 Stavros Sachtouris
            data = f.read()
78 65a45524 Stavros Sachtouris
            try:
79 65a45524 Stavros Sachtouris
                import json
80 65a45524 Stavros Sachtouris
                data = json.dumps(json.loads(data))
81 65a45524 Stavros Sachtouris
            except ValueError:
82 65a45524 Stavros Sachtouris
                raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
83 65a45524 Stavros Sachtouris
            except SyntaxError:
84 65a45524 Stavros Sachtouris
                raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
85 65a45524 Stavros Sachtouris
            from StringIO import StringIO
86 65a45524 Stavros Sachtouris
            f = StringIO(data)
87 65a45524 Stavros Sachtouris
        data = f.read(size) if size is not None else f.read()
88 64ab4c13 Stavros Sachtouris
        self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
89 65a45524 Stavros Sachtouris
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
90 65a45524 Stavros Sachtouris
            public=public, success=201)
91 f0598cb2 Stavros Sachtouris
        
92 435008b6 Stavros Sachtouris
    def put_block_async(self, data, hash):
93 435008b6 Stavros Sachtouris
        class SilentGreenlet(gevent.Greenlet):
94 435008b6 Stavros Sachtouris
            def _report_error(self, exc_info):
95 64ab4c13 Stavros Sachtouris
                _stderr = None
96 435008b6 Stavros Sachtouris
                try:
97 64ab4c13 Stavros Sachtouris
                    _stderr = sys._stderr
98 435008b6 Stavros Sachtouris
                    sys.stderr = StringIO()
99 435008b6 Stavros Sachtouris
                    gevent.Greenlet._report_error(self, exc_info)
100 435008b6 Stavros Sachtouris
                finally:
101 435008b6 Stavros Sachtouris
                    sys.stderr = _stderr
102 435008b6 Stavros Sachtouris
        POOL_SIZE = 5
103 435008b6 Stavros Sachtouris
        if self.async_pool is None:
104 435008b6 Stavros Sachtouris
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
105 435008b6 Stavros Sachtouris
        g = SilentGreenlet(self.put_block, data, hash)
106 435008b6 Stavros Sachtouris
        self.async_pool.start(g)
107 435008b6 Stavros Sachtouris
        return g
108 435008b6 Stavros Sachtouris
109 53129af9 Giorgos Verigakis
    def put_block(self, data, hash):
110 4adfa919 Stavros Sachtouris
        r = self.container_post(update=True, content_type='application/octet-stream',
111 c2236544 Stavros Sachtouris
            content_length=len(data), data=data, format='json')
112 c2236544 Stavros Sachtouris
        assert r.json[0] == hash, 'Local hash does not match server'
113 f0598cb2 Stavros Sachtouris
        
114 44b8928d Giorgos Verigakis
115 65a45524 Stavros Sachtouris
    def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
116 65a45524 Stavros Sachtouris
        content_disposition=None, content_type=None, sharing=None, public=None):
117 65a45524 Stavros Sachtouris
        self.assert_container()
118 65a45524 Stavros Sachtouris
        obj_content_type = 'application/octet-stream' if content_type is None else content_type
119 64ab4c13 Stavros Sachtouris
        self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
120 65a45524 Stavros Sachtouris
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
121 65a45524 Stavros Sachtouris
            public=public, manifest='%s/%s'%(self.container,obj))
122 64ab4c13 Stavros Sachtouris
       
123 64ab4c13 Stavros Sachtouris
    #upload_* auxiliary methods 
124 64ab4c13 Stavros Sachtouris
    def _get_file_block_info(self, fileobj, size=None):
125 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
126 435008b6 Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
127 435008b6 Stavros Sachtouris
        blockhash = meta['x-container-block-hash']
128 64ab4c13 Stavros Sachtouris
        size = size if size is not None else fstat(fileobj.fileno()).st_size
129 435008b6 Stavros Sachtouris
        nblocks = 1 + (size - 1) // blocksize
130 64ab4c13 Stavros Sachtouris
        return (blocksize, blockhash, size, nblocks)
131 64ab4c13 Stavros Sachtouris
132 64ab4c13 Stavros Sachtouris
    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
133 64ab4c13 Stavros Sachtouris
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
134 64ab4c13 Stavros Sachtouris
        permitions=None, public=None, success=(201, 409)):
135 64ab4c13 Stavros Sachtouris
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
136 64ab4c13 Stavros Sachtouris
            json=json, etag=etag, content_encoding=content_encoding,
137 64ab4c13 Stavros Sachtouris
            content_disposition=content_disposition, permitions=permitions, public=public,
138 64ab4c13 Stavros Sachtouris
            success=success)
139 64ab4c13 Stavros Sachtouris
        if r.status_code == 201:
140 64ab4c13 Stavros Sachtouris
            return None
141 64ab4c13 Stavros Sachtouris
        return r.json
142 435008b6 Stavros Sachtouris
143 64ab4c13 Stavros Sachtouris
    def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
144 64ab4c13 Stavros Sachtouris
        hash_cb=None):
145 64ab4c13 Stavros Sachtouris
        offset=0
146 435008b6 Stavros Sachtouris
        if hash_cb:
147 435008b6 Stavros Sachtouris
            hash_gen = hash_cb(nblocks)
148 435008b6 Stavros Sachtouris
            hash_gen.next()
149 435008b6 Stavros Sachtouris
150 435008b6 Stavros Sachtouris
        for i in range(nblocks):
151 64ab4c13 Stavros Sachtouris
            block = fileobj.read(min(blocksize, size - offset))
152 435008b6 Stavros Sachtouris
            bytes = len(block)
153 435008b6 Stavros Sachtouris
            hash = pithos_hash(block, blockhash)
154 435008b6 Stavros Sachtouris
            hashes.append(hash)
155 5b263ba2 Stavros Sachtouris
            hmap[hash] = (offset, bytes)
156 435008b6 Stavros Sachtouris
            offset += bytes
157 435008b6 Stavros Sachtouris
            if hash_cb:
158 435008b6 Stavros Sachtouris
                hash_gen.next()
159 435008b6 Stavros Sachtouris
        assert offset == size
160 435008b6 Stavros Sachtouris
161 64ab4c13 Stavros Sachtouris
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
162 64ab4c13 Stavros Sachtouris
        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
163 64ab4c13 Stavros Sachtouris
        """
164 435008b6 Stavros Sachtouris
        if upload_cb:
165 435008b6 Stavros Sachtouris
            upload_gen = upload_cb(len(missing))
166 435008b6 Stavros Sachtouris
            upload_gen.next()
167 435008b6 Stavros Sachtouris
168 435008b6 Stavros Sachtouris
        flying = []
169 435008b6 Stavros Sachtouris
        for hash in missing:
170 5b263ba2 Stavros Sachtouris
            offset, bytes = hmap[hash]
171 64ab4c13 Stavros Sachtouris
            fileobj.seek(offset)
172 64ab4c13 Stavros Sachtouris
            data = fileobj.read(bytes)
173 435008b6 Stavros Sachtouris
            r = self.put_block_async(data, hash)
174 435008b6 Stavros Sachtouris
            flying.append(r)
175 435008b6 Stavros Sachtouris
            for r in flying:
176 435008b6 Stavros Sachtouris
                if r.ready():
177 435008b6 Stavros Sachtouris
                    if r.exception:
178 435008b6 Stavros Sachtouris
                        raise r.exception
179 435008b6 Stavros Sachtouris
                    if upload_cb:
180 435008b6 Stavros Sachtouris
                        upload_gen.next()
181 435008b6 Stavros Sachtouris
            flying = [r for r in flying if not r.ready()]
182 64ab4c13 Stavros Sachtouris
        while upload_cb:
183 64ab4c13 Stavros Sachtouris
            try:
184 64ab4c13 Stavros Sachtouris
                upload_gen.next()
185 64ab4c13 Stavros Sachtouris
            except StopIteration:
186 64ab4c13 Stavros Sachtouris
                break
187 435008b6 Stavros Sachtouris
        gevent.joinall(flying)
188 56f0908a Stavros Sachtouris
189 64ab4c13 Stavros Sachtouris
    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
190 64ab4c13 Stavros Sachtouris
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
191 64ab4c13 Stavros Sachtouris
        public=None):
192 56f0908a Stavros Sachtouris
        self.assert_container()
193 56f0908a Stavros Sachtouris
194 64ab4c13 Stavros Sachtouris
        #init
195 64ab4c13 Stavros Sachtouris
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
196 64ab4c13 Stavros Sachtouris
        (hashes, hmap, offset) = ([], {}, 0)
197 64ab4c13 Stavros Sachtouris
        content_type = 'application/octet-stream' if content_type is None else content_type
198 64ab4c13 Stavros Sachtouris
199 64ab4c13 Stavros Sachtouris
        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
200 64ab4c13 Stavros Sachtouris
            hash_cb=hash_cb)
201 64ab4c13 Stavros Sachtouris
202 64ab4c13 Stavros Sachtouris
        hashmap = dict(bytes=size, hashes=hashes)
203 64ab4c13 Stavros Sachtouris
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
204 64ab4c13 Stavros Sachtouris
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
205 64ab4c13 Stavros Sachtouris
            permitions=sharing, public=public)
206 64ab4c13 Stavros Sachtouris
207 64ab4c13 Stavros Sachtouris
        if missing is None:
208 64ab4c13 Stavros Sachtouris
            return
209 64ab4c13 Stavros Sachtouris
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
210 64ab4c13 Stavros Sachtouris
211 64ab4c13 Stavros Sachtouris
        self.object_put(obj, format='json', hashmap=True, content_type=content_type, 
212 64ab4c13 Stavros Sachtouris
            json=hashmap, success=201)
213 fbfee225 Stavros Sachtouris
    
214 64ab4c13 Stavros Sachtouris
    #download_* auxiliary methods
215 fbfee225 Stavros Sachtouris
    def _get_remote_blocks_info(self, obj, **restargs):
216 56f0908a Stavros Sachtouris
        #retrieve object hashmap
217 fbfee225 Stavros Sachtouris
        hashmap = self.get_object_hashmap(obj, **restargs)
218 56f0908a Stavros Sachtouris
        blocksize = int(hashmap['block_size'])
219 56f0908a Stavros Sachtouris
        blockhash = hashmap['block_hash']
220 56f0908a Stavros Sachtouris
        total_size = hashmap['bytes']
221 fbfee225 Stavros Sachtouris
        print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
222 fbfee225 Stavros Sachtouris
            total_size/blocksize + 1, len(hashmap['hashes'])))
223 fbfee225 Stavros Sachtouris
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
224 56f0908a Stavros Sachtouris
        map_dict = {}
225 fbfee225 Stavros Sachtouris
        for i, h in enumerate(hashmap['hashes']):
226 fbfee225 Stavros Sachtouris
            map_dict[h] = i
227 fbfee225 Stavros Sachtouris
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
228 64ab4c13 Stavros Sachtouris
229 fbfee225 Stavros Sachtouris
    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
230 fbfee225 Stavros Sachtouris
        for blockid, blockhash in enumerate(remote_hashes):
231 fbfee225 Stavros Sachtouris
            if blockhash == None:
232 fbfee225 Stavros Sachtouris
                continue
233 fbfee225 Stavros Sachtouris
            start = blocksize*blockid
234 fbfee225 Stavros Sachtouris
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
235 fbfee225 Stavros Sachtouris
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
236 fbfee225 Stavros Sachtouris
            r = self.object_get(obj, success=(200, 206), **restargs)
237 fbfee225 Stavros Sachtouris
            self._cb_next()
238 fbfee225 Stavros Sachtouris
            dst.write(r.content)
239 fbfee225 Stavros Sachtouris
            dst.flush()
240 fbfee225 Stavros Sachtouris
241 fbfee225 Stavros Sachtouris
    def _filter_out_downloaded_hashses(self, remote_hashes, hash_list, local_file, blocksize,
242 fbfee225 Stavros Sachtouris
        blockhash):
243 fbfee225 Stavros Sachtouris
        #load file hashmap
244 fbfee225 Stavros Sachtouris
        file_hashmap = HashMap(blocksize, blockhash)
245 fbfee225 Stavros Sachtouris
        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
246 fbfee225 Stavros Sachtouris
247 fbfee225 Stavros Sachtouris
        for i, x in enumerate(file_hashmap):
248 fbfee225 Stavros Sachtouris
            local_hash = hexlify(x)
249 fbfee225 Stavros Sachtouris
            if local_hash in remote_hashes:
250 fbfee225 Stavros Sachtouris
                blockid = remote_hashes.pop(local_hash)
251 fbfee225 Stavros Sachtouris
                hash_list[blockid] = None
252 fbfee225 Stavros Sachtouris
                self._cb_next()
253 699d3bb1 Stavros Sachtouris
            else:
254 fbfee225 Stavros Sachtouris
                raise ClientError(message='Local file is substantialy different', status=600)
255 699d3bb1 Stavros Sachtouris
256 fbfee225 Stavros Sachtouris
    def _get_block_async(self, obj, **restargs):
257 699d3bb1 Stavros Sachtouris
        class SilentGreenlet(gevent.Greenlet):
258 699d3bb1 Stavros Sachtouris
            def _report_error(self, exc_info):
259 699d3bb1 Stavros Sachtouris
                _stderr = sys._stderr
260 699d3bb1 Stavros Sachtouris
                try:
261 699d3bb1 Stavros Sachtouris
                    sys.stderr = StringIO()
262 699d3bb1 Stavros Sachtouris
                    gevent.Greenlet._report_error(self, exc_info)
263 699d3bb1 Stavros Sachtouris
                finally:
264 699d3bb1 Stavros Sachtouris
                    sys.stderr = _stderr
265 9a7efb0d Stavros Sachtouris
        if not hasattr(self, 'POOL_SIZE'):
266 9a7efb0d Stavros Sachtouris
            self.POOL_SIZE = 5
267 699d3bb1 Stavros Sachtouris
        if self.async_pool is None:
268 fbfee225 Stavros Sachtouris
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
269 fbfee225 Stavros Sachtouris
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
270 699d3bb1 Stavros Sachtouris
        self.async_pool.start(g)
271 699d3bb1 Stavros Sachtouris
        return g
272 fb0cd49a Stavros Sachtouris
273 fbfee225 Stavros Sachtouris
    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
274 fbfee225 Stavros Sachtouris
        finished = []
275 fbfee225 Stavros Sachtouris
        for start, g in flying_greenlets.items():
276 9a7efb0d Stavros Sachtouris
            print('\tIs g ID(%s) ready? %s'%(self.mmaapp[start], g.ready()))
277 fbfee225 Stavros Sachtouris
            if g.ready():
278 fbfee225 Stavros Sachtouris
                if g.exception:
279 fbfee225 Stavros Sachtouris
                    raise g.exception
280 699d3bb1 Stavros Sachtouris
                try:
281 fbfee225 Stavros Sachtouris
                    block = g.value.content
282 fbfee225 Stavros Sachtouris
                except AttributeError:
283 fbfee225 Stavros Sachtouris
                    broken[start] = flying_greenlets.pop(start)
284 9a7efb0d Stavros Sachtouris
                    #g.spawn()
285 fbfee225 Stavros Sachtouris
                    continue
286 fbfee225 Stavros Sachtouris
                local_file.seek(start)
287 9a7efb0d Stavros Sachtouris
                print('\tID(%s) [%s...]\n\tg.value:%s\n\tg:%s\n'%(self.mmaapp[start], block[1:10],
288 9a7efb0d Stavros Sachtouris
                    g.value, g))
289 9a7efb0d Stavros Sachtouris
                print('\tID(%s): g.value.request: %s\n---'%(self.mmaapp[start], g.value.request))
290 fbfee225 Stavros Sachtouris
                local_file.write(block)
291 fbfee225 Stavros Sachtouris
                #local_file.flush()
292 fbfee225 Stavros Sachtouris
                self._cb_next()
293 fbfee225 Stavros Sachtouris
                finished.append(flying_greenlets.pop(start))
294 fbfee225 Stavros Sachtouris
        local_file.flush()
295 fbfee225 Stavros Sachtouris
        return finished
296 fbfee225 Stavros Sachtouris
297 fbfee225 Stavros Sachtouris
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
298 fbfee225 Stavros Sachtouris
        flying_greenlets = {}
299 fbfee225 Stavros Sachtouris
        finished_greenlets = []
300 fbfee225 Stavros Sachtouris
        broken = {}
301 9a7efb0d Stavros Sachtouris
        self.mmaapp = {}
302 fbfee225 Stavros Sachtouris
        for block_hash, blockid in remote_hashes.items():
303 9a7efb0d Stavros Sachtouris
            if len(flying_greenlets) >= self.POOL_SIZE:
304 9a7efb0d Stavros Sachtouris
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
305 9a7efb0d Stavros Sachtouris
                    **restargs)
306 fbfee225 Stavros Sachtouris
            start = blocksize*blockid
307 9a7efb0d Stavros Sachtouris
            self.mmaapp[start] = blockid
308 fbfee225 Stavros Sachtouris
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
309 9a7efb0d Stavros Sachtouris
            restargs['async_headers'] = dict(data_range='bytes=%s-%s'%(start, end))
310 9a7efb0d Stavros Sachtouris
            print('ID(%s) get_grnlt {'%blockid)
311 fbfee225 Stavros Sachtouris
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
312 9a7efb0d Stavros Sachtouris
            print('ID(%s) got_grnlt }'%blockid)
313 fbfee225 Stavros Sachtouris
314 fbfee225 Stavros Sachtouris
        #check the greenlets
315 fbfee225 Stavros Sachtouris
        while len(flying_greenlets) > 0:
316 fbfee225 Stavros Sachtouris
            sleep(0.1)
317 fbfee225 Stavros Sachtouris
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
318 fbfee225 Stavros Sachtouris
                **restargs)
319 fbfee225 Stavros Sachtouris
320 fbfee225 Stavros Sachtouris
        gevent.joinall(finished_greenlets)
321 fbfee225 Stavros Sachtouris
322 fbfee225 Stavros Sachtouris
323 fbfee225 Stavros Sachtouris
    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
324 fbfee225 Stavros Sachtouris
        range=None, if_match=None, if_none_match=None, if_modified_since=None,
325 fbfee225 Stavros Sachtouris
        if_unmodified_since=None):
326 fbfee225 Stavros Sachtouris
327 fbfee225 Stavros Sachtouris
        #init REST api args
328 fbfee225 Stavros Sachtouris
        restargs=dict(version=version,
329 fbfee225 Stavros Sachtouris
            data_range = None if range is None else 'bytes=%s'%range,
330 fbfee225 Stavros Sachtouris
            if_match=if_match,
331 fbfee225 Stavros Sachtouris
            if_none_match=if_none_match,
332 fbfee225 Stavros Sachtouris
            if_modified_since=if_modified_since,
333 fbfee225 Stavros Sachtouris
            if_unmodified_since=if_unmodified_since)
334 fbfee225 Stavros Sachtouris
335 fbfee225 Stavros Sachtouris
        #1. get remote object hash info
336 fbfee225 Stavros Sachtouris
        (   blocksize,
337 fbfee225 Stavros Sachtouris
            blockhash,
338 fbfee225 Stavros Sachtouris
            total_size,
339 fbfee225 Stavros Sachtouris
            hash_list, 
340 fbfee225 Stavros Sachtouris
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
341 fbfee225 Stavros Sachtouris
        assert total_size >= 0
342 9a7efb0d Stavros Sachtouris
        self.POOL_SIZE = 5
343 fbfee225 Stavros Sachtouris
344 fbfee225 Stavros Sachtouris
        if download_cb:
345 fbfee225 Stavros Sachtouris
            self.progress_bar_gen = download_cb(len(remote_hashes)+1)
346 fbfee225 Stavros Sachtouris
            self._cb_next()
347 fbfee225 Stavros Sachtouris
348 fbfee225 Stavros Sachtouris
        if dst.isatty():
349 fbfee225 Stavros Sachtouris
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
350 fbfee225 Stavros Sachtouris
        elif resume:
351 fbfee225 Stavros Sachtouris
            self._filter_out_downloaded_hashses(remote_hashes, hash_list, dst, blocksize, blockhash)
352 fbfee225 Stavros Sachtouris
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
353 699d3bb1 Stavros Sachtouris
        else:
354 fbfee225 Stavros Sachtouris
            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
355 fbfee225 Stavros Sachtouris
            dst.truncate(total_size)
356 5b263ba2 Stavros Sachtouris
357 fbfee225 Stavros Sachtouris
        self._complete_cb()
358 fbfee225 Stavros Sachtouris
359 fbfee225 Stavros Sachtouris
    #Command Progress Bar method
360 fbfee225 Stavros Sachtouris
    def _cb_next(self):
361 fbfee225 Stavros Sachtouris
        if hasattr(self, 'progress_bar_gen'):
362 fbfee225 Stavros Sachtouris
            try:
363 fbfee225 Stavros Sachtouris
                self.progress_bar_gen.next()
364 fbfee225 Stavros Sachtouris
            except:
365 fbfee225 Stavros Sachtouris
                pass
366 fbfee225 Stavros Sachtouris
    def _complete_cb(self):
367 fbfee225 Stavros Sachtouris
        while True:
368 fbfee225 Stavros Sachtouris
            try:
369 fbfee225 Stavros Sachtouris
                self.progress_bar_gen.next()
370 fbfee225 Stavros Sachtouris
            except:
371 fbfee225 Stavros Sachtouris
                break
372 b1713259 Stavros Sachtouris
373 d804de82 Stavros Sachtouris
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
374 fbfee225 Stavros Sachtouris
        if_modified_since=None, if_unmodified_since=None, data_range=None):
375 d804de82 Stavros Sachtouris
        try:
376 d804de82 Stavros Sachtouris
            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
377 d804de82 Stavros Sachtouris
                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
378 fbfee225 Stavros Sachtouris
                if_unmodified_since=if_unmodified_since, data_range=data_range)
379 d804de82 Stavros Sachtouris
        except ClientError as err:
380 d804de82 Stavros Sachtouris
            if err.status == 304 or err.status == 412:
381 d804de82 Stavros Sachtouris
                return {}
382 d804de82 Stavros Sachtouris
            raise
383 64ab4c13 Stavros Sachtouris
        return r.json
384 56f0908a Stavros Sachtouris
385 3a9e54b0 Stavros Sachtouris
    def set_account_group(self, group, usernames):
386 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, groups = {group:usernames})
387 eb903ba7 Stavros Sachtouris
388 c2867610 Stavros Sachtouris
    def del_account_group(self, group):
389 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, groups={group:[]})
390 c2867610 Stavros Sachtouris
391 8af4cc0b Stavros Sachtouris
    def get_account_info(self, until=None):
392 8af4cc0b Stavros Sachtouris
        r = self.account_head(until=until)
393 6657ec8c Stavros Sachtouris
        if r.status_code == 401:
394 6657ec8c Stavros Sachtouris
            raise ClientError("No authorization")
395 64ab4c13 Stavros Sachtouris
        return r.headers
396 6657ec8c Stavros Sachtouris
397 d1856abf Stavros Sachtouris
    def get_account_quota(self):
398 eb903ba7 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
399 d1856abf Stavros Sachtouris
400 d1856abf Stavros Sachtouris
    def get_account_versioning(self):
401 eb903ba7 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
402 e6b39366 Stavros Sachtouris
403 8af4cc0b Stavros Sachtouris
    def get_account_meta(self, until=None):
404 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
405 af3b2b36 Stavros Sachtouris
406 c2867610 Stavros Sachtouris
    def get_account_group(self):
407 c2867610 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Group-')
408 c2867610 Stavros Sachtouris
409 af3b2b36 Stavros Sachtouris
    def set_account_meta(self, metapairs):
410 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
411 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, metadata=metapairs)
412 af3b2b36 Stavros Sachtouris
413 379cd4bb Stavros Sachtouris
    def del_account_meta(self, metakey):
414 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, metadata={metakey:''})
415 379cd4bb Stavros Sachtouris
416 d1856abf Stavros Sachtouris
    def set_account_quota(self, quota):
417 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, quota=quota)
418 d1856abf Stavros Sachtouris
419 d1856abf Stavros Sachtouris
    def set_account_versioning(self, versioning):
420 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, versioning = versioning)
421 d1856abf Stavros Sachtouris
422 4fd88feb Stavros Sachtouris
    def list_containers(self):
423 4fd88feb Stavros Sachtouris
        r = self.account_get()
424 64ab4c13 Stavros Sachtouris
        return r.json
425 b758e547 Stavros Sachtouris
426 a298f2ab Stavros Sachtouris
    def del_container(self, until=None, delimiter=None):
427 a298f2ab Stavros Sachtouris
        self.assert_container()
428 a298f2ab Stavros Sachtouris
        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
429 a298f2ab Stavros Sachtouris
        if r.status_code == 404:
430 a298f2ab Stavros Sachtouris
            raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
431 a298f2ab Stavros Sachtouris
        elif r.status_code == 409:
432 a298f2ab Stavros Sachtouris
            raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
433 a298f2ab Stavros Sachtouris
434 d1856abf Stavros Sachtouris
    def get_container_versioning(self, container):
435 2f749e6e Stavros Sachtouris
        self.container = container
436 2f749e6e Stavros Sachtouris
        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
437 d1856abf Stavros Sachtouris
438 d1856abf Stavros Sachtouris
    def get_container_quota(self, container):
439 2f749e6e Stavros Sachtouris
        self.container = container
440 2f749e6e Stavros Sachtouris
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
441 e6b39366 Stavros Sachtouris
442 8af4cc0b Stavros Sachtouris
    def get_container_info(self, until = None):
443 8af4cc0b Stavros Sachtouris
        r = self.container_head(until=until)
444 8af4cc0b Stavros Sachtouris
        return r.headers
445 8af4cc0b Stavros Sachtouris
446 8af4cc0b Stavros Sachtouris
    def get_container_meta(self, until = None):
447 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
448 e6b39366 Stavros Sachtouris
449 8af4cc0b Stavros Sachtouris
    def get_container_object_meta(self, until = None):
450 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
451 e6b39366 Stavros Sachtouris
452 af3b2b36 Stavros Sachtouris
    def set_container_meta(self, metapairs):
453 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
454 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, metadata=metapairs)
455 f0598cb2 Stavros Sachtouris
        
456 3e544e5b Stavros Sachtouris
    def del_container_meta(self, metakey):
457 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, metadata={metakey:''})
458 e6b39366 Stavros Sachtouris
459 d1856abf Stavros Sachtouris
    def set_container_quota(self, quota):
460 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, quota=quota)
461 d1856abf Stavros Sachtouris
462 d1856abf Stavros Sachtouris
    def set_container_versioning(self, versioning):
463 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, versioning=versioning)
464 d1856abf Stavros Sachtouris
465 a298f2ab Stavros Sachtouris
    def del_object(self, obj, until=None, delimiter=None):
466 a298f2ab Stavros Sachtouris
        self.assert_container()
467 64ab4c13 Stavros Sachtouris
        self.object_delete(obj, until=until, delimiter=delimiter)
468 a298f2ab Stavros Sachtouris
469 af3b2b36 Stavros Sachtouris
    def set_object_meta(self, object, metapairs):
470 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
471 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, metadata=metapairs)
472 87688514 Stavros Sachtouris
473 89c2e77b Stavros Sachtouris
    def del_object_meta(self, metakey, object):
474 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, metadata={metakey:''})
475 6de1f262 Stavros Sachtouris
476 87688514 Stavros Sachtouris
    def publish_object(self, object):
477 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, public=True)
478 87688514 Stavros Sachtouris
479 87688514 Stavros Sachtouris
    def unpublish_object(self, object):
480 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, public=False)
481 28470086 Stavros Sachtouris
482 8af4cc0b Stavros Sachtouris
    def get_object_info(self, obj, version=None):
483 8af4cc0b Stavros Sachtouris
        r = self.object_head(obj, version=version)
484 64ab4c13 Stavros Sachtouris
        return r.headers
485 8af4cc0b Stavros Sachtouris
486 8af4cc0b Stavros Sachtouris
    def get_object_meta(self, obj, version=None):
487 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
488 8af4cc0b Stavros Sachtouris
489 f49084df Stavros Sachtouris
    def get_object_sharing(self, object):
490 f70616fc Stavros Sachtouris
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
491 f70616fc Stavros Sachtouris
        reply = {}
492 f70616fc Stavros Sachtouris
        if len(r) > 0:
493 f70616fc Stavros Sachtouris
            perms = r['x-object-sharing'].split(';')
494 f70616fc Stavros Sachtouris
            for perm in perms:
495 f70616fc Stavros Sachtouris
                try:
496 f70616fc Stavros Sachtouris
                    perm.index('=')
497 f70616fc Stavros Sachtouris
                except ValueError:
498 f70616fc Stavros Sachtouris
                    raise ClientError('Incorrect reply format')
499 f70616fc Stavros Sachtouris
                (key, val) = perm.strip().split('=')
500 f70616fc Stavros Sachtouris
                reply[key] = val
501 f70616fc Stavros Sachtouris
        return reply
502 f49084df Stavros Sachtouris
503 28470086 Stavros Sachtouris
    def set_object_sharing(self, object, read_permition = False, write_permition = False):
504 28470086 Stavros Sachtouris
        """Give read/write permisions to an object.
505 28470086 Stavros Sachtouris
           @param object is the object to change sharing permitions onto
506 28470086 Stavros Sachtouris
           @param read_permition is a list of users and user groups that get read permition for this object
507 28470086 Stavros Sachtouris
                False means all previous read permitions will be removed
508 28470086 Stavros Sachtouris
           @param write_perimition is a list of users and user groups to get write permition for this object
509 28470086 Stavros Sachtouris
                False means all previous read permitions will be removed
510 28470086 Stavros Sachtouris
        """
511 4adfa919 Stavros Sachtouris
        perms = {}
512 4adfa919 Stavros Sachtouris
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
513 4adfa919 Stavros Sachtouris
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
514 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, permitions=perms)
515 28470086 Stavros Sachtouris
516 f49084df Stavros Sachtouris
    def del_object_sharing(self, object):
517 28470086 Stavros Sachtouris
        self.set_object_sharing(object)
518 f49084df Stavros Sachtouris
519 bcabbc35 Stavros Sachtouris
    def append_object(self, object, source_file, upload_cb = None):
520 9f74ca46 Stavros Sachtouris
        """@param upload_db is a generator for showing progress of upload
521 bcabbc35 Stavros Sachtouris
            to caller application, e.g. a progress bar. Its next is called
522 bcabbc35 Stavros Sachtouris
            whenever a block is uploaded
523 bcabbc35 Stavros Sachtouris
        """
524 ab474306 Stavros Sachtouris
        self.assert_container()
525 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
526 ab474306 Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
527 64ab4c13 Stavros Sachtouris
        filesize = fstat(source_file.fileno()).st_size
528 ab474306 Stavros Sachtouris
        nblocks = 1 + (filesize - 1)//blocksize
529 ab474306 Stavros Sachtouris
        offset = 0
530 bcabbc35 Stavros Sachtouris
        if upload_cb is not None:
531 bcabbc35 Stavros Sachtouris
            upload_gen = upload_cb(nblocks)
532 ab474306 Stavros Sachtouris
        for i in range(nblocks):
533 ab474306 Stavros Sachtouris
            block = source_file.read(min(blocksize, filesize - offset))
534 ab474306 Stavros Sachtouris
            offset += len(block)
535 64ab4c13 Stavros Sachtouris
            self.object_post(object, update=True, content_range='bytes */*',
536 e92440bd Stavros Sachtouris
                content_type='application/octet-stream', content_length=len(block), data=block)
537 f0598cb2 Stavros Sachtouris
            
538 bcabbc35 Stavros Sachtouris
            if upload_cb is not None:
539 bcabbc35 Stavros Sachtouris
                upload_gen.next()
540 561116a6 Stavros Sachtouris
541 561116a6 Stavros Sachtouris
    def truncate_object(self, object, upto_bytes):
542 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
543 4adfa919 Stavros Sachtouris
            content_type='application/octet-stream', object_bytes=upto_bytes,
544 4adfa919 Stavros Sachtouris
            source_object=path4url(self.container, object))
545 ee62607e Stavros Sachtouris
546 bcabbc35 Stavros Sachtouris
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
547 ee62607e Stavros Sachtouris
        """Overwrite a part of an object with given source file
548 ee62607e Stavros Sachtouris
           @start the part of the remote object to start overwriting from, in bytes
549 ee62607e Stavros Sachtouris
           @end the part of the remote object to stop overwriting to, in bytes
550 ee62607e Stavros Sachtouris
        """
551 ee62607e Stavros Sachtouris
        self.assert_container()
552 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
553 ee62607e Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
554 64ab4c13 Stavros Sachtouris
        filesize = fstat(source_file.fileno()).st_size
555 ee62607e Stavros Sachtouris
        datasize = int(end) - int(start) + 1
556 ee62607e Stavros Sachtouris
        nblocks = 1 + (datasize - 1)//blocksize
557 ee62607e Stavros Sachtouris
        offset = 0
558 bcabbc35 Stavros Sachtouris
        if upload_cb is not None:
559 bcabbc35 Stavros Sachtouris
            upload_gen = upload_cb(nblocks)
560 ee62607e Stavros Sachtouris
        for i in range(nblocks):
561 ee62607e Stavros Sachtouris
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
562 ee62607e Stavros Sachtouris
            offset += len(block)
563 64ab4c13 Stavros Sachtouris
            self.object_post(object, update=True, content_type='application/octet-stream', 
564 4adfa919 Stavros Sachtouris
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
565 f0598cb2 Stavros Sachtouris
            
566 bcabbc35 Stavros Sachtouris
            if upload_cb is not None:
567 bcabbc35 Stavros Sachtouris
                upload_gen.next()
568 7d420701 Stavros Sachtouris
569 7d420701 Stavros Sachtouris
    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
570 7d420701 Stavros Sachtouris
        source_version = None, public=False, content_type=None, delimiter=None):
571 7d420701 Stavros Sachtouris
        self.assert_account()
572 7d420701 Stavros Sachtouris
        self.container = dst_container
573 7d420701 Stavros Sachtouris
        dst_object = dst_object or src_object
574 7d420701 Stavros Sachtouris
        src_path = path4url(src_container, src_object)
575 64ab4c13 Stavros Sachtouris
        self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
576 7d420701 Stavros Sachtouris
            source_version=source_version, public=public, content_type=content_type,
577 7d420701 Stavros Sachtouris
            delimiter=delimiter)
578 a5e0629d Stavros Sachtouris
579 a5e0629d Stavros Sachtouris
    def move_object(self, src_container, src_object, dst_container, dst_object=False,
580 a5e0629d Stavros Sachtouris
        source_version = None, public=False, content_type=None, delimiter=None):
581 a5e0629d Stavros Sachtouris
        self.assert_account()
582 a5e0629d Stavros Sachtouris
        self.container = dst_container
583 a5e0629d Stavros Sachtouris
        dst_object = dst_object or src_object
584 a5e0629d Stavros Sachtouris
        src_path = path4url(src_container, src_object)
585 64ab4c13 Stavros Sachtouris
        self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
586 a5e0629d Stavros Sachtouris
            source_version=source_version, public=public, content_type=content_type,
587 64ab4c13 Stavros Sachtouris
            delimiter=delimiter)