# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import gevent
#import gevent.monkey
# Monkey-patch everything for gevent early on
#gevent.monkey.patch_all()
import gevent.pool

from os import fstat, path
from hashlib import new as newhashlib
from time import time, sleep
from datetime import datetime
import sys

from binascii import hexlify

from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO

def pithos_hash(block, blockhash):
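    # The Pithos block hash: hash the block content with its trailing
    # zero-padding stripped, using the algorithm named by the container's
    # block-hash policy (e.g. sha256).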
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()

def _range_up(start, end, a_range):
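    # Clip the [start, end] span of a block against a user-supplied
    # 'rstart-rend' byte range; returns (0, 0) when they do not overlap.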
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)

class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account=account,
            container=container)
        self.async_pool = None

    def purge_container(self):
        r = self.container_delete(until=unicode(time()))
        r.release()

    def upload_object_unchunked(self, obj, f, withHashFile=False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
        # This is a naive implementation: it loads the whole file in memory.
        # Look in pithos for a nicer implementation.
        self.assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not json-formatted' % f.name, status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file' % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
            public=public, success=201)
        r.release()

    # upload_* auxiliary methods
    def put_block_async(self, data, hash):
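        # SilentGreenlet suppresses gevent's default traceback dump to stderr;
        # the exception is still stored on the greenlet and re-raised by the
        # caller. The finally clause assumes the application may have stashed
        # the original stream in sys._stderr before redirecting sys.stderr.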
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
        g = SilentGreenlet(self.put_block, data, hash)
        self.async_pool.start(g)
        return g

    def put_block(self, data, hash):
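        # POSTing raw data to the container with update=True stores a single
        # block; the server replies (format='json') with the list of hashes it
        # received, which should match the locally computed block hash.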
        r = self.container_post(update=True, content_type='application/octet-stream',
            content_length=len(data), data=data, format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
        content_disposition=None, content_type=None, sharing=None, public=None):
        self.assert_container()
        obj_content_type = 'application/octet-stream' if content_type is None else content_type
        r = self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=obj_content_type,
            permitions=sharing, public=public, manifest='%s/%s' % (self.container, obj))
        r.release()

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
        permitions=None, public=None, success=(201, 409)):
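        # A hashmap PUT either creates the object outright (201: the server
        # already holds every block) or fails with 409, whose body lists the
        # hashes of the blocks the server is missing.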
        r = self.object_put(obj, format=format, hashmap=hashmap, content_type=content_type,
            json=json, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, permitions=permitions, public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json

    def _calculate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap,
        fileobj, hash_cb=None):
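        # Read the file block by block, appending each block's hash in order
        # and mapping hash -> (offset, length) so that missing blocks can be
        # re-read and retransmitted later.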
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """Upload the missing blocks asynchronously, in a pseudo-parallel fashion (greenlets)"""
        if upload_cb:
            upload_gen = upload_cb(len(missing))
            upload_gen.next()

        flying = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self.put_block_async(data, hash)
            flying.append(r)
            for r in flying:
                if r.ready():
                    if r.exception:
                        raise r.exception
                    if upload_cb:
                        upload_gen.next()
            flying = [r for r in flying if not r.ready()]
        while upload_cb:
            try:
                upload_gen.next()
            except StopIteration:
                break
        gevent.joinall(flying)

        failures = [r for r in flying if r.exception]
        if failures:
            details = ', '.join(['(%s).%s' % (i, r.exception) for i, r in enumerate(failures)])
            raise ClientError(message="Block uploading failed", status=505, details=details)

    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
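        # Upload protocol: hash the file locally, send the hashmap, upload
        # only the blocks the server reports missing, then send the hashmap
        # again to create the object.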
        self.assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
        (hashes, hmap) = ([], {})
        content_type = 'application/octet-stream' if content_type is None else content_type

        self._calculate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
            permitions=sharing, public=public)

        if missing is None:
            return
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)

        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=hashmap, success=201)
        r.release()

    # download_* auxiliary methods
    # All untested
    def _get_remote_blocks_info(self, obj, **restargs):
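        # A requested data range applies to the block GETs, not to the hashmap
        # request, so pop it before fetching the hashmap and restore it after.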
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
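        # Fetch blocks sequentially with ranged GETs and write them in order;
        # used when dst cannot seek (e.g. a tty).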
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        if not hasattr(self, 'POOL_SIZE'):
            self.POOL_SIZE = 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
        self.async_pool.start(g)
        return g

    def _hash_from_file(self, fp, start, size, blockhash):
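        # Hash a local block the same way the server does (see pithos_hash):
        # only trailing zero-padding is stripped before hashing.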
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _greenlet2file(self, flying_greenlets, local_file, offset=0, **restargs):
        """Write the results of completed greenlet REST calls to a file.
        @offset: the offset into the file, up to blocksize - e.g. if the range
        is 10-100, all blocks will be written to normal_position - 10
        """
        finished = []
        for start, g in flying_greenlets.items():
            if g.ready():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying_greenlets.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange=None, **restargs):
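        # On resume, blocks whose local hash already matches the remote hash
        # are skipped; the rest are fetched with ranged GETs, keeping at most
        # POOL_SIZE greenlets in flight.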
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(local_file,
                start, blocksize, blockhash):
                    self._cb_next()
                    continue
            if len(flying_greenlets) >= self.POOL_SIZE:
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                    **restargs)
            end = total_size - 1 if start + blocksize > total_size else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = dict(Range='bytes=%s-%s' % (start, end))
            flying_greenlets[start] = self._get_block_async(obj, **restargs)

        # check the greenlets
        while len(flying_greenlets) > 0:
            sleep(0.001)
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                **restargs)

        gevent.joinall(finished_greenlets)

    def download_object(self, obj, dst, download_cb=None, version=None, override=False,
        resume=False, range=None, if_match=None, if_none_match=None, if_modified_since=None,
        if_unmodified_since=None):
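        # Fetch the hashmap first; blocks are dumped synchronously when dst is
        # a tty (no seeking), otherwise asynchronously via greenlets, and the
        # local file is truncated to the exact object size on full downloads.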
        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.POOL_SIZE = 5

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
        else:
            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
                resume, range, **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()

    # Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        if not hasattr(self, 'progress_bar_gen'):
            return
        while True:
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                break

    # Untested, unless download_object is tested first
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
        if_modified_since=None, if_unmodified_since=None, data_range=None):
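        # 304 (Not Modified) and 412 (Precondition Failed) responses to the
        # conditional headers are not errors here: there is nothing to fetch.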
        try:
            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since, data_range=data_range)
        except ClientError as err:
            if err.status in (304, 412):
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        return r.headers

    def get_account_quota(self):
        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch=True)

    def get_account_versioning(self):
        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch=True)

    def get_account_meta(self, until=None):
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        self.assert_container()
        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container, r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container, r.status_code)

    def get_container_versioning(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        r = self.container_head(until=until)
        return r.headers

    def get_container_meta(self, until=None):
        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        self.assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, object, metapairs):
        assert isinstance(metapairs, dict)
        r = self.object_post(object, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, metakey, object):
        r = self.object_post(object, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, object):
        r = self.object_post(object, update=True, public=True)
        r.release()

    def unpublish_object(self, object):
        r = self.object_post(object, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')

    def get_object_sharing(self, object):
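        # The X-Object-Sharing header is a ';'-separated list of 'key=value'
        # entries, e.g. 'read=user1,group1;write=user2'; parse it into a dict.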
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(self, object, read_permition=False, write_permition=False):
        """Give read/write permissions to an object.
           @param object is the object to change sharing permissions on
           @param read_permition is a list of users and user groups that get read permission
                for this object - False means all previous read permissions will be removed
           @param write_permition is a list of users and user groups that get write permission
                for this object - False means all previous write permissions will be removed
        """
        perms = {}
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
        r = self.object_post(object, update=True, permitions=perms)
        r.release()

    def del_object_sharing(self, object):
        self.set_object_sharing(object)

    def append_object(self, object, source_file, upload_cb=None):
        """@param upload_cb is a generator for showing progress of upload
            to caller application, e.g. a progress bar. Its next() is called
            whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(object, update=True, content_range='bytes */*',
                content_type='application/octet-stream', content_length=len(block), data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, object, upto_bytes):
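        # Truncation updates the object from itself as source: content_range
        # 'bytes 0-%s/*' keeps only the first upto_bytes bytes.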
        r = self.object_post(object, update=True, content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream', object_bytes=upto_bytes,
            source_object=path4url(self.container, object))
        r.release()

    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with the given source file
           @start the byte of the remote object to start overwriting from
           @end the byte of the remote object to stop overwriting at
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
            offset += len(block)
            r = self.object_post(object, update=True, content_type='application/octet-stream',
                content_length=len(block), content_range='bytes %s-%s/*' % (start, end), data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
            delimiter=delimiter)
        r.release()

    def move_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
            delimiter=delimiter)
        r.release()

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, path):
        self.assert_container()
        r = self.object_get(path, format='json', version='list')
        return r.json['versions']