# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import gevent
#import gevent.monkey
# Monkey-patch everything for gevent early on
#gevent.monkey.patch_all()
import gevent.pool

from os import fstat
from hashlib import new as newhashlib
from time import time, sleep
import sys

from binascii import hexlify

from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def pithos_hash(block, blockhash):
    """Hash a block the way Pithos servers do: trailing NUL padding is
    stripped before hashing with the container's block hash algorithm.
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    """Intersect the [start, end] block span with a user-requested
    'rstart-rend' range; return (0, 0) if they do not overlap.
    """
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
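
# For example, block 1 of an object stored in 1024-byte blocks spans bytes
# [1024, 2047]; a requested range of '1500-4000' gives
# _range_up(1024, 2047, '1500-4000') == (1500, 2047).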


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
        self.async_pool = None

    def purge_container(self):
        # deleting with until=<now> also removes the container's history
        r = self.container_delete(until=unicode(time()))
        r.release()
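
    # Example (sketch; the URL, token and names below are hypothetical):
    #     client = PithosClient('https://pithos.example.com/v1', 'token-1234',
    #         account='user@example.com', container='pithos')
    #     with open('local.bin', 'rb') as f:
    #         client.upload_object('remote.bin', f)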

    def upload_object_unchunked(self, obj, f,
        withHashFile=False,
        size=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not JSON-formatted' % f.name,
                    status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file'\
                % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

    # upload_* auxiliary methods
    def put_block_async(self, data, hash):
        class SilentGreenlet(gevent.Greenlet):
            # keep gevent from dumping tracebacks to stderr; the exception
            # is still available on the greenlet object
            def _report_error(self, exc_info):
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
        g = SilentGreenlet(self.put_block, data, hash)
        self.async_pool.start(g)
        return g

    def put_block(self, data, hash):
        # a container POST with update=True stores the data as new blocks;
        # the server replies with the hashes of the blocks it received
        r = self.container_post(update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def create_object_by_manifestation(self, obj,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()
        r = self.object_put(obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()
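
    # Example (sketch): if parts were uploaded as myobj/1, myobj/2, ...,
    # then client.create_object_by_manifestation('myobj') creates 'myobj'
    # as a manifest object, served as the concatenation of every object
    # in the container whose name starts with that prefix.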

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
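
    # e.g. a 10 MiB file in a container with a 4 MiB blocksize yields
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3 blocks (4 + 4 + 2 MiB)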

    def _get_missing_hashes(self, obj, json,
        size=None,
        format='json',
        hashmap=True,
        content_type=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        permissions=None,
        public=None,
        success=(201, 409)):
        # PUT the hashmap: 201 means every block is already on the server,
        # 409 means the reply carries the list of missing block hashes
        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json
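
    # The hashmap JSON sent above has the shape built by upload_object:
    #     {"bytes": <total object size>, "hashes": [<hash of block 0>, ...]}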

    def _calculate_uploaded_blocks(self,
        blocksize,
        blockhash,
        size,
        nblocks,
        hashes,
        hmap,
        fileobj,
        hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """Upload the missing blocks asynchronously, in greenlets, so that
        slow PUTs do not block each other.
        """
        if upload_cb:
            upload_gen = upload_cb(len(missing))
            upload_gen.next()

        flying = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self.put_block_async(data, hash)
            flying.append(r)
            for r in flying:
                if r.ready():
                    if r.exception:
                        raise r.exception
                    if upload_cb:
                        upload_gen.next()
            flying = [r for r in flying if not r.ready()]
        while upload_cb:
            try:
                upload_gen.next()
            except StopIteration:
                break
        gevent.joinall(flying)

        failures = [r for r in flying if r.exception]
        if len(failures):
            details = ', '.join(['(%s).%s' % (i, r.exception)\
                for i, r in enumerate(failures)])
            raise ClientError(message="Block uploading failed",
                status=505,
                details=details)

    def upload_object(self, obj, f,
        size=None,
        hash_cb=None,
        upload_cb=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._calculate_uploaded_blocks(*block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)

        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
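
    # Sketch of a progress callback (hypothetical; any generator factory
    # of this shape works - it is advanced roughly once per block):
    #
    #     def upload_cb(nblocks):
    #         for done in range(nblocks + 1):
    #             print 'uploaded %s of %s blocks' % (done, nblocks)
    #             yield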

    # download_* auxiliary methods
    # All untested
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        dst,
        range,
        **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        class SilentGreenlet(gevent.Greenlet):
            # keep gevent from dumping tracebacks to stderr; the exception
            # is still available on the greenlet object
            def _report_error(self, exc_info):
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        if not hasattr(self, 'POOL_SIZE'):
            self.POOL_SIZE = 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
        g = SilentGreenlet(self.object_get, obj,
            success=(200, 206),
            **restargs)
        self.async_pool.start(g)
        return g

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # rstrip, not strip: Pithos hashes ignore trailing NUL padding only
        # (must match pithos_hash above)
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _greenlet2file(self,
        flying_greenlets,
        local_file,
        offset=0,
        **restargs):
        """Write the results of completed greenlet REST calls to a file.
        @offset: the file offset correction, up to blocksize
            - e.g. if the requested range is 10-100, every block is
            written at normal_position - 10
        """
        finished = []
        for start, g in flying_greenlets.items():
            if g.ready():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying_greenlets.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        local_file,
        blockhash=None,
        resume=False,
        filerange=None,
        **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            # on resume, skip blocks that already exist locally with the
            # same Pithos hash
            if start < file_size\
            and block_hash == self._hash_from_file(local_file,
                start,
                blocksize,
                blockhash):
                self._cb_next()
                continue
            if len(flying_greenlets) >= self.POOL_SIZE:
                finished_greenlets += self._greenlet2file(flying_greenlets,
                    local_file,
                    offset,
                    **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying_greenlets[start] = self._get_block_async(obj, **restargs)

        # wait for the last greenlets to finish and flush them to the file
        while len(flying_greenlets) > 0:
            sleep(0.001)
            finished_greenlets += self._greenlet2file(flying_greenlets,
                local_file,
                offset,
                **restargs)

        gevent.joinall(finished_greenlets)

    def download_object(self,
        obj,
        dst,
        download_cb=None,
        version=None,
        overide=False,
        resume=False,
        range=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None):

        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.POOL_SIZE = 5

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        # a tty (e.g. stdout) is not seekable, so its blocks are fetched in
        # order; regular files get the parallel, out-of-order download
        if dst.isatty():
            self._dump_blocks_sync(obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range,
                **restargs)
        else:
            self._dump_blocks_async(obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range,
                **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()
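
    # Example (sketch; 'client' and the object names are hypothetical):
    #     with open('local.copy', 'wb+') as dst:
    #         client.download_object('remote.bin', dst)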

    # Command Progress Bar method
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            # AttributeError: no progress bar generator was ever set
            except (AttributeError, StopIteration):
                break

    # Untested, except insofar as download_object exercises it
    def get_object_hashmap(self, obj,
        version=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None,
        data_range=None):
        try:
            r = self.object_get(obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
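
    # A hashmap reply looks like this (illustrative values):
    #     {"block_size": 4194304, "block_hash": "sha256",
    #      "bytes": 10485760, "hashes": ["7bd61a...", "c1f04d...", "9e2ab7..."]}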

    def set_account_group(self, group, usernames):
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", r.status_code)
        return r.headers

    def get_account_quota(self):
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        self.assert_container()
        r = self.container_delete(until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        self.container = container
        return filter_in(self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        r = self.container_head(until=until)
        return r.headers

    def get_container_meta(self, until=None):
        return filter_in(self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        return filter_in(self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        self.assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, object, metapairs):
        assert isinstance(metapairs, dict)
        r = self.object_post(object, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, metakey, object):
        r = self.object_post(object, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, object):
        r = self.object_post(object, update=True, public=True)
        r.release()

    def unpublish_object(self, object):
        r = self.object_post(object, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        return filter_in(self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, object):
        r = filter_in(self.get_object_info(object),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(self, object,
        read_permition=False,
        write_permition=False):
        """Give read/write permissions to an object.
           @param object is the object to change sharing permissions on
           @param read_permition is a list of users and user groups that
                get read permissions for this object;
                False means all previous read permissions will be removed
           @param write_permition is a list of users and user groups that
                get write permissions for this object;
                False means all previous write permissions will be removed
        """
        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(object, update=True, permissions=perms)
        r.release()
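
    # Example (sketch; user names are hypothetical, permissions passed as
    # a comma-separated string):
    #     client.set_object_sharing('remote.bin',
    #         read_permition='user1@example.com,user2@example.com')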

    def del_object_sharing(self, object):
        self.set_object_sharing(object)

    def append_object(self, object, source_file, upload_cb=None):
        """@param upload_cb is a generator for showing upload progress
            to the caller application, e.g. a progress bar. Its next()
            is called whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(object,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, object, upto_bytes):
        r = self.object_post(object,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, object))
        r.release()
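
    # Example (sketch): append data to a remote object, then cut it back:
    #     with open('delta.bin', 'rb') as f:
    #         client.append_object('remote.bin', f)
    #     client.truncate_object('remote.bin', 1024)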

    def overwrite_object(self,
        object,
        start,
        end,
        source_file,
        upload_cb=None):
        """Overwrite part of an object with the given source file
           @param start the byte offset of the remote object where
                overwriting starts
           @param end the byte offset of the remote object where
                overwriting stops (inclusive)
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
                filesize - offset,
                datasize - offset))
            offset += len(block)
            r = self.object_post(object,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (start, end),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def copy_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def move_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, path):
        self.assert_container()
        r = self.object_get(path, format='json', version='list')
        return r.json['versions']