Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 2b74ab4a

History | View | Annotate | Download (26.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
51
    h.update(block.rstrip('\x00'))
52
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
69
    """GRNet Pithos API client"""
70

    
71
    _thread_exceptions = []
72

    
73
    def __init__(self, base_url, token, account=None, container=None):
74
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self):
77
        r = self.container_delete(until=unicode(time()))
78
        r.release()
79

    
80
    def upload_object_unchunked(self, obj, f,
81
        withHashFile=False,
82
        size=None,
83
        etag=None,
84
        content_encoding=None,
85
        content_disposition=None,
86
        content_type=None,
87
        sharing=None,
88
        public=None):
89
        self.assert_container()
90

    
91
        if withHashFile:
92
            data = f.read()
93
            try:
94
                import json
95
                data = json.dumps(json.loads(data))
96
            except ValueError:
97
                raise ClientError(message='"%s" is not json-formated' % f.name,
98
                    status=1)
99
            except SyntaxError:
100
                raise ClientError(message='"%s" is not a valid hashmap file'\
101
                % f.name, status=1)
102
            f = StringIO(data)
103
        data = f.read(size) if size is not None else f.read()
104
        r = self.object_put(obj,
105
            data=data,
106
            etag=etag,
107
            content_encoding=content_encoding,
108
            content_disposition=content_disposition,
109
            content_type=content_type,
110
            permissions=sharing,
111
            public=public,
112
            success=201)
113
        r.release()
114

    
115
    # upload_* auxiliary methods
116
    def put_block_async(self, data, hash):
117
        event = SilentEvent(method=self.put_block, data=data, hash=hash)
118
        event.start()
119
        return event
120

    
121
    def put_block(self, data, hash):
122
        r = self.container_post(update=True,
123
            content_type='application/octet-stream',
124
            content_length=len(data),
125
            data=data,
126
            format='json')
127
        assert r.json[0] == hash, 'Local hash does not match server'
128

    
129
    def create_object_by_manifestation(self, obj,
130
        etag=None,
131
        content_encoding=None,
132
        content_disposition=None,
133
        content_type=None,
134
        sharing=None,
135
        public=None):
136
        self.assert_container()
137
        r = self.object_put(obj,
138
            content_length=0,
139
            etag=etag,
140
            content_encoding=content_encoding,
141
            content_disposition=content_disposition,
142
            content_type=content_type,
143
            permissions=sharing,
144
            public=public,
145
            manifest='%s/%s' % (self.container, obj))
146
        r.release()
147

    
148
    def _get_file_block_info(self, fileobj, size=None):
149
        meta = self.get_container_info()
150
        blocksize = int(meta['x-container-block-size'])
151
        blockhash = meta['x-container-block-hash']
152
        size = size if size is not None else fstat(fileobj.fileno()).st_size
153
        nblocks = 1 + (size - 1) // blocksize
154
        return (blocksize, blockhash, size, nblocks)
155

    
156
    def _get_missing_hashes(self, obj, json,
157
        size=None,
158
        format='json',
159
        hashmap=True,
160
        content_type=None,
161
        etag=None,
162
        content_encoding=None,
163
        content_disposition=None,
164
        permissions=None,
165
        public=None,
166
        success=(201, 409)):
167
        r = self.object_put(obj,
168
            format='json',
169
            hashmap=True,
170
            content_type=content_type,
171
            json=json,
172
            etag=etag,
173
            content_encoding=content_encoding,
174
            content_disposition=content_disposition,
175
            permissions=permissions,
176
            public=public,
177
            success=success)
178
        if r.status_code == 201:
179
            r.release()
180
            return None
181
        return r.json
182

    
183
    def _caclulate_uploaded_blocks(self,
184
        blocksize,
185
        blockhash,
186
        size,
187
        nblocks,
188
        hashes,
189
        hmap,
190
        fileobj,
191
        hash_cb=None):
192
        offset = 0
193
        if hash_cb:
194
            hash_gen = hash_cb(nblocks)
195
            hash_gen.next()
