# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import Thread
from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO

def pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()

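# A quick illustration (not part of the original module): trailing NUL
# padding is stripped before hashing, so a zero-padded block and its
# unpadded content produce the same Pithos hash. 'sha256' is just an
# assumed container block-hash setting here.
#
#     >>> pithos_hash('abc' + 5 * '\x00', 'sha256') == pithos_hash('abc', 'sha256')
#     True
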
def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)

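# Semantics sketch for _range_up (not in the original source): it clips a
# block's [start, end] byte interval to a user-requested 'rstart-rend'
# range, returning (0, 0) when the two do not overlap:
#
#     >>> _range_up(0, 99, '50-1000')
#     (50, 99)
#     >>> _range_up(0, 99, '200-300')
#     (0, 0)
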
class SilentEvent(Thread):
    """Run method(*args, **kwargs) in a thread; instead of propagating, any
    exception raised is stored and exposed through the exception property
    """
    def __init__(self, method, *args, **kwargs):
        super(self.__class__, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        return getattr(self, '_exception', False)

    @property
    def value(self):
        return getattr(self, '_value', None)

    def run(self):
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            print('______\n%s\n_______' % e)
            self._exception = e

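# Usage sketch (an editor's assumption, not from the original source):
#
#     event = SilentEvent(some_callable, arg1, kwarg=2)
#     event.start()
#     ...
#     event.join()
#     if event.exception:
#         raise event.exception
#     result = event.value
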
class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self):
        r = self.container_delete(until=unicode(time()))
        r.release()

    def upload_object_unchunked(self, obj, f,
        withHashFile=False,
        size=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not json-formatted' % f.name,
                    status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file'\
                % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

    # upload_* auxiliary methods
    def put_block_async(self, data, hash):
        event = SilentEvent(method=self.put_block, data=data, hash=hash)
        event.start()
        return event

    def put_block(self, data, hash):
        r = self.container_post(update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

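    # How block upload works (editor's note, based on the code above): the
    # raw bytes are POSTed to the container and the server replies, in JSON,
    # with the hash(es) of the block(s) it stored; the assert checks that
    # the server-side hash agrees with the locally computed pithos_hash.
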
    def create_object_by_manifestation(self, obj,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()
        r = self.object_put(obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

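    # Editor's note: nblocks = 1 + (size - 1) // blocksize is ceiling
    # division, e.g. with an (assumed) 4194304-byte blocksize a
    # 9437185-byte file maps to 3 blocks and an 8388608-byte file to
    # exactly 2.
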
    def _get_missing_hashes(self, obj, json,
        size=None,
        format='json',
        hashmap=True,
        content_type=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        permissions=None,
        public=None,
        success=(201, 409)):
        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json

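    # Editor's note on the exchange above: a hashmap PUT that succeeds with
    # 201 means the server assembled the object from blocks it already has,
    # so None is returned and no data need be sent; on 409 the JSON body
    # lists the hashes of the blocks the server is missing, which is what
    # upload_object then transfers.
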
    def _calculate_uploaded_blocks(self,
        blocksize,
        blockhash,
        size,
        nblocks,
        hashes,
        hmap,
        fileobj,
        hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """Upload the missing blocks asynchronously, in a bounded number of
        threads, to avoid waiting on each block
        """
        if upload_cb:
            upload_gen = upload_cb(len(missing))
            upload_gen.next()

        self._init_thread_limit()

        flying = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self.put_block_async(data, hash)
            flying.append(r)
            unfinished = []
            for i, thread in enumerate(flying):
                unfinished = self._watch_thread_limit(unfinished)
                if thread.isAlive() or thread.exception:
                    unfinished.append(thread)
                else:
                    if upload_cb:
                        upload_gen.next()
            flying = unfinished

        for thread in flying:
            thread.join()

        failures = [r for r in flying if r.exception]
        if len(failures):
            details = ', '.join([' (%s).%s' % (i, r.exception)\
                for i, r in enumerate(failures)])
            raise ClientError(message="Block uploading failed",
                status=505,
                details=details)

        while upload_cb:
            try:
                upload_gen.next()
            except StopIteration:
                break

    def upload_object(self, obj, f,
        size=None,
        hash_cb=None,
        upload_cb=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        self.assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._calculate_uploaded_blocks(*block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return
        try:
            self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
        except KeyboardInterrupt:
            print('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()

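    # Typical use (sketch; url/token/names are placeholders):
    #
    #     client = PithosClient(url, token, account, container='pithos')
    #     with open('/tmp/data.bin', 'rb') as f:
    #         client.upload_object('data.bin', f)
    #
    # Only blocks reported missing are transferred, so re-uploading an
    # unchanged file costs little more than one hashmap PUT.
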
    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        dst,
        range,
        **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        event = SilentEvent(self.object_get,
            obj,
            success=(200, 206),
            **restargs)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self,
        flying,
        local_file,
        offset=0,
        **restargs):
        """Write the results of finished threaded REST calls to a file

        @offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            #if i % self.POOL_SIZE == 0:
            #    g.join(0.1)
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        local_file,
        blockhash=None,
        resume=False,
        filerange=None,
        **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size\
            and block_hash == self._hash_from_file(
                    local_file,
                    start,
                    blocksize,
                    blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(self,
        obj,
        dst,
        download_cb=None,
        version=None,
        override=False,
        resume=False,
        range=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None):

        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range,
                **restargs)
        else:
            if len(remote_hashes) > self.POOL_SIZE:
                self.POOL_SIZE = len(remote_hashes) // 10
            self._dump_blocks_async(obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range,
                **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()

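    # Typical use (sketch; names are placeholders):
    #
    #     with open('/tmp/data.bin', 'rb+') as f:
    #         client.download_object('data.bin', f, resume=True)
    #
    # A tty destination is written sequentially; regular files take the
    # threaded path, and with resume=True blocks whose hashes already match
    # the local file are skipped.
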
    # Command progress bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    # Untested - except insofar as download_object is tested first
    def get_object_hashmap(self, obj,
        version=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None,
        data_range=None):
        try:
            r = self.object_get(obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        return r.headers

    def get_account_quota(self):
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        self.assert_container()
        r = self.container_delete(until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        self.container = container
        return filter_in(self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        r = self.container_head(until=until)
        return r.headers

    def get_container_meta(self, until=None):
        return filter_in(self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        return filter_in(self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        self.assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, object, metapairs):
        assert(type(metapairs) is dict)
        r = self.object_post(object, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, metakey, object):
        r = self.object_post(object, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, object):
        r = self.object_post(object, update=True, public=True)
        r.release()

    def unpublish_object(self, object):
        r = self.object_post(object, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        return filter_in(self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, object):
        r = filter_in(self.get_object_info(object),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

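    # Illustration (editor's example value): an X-Object-Sharing header of
    #     read=user1,group1;write=user2
    # is parsed by the method above into
    #     {'read': 'user1,group1', 'write': 'user2'}
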
    def set_object_sharing(self, object,
        read_permition=False,
        write_permition=False):
        """Give read/write permissions to an object.

        @param object is the object to change sharing permissions on
        @param read_permition is a list of users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed
        @param write_permition is a list of users and user groups to get
            write permission for this object - False means all previous
            write permissions will be removed
        """
        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(object, update=True, permissions=perms)
        r.release()

    def del_object_sharing(self, object):
        self.set_object_sharing(object)

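    # Usage sketch (user names are placeholders):
    #
    #     client.set_object_sharing('photo.jpg',
    #         read_permition='alice,designers',
    #         write_permition='bob')
    #
    # del_object_sharing, i.e. set_object_sharing with both arguments left
    # False, clears all sharing permissions on the object.
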
    def append_object(self, object, source_file, upload_cb=None):
        """@param upload_cb is a generator that reports upload progress to
            the caller application, e.g. a progress bar. Its next() is
            called whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(object,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, object, upto_bytes):
        r = self.object_post(object,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, object))
        r.release()

    def overwrite_object(self,
        object,
        start,
        end,
        source_file,
        upload_cb=None):
        """Overwrite a part of an object with the contents of a source file

        @param start the byte position in the remote object where
            overwriting starts
        @param end the byte position in the remote object where
            overwriting stops
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
                filesize - offset,
                datasize - offset))
            offset += len(block)
            r = self.object_post(object,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (start, end),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

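    # Usage sketch (paths are placeholders): replace bytes 1024-2047 of a
    # remote object with the first KB of a local file:
    #
    #     with open('/tmp/patch.bin', 'rb') as f:
    #         client.overwrite_object('data.bin', 1024, 2047, f)
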
    def copy_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def move_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, path):
        self.assert_container()
        r = self.object_get(path, format='json', version='list')
        return r.json['versions']