Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ e02728f9

History | View | Annotate | Download (26.9 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import Thread
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients.pithos_rest_api import PithosRestAPI
43
from kamaki.clients.storage import ClientError
44
from kamaki.clients.utils import path4url, filter_in
45
from StringIO import StringIO
46

    
47

    
48
def pithos_hash(block, blockhash):
49
    h = newhashlib(blockhash)
50
    h.update(block.rstrip('\x00'))
51
    return h.hexdigest()
52

    
53

    
54
def _range_up(start, end, a_range):
55
    if a_range:
56
        (rstart, rend) = a_range.split('-')
57
        (rstart, rend) = (int(rstart), int(rend))
58
        if rstart > end or rend < start:
59
            return (0, 0)
60
        if rstart > start:
61
            start = rstart
62
        if rend < end:
63
            end = rend
64
    return (start, end)
65

    
66

    
67
class SilentEvent(Thread):
68
    """ Thread-run method(*args, **kwargs)
69
        put exception in exception_bucket
70
    """
71
    def __init__(self, method, *args, **kwargs):
72
        super(self.__class__, self).__init__()
73
        self.method = method
74
        self.args = args
75
        self.kwargs = kwargs
76

    
77
    @property
78
    def exception(self):
79
        return getattr(self, '_exception', False)
80

    
81
    @property
82
    def value(self):
83
        return getattr(self, '_value', None)
84

    
85
    def run(self):
86
        try:
87
            self._value = self.method(*(self.args), **(self.kwargs))
88
        except Exception as e:
89
            print('______\n%s\n_______' % e)
90
            self._exception = e
91

    
92

    
93
class PithosClient(PithosRestAPI):
94
    """GRNet Pithos API client"""
95

    
96
    _thread_exceptions = []
97

    
98
    def __init__(self, base_url, token, account=None, container=None):
99
        super(PithosClient, self).__init__(base_url, token, account, container)
100
        self.async_pool = None
101

    
102
    def purge_container(self):
103
        r = self.container_delete(until=unicode(time()))
104
        r.release()
105

    
106
    def upload_object_unchunked(self, obj, f,
107
        withHashFile=False,
108
        size=None,
109
        etag=None,
110
        content_encoding=None,
111
        content_disposition=None,
112
        content_type=None,
113
        sharing=None,
114
        public=None):
115
        self.assert_container()
116

    
117
        if withHashFile:
118
            data = f.read()
119
            try:
120
                import json
121
                data = json.dumps(json.loads(data))
122
            except ValueError:
123
                raise ClientError(message='"%s" is not json-formated' % f.name,
124
                    status=1)
125
            except SyntaxError:
126
                raise ClientError(message='"%s" is not a valid hashmap file'\
127
                % f.name, status=1)
128
            f = StringIO(data)
129
        data = f.read(size) if size is not None else f.read()
130
        r = self.object_put(obj,
131
            data=data,
132
            etag=etag,
133
            content_encoding=content_encoding,
134
            content_disposition=content_disposition,
135
            content_type=content_type,
136
            permissions=sharing,
137
            public=public,
138
            success=201)
139
        r.release()
140

    
141
    # upload_* auxiliary methods
142
    def put_block_async(self, data, hash):
143
        event = SilentEvent(method=self.put_block, data=data, hash=hash)
144
        event.start()
145
        return event
146

    
147
    def put_block(self, data, hash):
148
        r = self.container_post(update=True,
149
            content_type='application/octet-stream',
150
            content_length=len(data),
151
            data=data,
152
            format='json')
153
        assert r.json[0] == hash, 'Local hash does not match server'
154

    
155
    def create_object_by_manifestation(self, obj,
156
        etag=None,
157
        content_encoding=None,
158
        content_disposition=None,
159
        content_type=None,
160
        sharing=None,
161
        public=None):
162
        self.assert_container()
163
        r = self.object_put(obj,
164
            content_length=0,
165
            etag=etag,
166
            content_encoding=content_encoding,
167
            content_disposition=content_disposition,
168
            content_type=content_type,
169
            permissions=sharing,
170
            public=public,
171
            manifest='%s/%s' % (self.container, obj))
172
        r.release()
173

    
174
    def _get_file_block_info(self, fileobj, size=None):
175
        meta = self.get_container_info()
176
        blocksize = int(meta['x-container-block-size'])
177
        blockhash = meta['x-container-block-hash']
178
        size = size if size is not None else fstat(fileobj.fileno()).st_size
179
        nblocks = 1 + (size - 1) // blocksize
180
        return (blocksize, blockhash, size, nblocks)
181

    
182
    def _get_missing_hashes(self, obj, json,
183
        size=None,
184
        format='json',
185
        hashmap=True,
186
        content_type=None,
187
        etag=None,
188
        content_encoding=None,
189
        content_disposition=None,
190
        permissions=None,
191
        public=None,
192
        success=(201, 409)):
193
        r = self.object_put(obj,
194
            format='json',
195
            hashmap=True,
196
            content_type=content_type,
197
            json=json,
198
            etag=etag,
199
            content_encoding=content_encoding,
200
            content_disposition=content_disposition,
201
            permissions=permissions,
202
            public=public,
203
            success=success)
204
        if r.status_code == 201:
205
            r.release()
206
            return None
207
        return r.json
208

    
209
    def _caclulate_uploaded_blocks(self,
210
        blocksize,
211
        blockhash,
212
        size,
213
        nblocks,
214
        hashes,
215
        hmap,
216
        fileobj,
217
        hash_cb=None):
218
        offset = 0
219
        if hash_cb:
220
            hash_gen = hash_cb(nblocks)
221
            hash_gen.next()
222

    
223
        for i in range(nblocks):
224
            block = fileobj.read(min(blocksize, size - offset))
225
            bytes = len(block)
226
            hash = pithos_hash(block, blockhash)
227
            hashes.append(hash)
228
            hmap[hash] = (offset, bytes)
229
            offset += bytes
230
            if hash_cb:
231
                hash_gen.next()
232
        assert offset == size
233

    
234
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
235
        """upload missing blocks asynchronously. Use greenlets to avoid waiting
236
        """
237
        if upload_cb:
238
            upload_gen = upload_cb(len(missing))
239
            upload_gen.next()
240

    
241
        flying = []
242
        for hash in missing:
243
            offset, bytes = hmap[hash]
244
            fileobj.seek(offset)
245
            data = fileobj.read(bytes)
246
            r = self.put_block_async(data, hash)
247
            flying.append(r)
248
            unfinished = []
249
            for thread in flying:
250
                if thread.isAlive() or thread.exception:
251
                    unfinished.append(thread)
252
                else:
253
                    if upload_cb:
254
                        upload_gen.next()
255
            flying = unfinished
256
        while upload_cb:
257
            try:
258
                upload_gen.next()
259
            except StopIteration:
260
                break
261

    
262
        for thread in flying:
263
            thread.join()
264

    
265
        failures = [r for r in flying if r.exception]
266
        if len(failures):
267
            details = ', '.join([' (%s).%s' % (i, r.exception)\
268
                for i, r in enumerate(failures)])
269
            raise ClientError(message="Block uploading failed",
270
                status=505,
271
                details=details)
272

    
273
    def upload_object(self, obj, f,
274
        size=None,
275
        hash_cb=None,
276
        upload_cb=None,
277
        etag=None,
278
        content_encoding=None,
279
        content_disposition=None,
280
        content_type=None,
281
        sharing=None,
282
        public=None):
283
        self.assert_container()
284

    
285
        #init
286
        block_info = (blocksize, blockhash, size, nblocks) =\
287
            self._get_file_block_info(f, size)
288
        (hashes, hmap, offset) = ([], {}, 0)
289
        if content_type is None:
290
            content_type = 'application/octet-stream'
291

    
292
        self._caclulate_uploaded_blocks(*block_info,
293
            hashes=hashes,
294
            hmap=hmap,
295
            fileobj=f,
296
            hash_cb=hash_cb)
297

    
298
        hashmap = dict(bytes=size, hashes=hashes)
299
        missing = self._get_missing_hashes(obj, hashmap,
300
            content_type=content_type,
301
            size=size,
302
            etag=etag,
303
            content_encoding=content_encoding,
304
            content_disposition=content_disposition,
305
            permissions=sharing,
306
            public=public)
307

    
308
        if missing is None:
309
            return
310
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
311

    
312
        r = self.object_put(obj,
313
            format='json',
314
            hashmap=True,
315
            content_type=content_type,
316
            json=hashmap,
317
            success=201)
318
        r.release()
319

    
320
    #download_* auxiliary methods
321
    #ALl untested
322
    def _get_remote_blocks_info(self, obj, **restargs):
323
        #retrieve object hashmap
324
        myrange = restargs.pop('data_range', None)
325
        hashmap = self.get_object_hashmap(obj, **restargs)
326
        restargs['data_range'] = myrange
327
        blocksize = int(hashmap['block_size'])
328
        blockhash = hashmap['block_hash']
329
        total_size = hashmap['bytes']
330
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
331
        map_dict = {}
332
        for i, h in enumerate(hashmap['hashes']):
333
            map_dict[h] = i
334
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
335

    
336
    def _dump_blocks_sync(self,
337
        obj,
338
        remote_hashes,
339
        blocksize,
340
        total_size,
341
        dst,
342
        range,
343
        **restargs):
344
        for blockid, blockhash in enumerate(remote_hashes):
345
            if blockhash == None:
346
                continue
347
            start = blocksize * blockid
348
            end = total_size - 1 if start + blocksize > total_size\
349
                else start + blocksize - 1
350
            (start, end) = _range_up(start, end, range)
351
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
352
            r = self.object_get(obj, success=(200, 206), **restargs)
353
            self._cb_next()
354
            dst.write(r.content)
355
            dst.flush()
356

    
357
    def _get_block_async(self, obj, **restargs):
358
        event = SilentEvent(self.object_get,
359
            obj,
360
            success=(200, 206),
361
            **restargs)
362
        event.start()
363
        return event
364

    
365
    def _hash_from_file(self, fp, start, size, blockhash):
366
        fp.seek(start)
367
        block = fp.read(size)
368
        h = newhashlib(blockhash)
369
        h.update(block.strip('\x00'))
370
        return hexlify(h.digest())
371

    
372
    def _thread2file(self,
373
        flying,
374
        local_file,
375
        offset=0,
376
        **restargs):
377
        """write the results of a greenleted rest call to a file
378
        @offset: the offset of the file up to blocksize
379
            - e.g. if the range is 10-100, all
380
        blocks will be written to normal_position - 10"""
381
        finished = []
382
        for start, g in flying.items():
383
            if not g.isAlive():
384
                if g.exception:
385
                    raise g.exception
386
                block = g.value.content
387
                local_file.seek(start - offset)
388
                local_file.write(block)
389
                self._cb_next()
390
                finished.append(flying.pop(start))
391
        local_file.flush()
392
        return finished
393

    
394
    def _dump_blocks_async(self,
395
        obj,
396
        remote_hashes,
397
        blocksize,
398
        total_size,
399
        local_file,
400
        blockhash=None,
401
        resume=False,
402
        filerange=None,
403
        **restargs):
404

    
405
        file_size = fstat(local_file.fileno()).st_size if resume else 0
406
        flying = {}
407
        finished = []
408
        offset = 0
409
        if filerange is not None:
410
            rstart = int(filerange.split('-')[0])
411
            offset = rstart if blocksize > rstart else rstart % blocksize
412
        for block_hash, blockid in remote_hashes.items():
413
            start = blocksize * blockid
414
            if start < file_size\
415
            and block_hash == self._hash_from_file(local_file,
416
                start,
417
                blocksize,
418
                blockhash):
419
                self._cb_next()
420
                continue
421
            if len(flying) >= self.POOL_SIZE:
422
                finished += self._thread2file(flying,
423
                    local_file,
424
                    offset,
425
                    **restargs)
426
            end = total_size - 1 if start + blocksize > total_size\
427
                else start + blocksize - 1
428
            (start, end) = _range_up(start, end, filerange)
429
            if start == end:
430
                self._cb_next()
431
                continue
432
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
433
            flying[start] = self._get_block_async(obj, **restargs)
434

    
435
        for thread in flying.values():
436
            thread.join()
437
        finished += self._thread2file(flying, local_file, offset, **restargs)
438

    
439
    def download_object(self,
440
        obj,
441
        dst,
442
        download_cb=None,
443
        version=None,
444
        overide=False,
445
        resume=False,
446
        range=None,
447
        if_match=None,
448
        if_none_match=None,
449
        if_modified_since=None,
450
        if_unmodified_since=None):
451

    
452
        restargs = dict(version=version,
453
            data_range=None if range is None else 'bytes=%s' % range,
454
            if_match=if_match,
455
            if_none_match=if_none_match,
456
            if_modified_since=if_modified_since,
457
            if_unmodified_since=if_unmodified_since)
458

    
459
        (blocksize,
460
            blockhash,
461
            total_size,
462
            hash_list,
463
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
464
        assert total_size >= 0
465
        self.POOL_SIZE = 5
466

    
467
        if download_cb:
468
            self.progress_bar_gen = download_cb(len(remote_hashes))
469
            self._cb_next()
470

    
471
        if dst.isatty():
472
            self._dump_blocks_sync(obj,
473
                hash_list,
474
                blocksize,
475
                total_size,
476
                dst,
477
                range,
478
                **restargs)
479
        else:
480
            self._dump_blocks_async(obj,
481
                remote_hashes,
482
                blocksize,
483
                total_size,
484
                dst,
485
                blockhash,
486
                resume,
487
                range,
488
                **restargs)
489
            if range is None:
490
                dst.truncate(total_size)
491

    
492
        self._complete_cb()
493

    
494
    #Command Progress Bar method
495
    def _cb_next(self):
496
        if hasattr(self, 'progress_bar_gen'):
497
            try:
498
                self.progress_bar_gen.next()
499
            except:
500
                pass
501

    
502
    def _complete_cb(self):
503
        while True:
504
            try:
505
                self.progress_bar_gen.next()
506
            except:
507
                break
508

    
509
    # Untested - except is download_object is tested first
510
    def get_object_hashmap(self, obj,
511
        version=None,
512
        if_match=None,
513
        if_none_match=None,
514
        if_modified_since=None,
515
        if_unmodified_since=None,
516
        data_range=None):
517
        try:
518
            r = self.object_get(obj,
519
                hashmap=True,
520
                version=version,
521
                if_etag_match=if_match,
522
                if_etag_not_match=if_none_match,
523
                if_modified_since=if_modified_since,
524
                if_unmodified_since=if_unmodified_since,
525
                data_range=data_range)
526
        except ClientError as err:
527
            if err.status == 304 or err.status == 412:
528
                return {}
529
            raise
530
        return r.json
531

    
532
    def set_account_group(self, group, usernames):
533
        r = self.account_post(update=True, groups={group: usernames})
534
        r.release()
535

    
536
    def del_account_group(self, group):
537
        r = self.account_post(update=True, groups={group: []})
538
        r.release()
539

    
540
    def get_account_info(self, until=None):
541
        r = self.account_head(until=until)
542
        if r.status_code == 401:
543
            raise ClientError("No authorization")
544
        return r.headers
545

    
546
    def get_account_quota(self):
547
        return filter_in(self.get_account_info(),
548
            'X-Account-Policy-Quota',
549
            exactMatch=True)
550

    
551
    def get_account_versioning(self):
552
        return filter_in(self.get_account_info(),
553
            'X-Account-Policy-Versioning',
554
            exactMatch=True)
555

    
556
    def get_account_meta(self, until=None):
557
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
558

    
559
    def get_account_group(self):
560
        return filter_in(self.get_account_info(), 'X-Account-Group-')
561

    
562
    def set_account_meta(self, metapairs):
563
        assert(type(metapairs) is dict)
564
        r = self.account_post(update=True, metadata=metapairs)
565
        r.release()
566

    
567
    def del_account_meta(self, metakey):
568
        r = self.account_post(update=True, metadata={metakey: ''})
569
        r.release()
570

    
571
    def set_account_quota(self, quota):
572
        r = self.account_post(update=True, quota=quota)
573
        r.release()
574

    
575
    def set_account_versioning(self, versioning):
576
        r = self.account_post(update=True, versioning=versioning)
577
        r.release()
578

    
579
    def list_containers(self):
580
        r = self.account_get()
581
        return r.json
582

    
583
    def del_container(self, until=None, delimiter=None):
584
        self.assert_container()
585
        r = self.container_delete(until=until,
586
            delimiter=delimiter,
587
            success=(204, 404, 409))
588
        r.release()
589
        if r.status_code == 404:
590
            raise ClientError('Container "%s" does not exist' % self.container,
591
                r.status_code)
592
        elif r.status_code == 409:
593
            raise ClientError('Container "%s" is not empty' % self.container,
594
                r.status_code)
595

    
596
    def get_container_versioning(self, container):
597
        self.container = container
598
        return filter_in(self.get_container_info(),
599
            'X-Container-Policy-Versioning')
600

    
601
    def get_container_quota(self, container):
602
        self.container = container
603
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
604

    
605
    def get_container_info(self, until=None):
606
        r = self.container_head(until=until)
607
        return r.headers
608

    
609
    def get_container_meta(self, until=None):
610
        return filter_in(self.get_container_info(until=until),
611
            'X-Container-Meta')
612

    
613
    def get_container_object_meta(self, until=None):
614
        return filter_in(self.get_container_info(until=until),
615
            'X-Container-Object-Meta')
616

    
617
    def set_container_meta(self, metapairs):
618
        assert(type(metapairs) is dict)
619
        r = self.container_post(update=True, metadata=metapairs)
620
        r.release()
621

    
622
    def del_container_meta(self, metakey):
623
        r = self.container_post(update=True, metadata={metakey: ''})
624
        r.release()
625

    
626
    def set_container_quota(self, quota):
627
        r = self.container_post(update=True, quota=quota)
628
        r.release()
629

    
630
    def set_container_versioning(self, versioning):
631
        r = self.container_post(update=True, versioning=versioning)
632
        r.release()
633

    
634
    def del_object(self, obj, until=None, delimiter=None):
635
        self.assert_container()
636
        r = self.object_delete(obj, until=until, delimiter=delimiter)
637
        r.release()
638

    
639
    def set_object_meta(self, object, metapairs):
640
        assert(type(metapairs) is dict)
641
        r = self.object_post(object, update=True, metadata=metapairs)
642
        r.release()
643

    
644
    def del_object_meta(self, metakey, object):
645
        r = self.object_post(object, update=True, metadata={metakey: ''})
646
        r.release()
647

    
648
    def publish_object(self, object):
649
        r = self.object_post(object, update=True, public=True)
650
        r.release()
651

    
652
    def unpublish_object(self, object):
653
        r = self.object_post(object, update=True, public=False)
654
        r.release()
655

    
656
    def get_object_info(self, obj, version=None):
657
        r = self.object_head(obj, version=version)
658
        return r.headers
659

    
660
    def get_object_meta(self, obj, version=None):
661
        return filter_in(self.get_object_info(obj, version=version),
662
            'X-Object-Meta')
663

    
664
    def get_object_sharing(self, object):
665
        r = filter_in(self.get_object_info(object),
666
            'X-Object-Sharing',
667
            exactMatch=True)
668
        reply = {}
669
        if len(r) > 0:
670
            perms = r['x-object-sharing'].split(';')
671
            for perm in perms:
672
                try:
673
                    perm.index('=')
674
                except ValueError:
675
                    raise ClientError('Incorrect reply format')
676
                (key, val) = perm.strip().split('=')
677
                reply[key] = val
678
        return reply
679

    
680
    def set_object_sharing(self, object,
681
        read_permition=False,
682
        write_permition=False):
683
        """Give read/write permisions to an object.
684
           @param object is the object to change sharing permissions
685
 onto
686
           @param read_permition is a list of users and user groups that
687
                get read permition for this object
688
                False means all previous read permissions
689
 will be removed
690
           @param write_perimition is a list of users and user groups to
691
                get write permition for this object
692
                False means all previous read permissions
693
 will be removed
694
        """
695
        perms = dict(read='' if not read_permition else read_permition,
696
            write='' if not write_permition else write_permition)
697
        r = self.object_post(object, update=True, permissions=perms)
698
        r.release()
699

    
700
    def del_object_sharing(self, object):
701
        self.set_object_sharing(object)
702

    
703
    def append_object(self, object, source_file, upload_cb=None):
704
        """@param upload_db is a generator for showing progress of upload
705
            to caller application, e.g. a progress bar. Its next is called
706
            whenever a block is uploaded
707
        """
708
        self.assert_container()
709
        meta = self.get_container_info()
710
        blocksize = int(meta['x-container-block-size'])
711
        filesize = fstat(source_file.fileno()).st_size
712
        nblocks = 1 + (filesize - 1) // blocksize
713
        offset = 0
714
        if upload_cb is not None:
715
            upload_gen = upload_cb(nblocks)
716
        for i in range(nblocks):
717
            block = source_file.read(min(blocksize, filesize - offset))
718
            offset += len(block)
719
            r = self.object_post(object,
720
                update=True,
721
                content_range='bytes */*',
722
                content_type='application/octet-stream',
723
                content_length=len(block),
724
                data=block)
725
            r.release()
726

    
727
            if upload_cb is not None:
728
                upload_gen.next()
729

    
730
    def truncate_object(self, object, upto_bytes):
731
        r = self.object_post(object,
732
            update=True,
733
            content_range='bytes 0-%s/*' % upto_bytes,
734
            content_type='application/octet-stream',
735
            object_bytes=upto_bytes,
736
            source_object=path4url(self.container, object))
737
        r.release()
738

    
739
    def overwrite_object(self,
740
        object,
741
        start,
742
        end,
743
        source_file,
744
        upload_cb=None):
745
        """Overwrite a part of an object with given source file
746
           @start the part of the remote object to start overwriting from,
747
                in bytes
748
           @end the part of the remote object to stop overwriting to, in bytes
749
        """
750
        self.assert_container()
751
        meta = self.get_container_info()
752
        blocksize = int(meta['x-container-block-size'])
753
        filesize = fstat(source_file.fileno()).st_size
754
        datasize = int(end) - int(start) + 1
755
        nblocks = 1 + (datasize - 1) // blocksize
756
        offset = 0
757
        if upload_cb is not None:
758
            upload_gen = upload_cb(nblocks)
759
        for i in range(nblocks):
760
            block = source_file.read(min(blocksize,
761
                filesize - offset,
762
                datasize - offset))
763
            offset += len(block)
764
            r = self.object_post(object,
765
                update=True,
766
                content_type='application/octet-stream',
767
                content_length=len(block),
768
                content_range='bytes %s-%s/*' % (start, end),
769
                data=block)
770
            r.release()
771

    
772
            if upload_cb is not None:
773
                upload_gen.next()
774

    
775
    def copy_object(self, src_container, src_object, dst_container,
776
        dst_object=False,
777
        source_version=None,
778
        public=False,
779
        content_type=None,
780
        delimiter=None):
781
        self.assert_account()
782
        self.container = dst_container
783
        dst_object = dst_object or src_object
784
        src_path = path4url(src_container, src_object)
785
        r = self.object_put(dst_object,
786
            success=201,
787
            copy_from=src_path,
788
            content_length=0,
789
            source_version=source_version,
790
            public=public,
791
            content_type=content_type,
792
            delimiter=delimiter)
793
        r.release()
794

    
795
    def move_object(self, src_container, src_object, dst_container,
796
        dst_object=False,
797
        source_version=None,
798
        public=False,
799
        content_type=None,
800
        delimiter=None):
801
        self.assert_account()
802
        self.container = dst_container
803
        dst_object = dst_object or src_object
804
        src_path = path4url(src_container, src_object)
805
        r = self.object_put(dst_object,
806
            success=201,
807
            move_from=src_path,
808
            content_length=0,
809
            source_version=source_version,
810
            public=public,
811
            content_type=content_type,
812
            delimiter=delimiter)
813
        r.release()
814

    
815
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
816
        """Get accounts that share with self.account"""
817
        self.assert_account()
818

    
819
        self.set_param('format', 'json')
820
        self.set_param('limit', limit, iff=limit is not None)
821
        self.set_param('marker', marker, iff=marker is not None)
822

    
823
        path = ''
824
        success = kwargs.pop('success', (200, 204))
825
        r = self.get(path, *args, success=success, **kwargs)
826
        return r.json
827

    
828
    def get_object_versionlist(self, path):
829
        self.assert_container()
830
        r = self.object_get(path, format='json', version='list')
831
        return r.json['versions']