196

    
197
        for i in range(nblocks):
198
            block = fileobj.read(min(blocksize, size - offset))
199
            bytes = len(block)
200
            hash = pithos_hash(block, blockhash)
201
            hashes.append(hash)
202
            hmap[hash] = (offset, bytes)
203
            offset += bytes
204
            if hash_cb:
205
                hash_gen.next()
206
        assert offset == size
207

    
208
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
209
        """upload missing blocks asynchronously. 
210
        """
211
        if upload_cb:
212
            upload_gen = upload_cb(len(missing))
213
            upload_gen.next()
214

    
215
        self._init_thread_limit()
216

    
217
        flying = []
218
        for hash in missing:
219
            offset, bytes = hmap[hash]
220
            fileobj.seek(offset)
221
            data = fileobj.read(bytes)
222
            r = self.put_block_async(data, hash)
223
            flying.append(r)
224
            unfinished = []
225
            for i, thread in enumerate(flying):
226

    
227
                unfinished = self._watch_thread_limit(unfinished)
228

    
229
                if thread.isAlive() or thread.exception:
230
                    unfinished.append(thread)
231
                else:
232
                    if upload_cb:
233
                        upload_gen.next()
234
            flying = unfinished
235

    
236
        for thread in flying:
237
            thread.join()
238

    
239
        failures = [r for r in flying if r.exception]
240
        if len(failures):
241
            details = ', '.join([' (%s).%s' % (i, r.exception)\
242
                for i, r in enumerate(failures)])
243
            raise ClientError(message="Block uploading failed",
244
                status=505,
245
                details=details)
246

    
247
        while upload_cb:
248
            try:
249
                upload_gen.next()
250
            except StopIteration:
251
                break
252

    
253
    def upload_object(self, obj, f,
254
        size=None,
255
        hash_cb=None,
256
        upload_cb=None,
257
        etag=None,
258
        content_encoding=None,
259
        content_disposition=None,
260
        content_type=None,
261
        sharing=None,
262
        public=None):
263
        self.assert_container()
264

    
265
        #init
266
        block_info = (blocksize, blockhash, size, nblocks) =\
267
            self._get_file_block_info(f, size)
268
        (hashes, hmap, offset) = ([], {}, 0)
269
        if content_type is None:
270
            content_type = 'application/octet-stream'
271

    
272
        self._caclulate_uploaded_blocks(*block_info,
273
            hashes=hashes,
274
            hmap=hmap,
275
            fileobj=f,
276
            hash_cb=hash_cb)
277

    
278
        hashmap = dict(bytes=size, hashes=hashes)
279
        missing = self._get_missing_hashes(obj, hashmap,
280
            content_type=content_type,
281
            size=size,
282
            etag=etag,
283
            content_encoding=content_encoding,
284
            content_disposition=content_disposition,
285
            permissions=sharing,
286
            public=public)
287

    
288
        if missing is None:
289
            return
290
        try:
291
            self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
292
        except KeyboardInterrupt:
293
            print('- - - wait for threads to finish')
294
            for thread in activethreads():
295
                thread.join()
296
            raise
297

    
298
        r = self.object_put(
299
            obj,
300
            format='json',
301
            hashmap=True,
302
            content_type=content_type,
303
            json=hashmap,
304
            success=201)
305
        r.release()
306

    
307
    # download_* auxiliary methods
308
    def _get_remote_blocks_info(self, obj, **restargs):
309
        #retrieve object hashmap
310
        myrange = restargs.pop('data_range', None)
311
        hashmap = self.get_object_hashmap(obj, **restargs)
312
        restargs['data_range'] = myrange
313
        blocksize = int(hashmap['block_size'])
314
        blockhash = hashmap['block_hash']
315
        total_size = hashmap['bytes']
316
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
317
        map_dict = {}
318
        for i, h in enumerate(hashmap['hashes']):
319
            map_dict[h] = i
320
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
321

    
322
    def _dump_blocks_sync(self,
323
        obj,
324
        remote_hashes,
325
        blocksize,
326
        total_size,
327
        dst,
328
        range,
329
        **restargs):
330
        for blockid, blockhash in enumerate(remote_hashes):
331
            if blockhash == None:
332
                continue
333
            start = blocksize * blockid
334
            end = total_size - 1 if start + blocksize > total_size\
335
                else start + blocksize - 1
336
            (start, end) = _range_up(start, end, range)
337
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
338
            r = self.object_get(obj, success=(200, 206), **restargs)
339
            self._cb_next()
340
            dst.write(r.content)
341
            dst.flush()
342

    
343
    def _get_block_async(self, obj, **restargs):
344
        event = SilentEvent(self.object_get,
345
            obj,
346
            success=(200, 206),
347
            **restargs)
348
        event.start()
349
        return event
350

    
351
    def _hash_from_file(self, fp, start, size, blockhash):
352
        fp.seek(start)
353
        block = fp.read(size)
354
        h = newhashlib(blockhash)
355
        h.update(block.strip('\x00'))
356
        return hexlify(h.digest())
357

    
358
    def _thread2file(self,
359
        flying,
360
        local_file,
361
        offset=0,
362
        **restargs):
363
        """write the results of a greenleted rest call to a file
364
        @offset: the offset of the file up to blocksize
365
            - e.g. if the range is 10-100, all
366
        blocks will be written to normal_position - 10"""
367
        finished = []
368
        for i, (start, g) in enumerate(flying.items()):
369
            if not g.isAlive():
370
                if g.exception:
371
                    raise g.exception
372
                block = g.value.content
373
                local_file.seek(start - offset)
374
                local_file.write(block)
375
                self._cb_next()
376
                finished.append(flying.pop(start))
377
        local_file.flush()
378
        return finished
379

    
380
    def _dump_blocks_async(self,
381
        obj,
382
        remote_hashes,
383
        blocksize,
384
        total_size,
385
        local_file,
386
        blockhash=None,
387
        resume=False,
388
        filerange=None,
389
        **restargs):
390

    
391
        file_size = fstat(local_file.fileno()).st_size if resume else 0
392
        flying = {}
393
        finished = []
394
        offset = 0
395
        if filerange is not None:
396
            rstart = int(filerange.split('-')[0])
397
            offset = rstart if blocksize > rstart else rstart % blocksize
398

    
399
        self._init_thread_limit()
400
        for block_hash, blockid in remote_hashes.items():
401
            start = blocksize * blockid
402
            if start < file_size\
403
            and block_hash == self._hash_from_file(
404
                    local_file,
405
                    start,
406
                    blocksize,
407
                    blockhash):
408
                self._cb_next()
409
                continue
410
            self._watch_thread_limit(flying.values())
411
            finished += self._thread2file(
412
                flying,
413
                local_file,
414
                offset,
415
                **restargs)
416
            end = total_size - 1 if start + blocksize > total_size\
417
                else start + blocksize - 1
418
            (start, end) = _range_up(start, end, filerange)
419
            if start == end:
420
                self._cb_next()
421
                continue
422
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
423
            flying[start] = self._get_block_async(obj, **restargs)
424

    
425
        for thread in flying.values():
426
            thread.join()
427
        finished += self._thread2file(flying, local_file, offset, **restargs)
428

    
429
    def download_object(self,
430
        obj,
431
        dst,
432
        download_cb=None,
433
        version=None,
434
        overide=False,
435
        resume=False,
436
        range=None,
437
        if_match=None,
438
        if_none_match=None,
439
        if_modified_since=None,
440
        if_unmodified_since=None):
441

    
442
        restargs = dict(version=version,
443
            data_range=None if range is None else 'bytes=%s' % range,
444
            if_match=if_match,
445
            if_none_match=if_none_match,
446
            if_modified_since=if_modified_since,
447
            if_unmodified_since=if_unmodified_since)
448

    
449
        (blocksize,
450
            blockhash,
451
            total_size,
452
            hash_list,
453
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
454
        assert total_size >= 0
455

    
456
        if download_cb:
457
            self.progress_bar_gen = download_cb(len(remote_hashes))
458
            self._cb_next()
459

    
460
        if dst.isatty():
461
            self._dump_blocks_sync(obj,
462
                hash_list,
463
                blocksize,
464
                total_size,
465
                dst,
466
                range,
467
                **restargs)
468
        else:
469
            self._dump_blocks_async(obj,
470
                remote_hashes,
471
                blocksize,
472
                total_size,
473
                dst,
474
                blockhash,
475
                resume,
476
                range,
477
                **restargs)
478
            if range is None:
479
                dst.truncate(total_size)
480

    
481
        self._complete_cb()
482

    
483
    #Command Progress Bar method
484
    def _cb_next(self):
485
        if hasattr(self, 'progress_bar_gen'):
486
            try:
487
                self.progress_bar_gen.next()
488
            except:
489
                pass
490

    
491
    def _complete_cb(self):
492
        while True:
493
            try:
494
                self.progress_bar_gen.next()
495
            except:
496
                break
497

    
498
    # Untested - except is download_object is tested first
499
    def get_object_hashmap(self, obj,
500
        version=None,
501
        if_match=None,
502
        if_none_match=None,
503
        if_modified_since=None,
504
        if_unmodified_since=None,
505
        data_range=None):
506
        try:
507
            r = self.object_get(obj,
508
                hashmap=True,
509
                version=version,
510
                if_etag_match=if_match,
511
                if_etag_not_match=if_none_match,
512
                if_modified_since=if_modified_since,
513
                if_unmodified_since=if_unmodified_since,
514
                data_range=data_range)
515
        except ClientError as err:
516
            if err.status == 304 or err.status == 412:
517
                return {}
518
            raise
519
        return r.json
520

    
521
    def set_account_group(self, group, usernames):
522
        r = self.account_post(update=True, groups={group: usernames})
523
        r.release()
524

    
525
    def del_account_group(self, group):
526
        r = self.account_post(update=True, groups={group: []})
527
        r.release()
528

    
529
    def get_account_info(self, until=None):
530
        r = self.account_head(until=until)
531
        if r.status_code == 401:
532
            raise ClientError("No authorization")
533
        return r.headers
534

    
535
    def get_account_quota(self):
536
        return filter_in(self.get_account_info(),
537
            'X-Account-Policy-Quota',
538
            exactMatch=True)
539

    
540
    def get_account_versioning(self):
541
        return filter_in(self.get_account_info(),
542
            'X-Account-Policy-Versioning',
543
            exactMatch=True)
544

    
545
    def get_account_meta(self, until=None):
546
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
547

    
548
    def get_account_group(self):
549
        return filter_in(self.get_account_info(), 'X-Account-Group-')
550

    
551
    def set_account_meta(self, metapairs):
552
        assert(type(metapairs) is dict)
553
        r = self.account_post(update=True, metadata=metapairs)
554
        r.release()
555

    
556
    def del_account_meta(self, metakey):
557
        r = self.account_post(update=True, metadata={metakey: ''})
558
        r.release()
559

    
560
    def set_account_quota(self, quota):
561
        r = self.account_post(update=True, quota=quota)
562
        r.release()
563

    
564
    def set_account_versioning(self, versioning):
565
        r = self.account_post(update=True, versioning=versioning)
566
        r.release()
567

    
568
    def list_containers(self):
569
        r = self.account_get()
570
        return r.json
571

    
572
    def del_container(self, until=None, delimiter=None):
573
        self.assert_container()
574
        r = self.container_delete(until=until,
575
            delimiter=delimiter,
576
            success=(204, 404, 409))
577
        r.release()
578
        if r.status_code == 404:
579
            raise ClientError('Container "%s" does not exist' % self.container,
580
                r.status_code)
581
        elif r.status_code == 409:
582
            raise ClientError('Container "%s" is not empty' % self.container,
583
                r.status_code)
584

    
585
    def get_container_versioning(self, container):
586
        self.container = container
587
        return filter_in(self.get_container_info(),
588
            'X-Container-Policy-Versioning')
589

    
590
    def get_container_quota(self, container):
591
        self.container = container
592
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
593

    
594
    def get_container_info(self, until=None):
595
        r = self.container_head(until=until)
596
        return r.headers
597

    
598
    def get_container_meta(self, until=None):
599
        return filter_in(self.get_container_info(until=until),
600
            'X-Container-Meta')
601

    
602
    def get_container_object_meta(self, until=None):
603
        return filter_in(self.get_container_info(until=until),
604
            'X-Container-Object-Meta')
605

    
606
    def set_container_meta(self, metapairs):
607
        assert(type(metapairs) is dict)
608
        r = self.container_post(update=True, metadata=metapairs)
609
        r.release()
610

    
611
    def del_container_meta(self, metakey):
612
        r = self.container_post(update=True, metadata={metakey: ''})
613
        r.release()
614

    
615
    def set_container_quota(self, quota):
616
        r = self.container_post(update=True, quota=quota)
617
        r.release()
618

    
619
    def set_container_versioning(self, versioning):
620
        r = self.container_post(update=True, versioning=versioning)
621
        r.release()
622

    
623
    def del_object(self, obj, until=None, delimiter=None):
624
        self.assert_container()
625
        r = self.object_delete(obj, until=until, delimiter=delimiter)
626
        r.release()
627

    
628
    def set_object_meta(self, object, metapairs):
629
        assert(type(metapairs) is dict)
630
        r = self.object_post(object, update=True, metadata=metapairs)
631
        r.release()
632

    
633
    def del_object_meta(self, metakey, object):
634
        r = self.object_post(object, update=True, metadata={metakey: ''})
635
        r.release()
636

    
637
    def publish_object(self, object):
638
        r = self.object_post(object, update=True, public=True)
639
        r.release()
640

    
641
    def unpublish_object(self, object):
642
        r = self.object_post(object, update=True, public=False)
643
        r.release()
644

    
645
    def get_object_info(self, obj, version=None):
646
        r = self.object_head(obj, version=version)
647
        return r.headers
648

    
649
    def get_object_meta(self, obj, version=None):
650
        return filter_in(self.get_object_info(obj, version=version),
651
            'X-Object-Meta')
652

    
653
    def get_object_sharing(self, object):
654
        r = filter_in(self.get_object_info(object),
655
            'X-Object-Sharing',
656
            exactMatch=True)
657
        reply = {}
658
        if len(r) > 0:
659
            perms = r['x-object-sharing'].split(';')
660
            for perm in perms:
661
                try:
662
                    perm.index('=')
663
                except ValueError:
664
                    raise ClientError('Incorrect reply format')
665
                (key, val) = perm.strip().split('=')
666
                reply[key] = val
667
        return reply
668

    
669
    def set_object_sharing(self, object,
670
        read_permition=False,
671
        write_permition=False):
672
        """Give read/write permisions to an object.
673
           @param object is the object to change sharing permissions
674
 onto
675
           @param read_permition is a list of users and user groups that
676
                get read permition for this object
677
                False means all previous read permissions
678
 will be removed
679
           @param write_perimition is a list of users and user groups to
680
                get write permition for this object
681
                False means all previous read permissions
682
 will be removed
683
        """
684
        perms = dict(read='' if not read_permition else read_permition,
685
            write='' if not write_permition else write_permition)
686
        r = self.object_post(object, update=True, permissions=perms)
687
        r.release()
688

    
689
    def del_object_sharing(self, object):
690
        self.set_object_sharing(object)
691

    
692
    def append_object(self, object, source_file, upload_cb=None):
693
        """@param upload_db is a generator for showing progress of upload
694
            to caller application, e.g. a progress bar. Its next is called
695
            whenever a block is uploaded
696
        """
697
        self.assert_container()
698
        meta = self.get_container_info()
699
        blocksize = int(meta['x-container-block-size'])
700
        filesize = fstat(source_file.fileno()).st_size
701
        nblocks = 1 + (filesize - 1) // blocksize
702
        offset = 0
703
        if upload_cb is not None:
704
            upload_gen = upload_cb(nblocks)
705
        for i in range(nblocks):
706
            block = source_file.read(min(blocksize, filesize - offset))
707
            offset += len(block)
708
            r = self.object_post(object,
709
                update=True,
710
                content_range='bytes */*',
711
                content_type='application/octet-stream',
712
                content_length=len(block),
713
                data=block)
714
            r.release()
715

    
716
            if upload_cb is not None:
717
                upload_gen.next()
718

    
719
    def truncate_object(self, object, upto_bytes):
720
        r = self.object_post(object,
721
            update=True,
722
            content_range='bytes 0-%s/*' % upto_bytes,
723
            content_type='application/octet-stream',
724
            object_bytes=upto_bytes,
725
            source_object=path4url(self.container, object))
726
        r.release()
727

    
728
    def overwrite_object(self,
729
        object,
730
        start,
731
        end,
732
        source_file,
733
        upload_cb=None):
734
        """Overwrite a part of an object with given source file
735
           @start the part of the remote object to start overwriting from,
736
                in bytes
737
           @end the part of the remote object to stop overwriting to, in bytes
738
        """
739
        self.assert_container()
740
        meta = self.get_container_info()
741
        blocksize = int(meta['x-container-block-size'])
742
        filesize = fstat(source_file.fileno()).st_size
743
        datasize = int(end) - int(start) + 1
744
        nblocks = 1 + (datasize - 1) // blocksize
745
        offset = 0
746
        if upload_cb is not None:
747
            upload_gen = upload_cb(nblocks)
748
        for i in range(nblocks):
749
            block = source_file.read(min(blocksize,
750
                filesize - offset,
751
                datasize - offset))
752
            offset += len(block)
753
            r = self.object_post(object,
754
                update=True,
755
                content_type='application/octet-stream',
756
                content_length=len(block),
757
                content_range='bytes %s-%s/*' % (start, end),
758
                data=block)
759
            r.release()
760

    
761
            if upload_cb is not None:
762
                upload_gen.next()
763

    
764
    def copy_object(self, src_container, src_object, dst_container,
765
        dst_object=False,
766
        source_version=None,
767
        public=False,
768
        content_type=None,
769
        delimiter=None):
770
        self.assert_account()
771
        self.container = dst_container
772
        dst_object = dst_object or src_object
773
        src_path = path4url(src_container, src_object)
774
        r = self.object_put(dst_object,
775
            success=201,
776
            copy_from=src_path,
777
            content_length=0,
778
            source_version=source_version,
779
            public=public,
780
            content_type=content_type,
781
            delimiter=delimiter)
782
        r.release()
783

    
784
    def move_object(self, src_container, src_object, dst_container,
785
        dst_object=False,
786
        source_version=None,
787
        public=False,
788
        content_type=None,
789
        delimiter=None):
790
        self.assert_account()
791
        self.container = dst_container
792
        dst_object = dst_object or src_object
793
        src_path = path4url(src_container, src_object)
794
        r = self.object_put(dst_object,
795
            success=201,
796
            move_from=src_path,
797
            content_length=0,
798
            source_version=source_version,
799
            public=public,
800
            content_type=content_type,
801
            delimiter=delimiter)
802
        r.release()
803

    
804
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
805
        """Get accounts that share with self.account"""
806
        self.assert_account()
807

    
808
        self.set_param('format', 'json')
809
        self.set_param('limit', limit, iff=limit is not None)
810
        self.set_param('marker', marker, iff=marker is not None)
811

    
812
        path = ''
813
        success = kwargs.pop('success', (200, 204))
814
        r = self.get(path, *args, success=success, **kwargs)
815
        return r.json
816

    
817
    def get_object_versionlist(self, path):
818
        self.assert_container()
819
        r = self.object_get(path, format='json', version='list')
820
        return r.json['versions']