Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 4375e020

History | View | Annotate | Download (31.1 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
51
    h.update(block.rstrip('\x00'))
52
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
69
    """GRNet Pithos API client"""
70

    
71
    _thread_exceptions = []
72

    
73
    def __init__(self, base_url, token, account=None, container=None):
74
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self):
77
        """Delete an empty container and destroy associated blocks
78
        """
79
        r = self.container_delete(until=unicode(time()))
80
        r.release()
81

    
82
    def upload_object_unchunked(self, obj, f,
83
        withHashFile=False,
84
        size=None,
85
        etag=None,
86
        content_encoding=None,
87
        content_disposition=None,
88
        content_type=None,
89
        sharing=None,
90
        public=None):
91
        """
92
        :param obj: (str) remote object path
93

94
        :param f: open file descriptor
95

96
        :param withHashFile: (bool)
97

98
        :param size: (int) size of data to upload
99

100
        :param etag: (str)
101

102
        :param content_encoding: (str)
103

104
        :param content_disposition: (str)
105

106
        :param content_type: (str)
107

108
        :param sharing: {'read':[user and/or grp names],
109
            'write':[usr and/or grp names]}
110

111
        :param public: (bool)
112
        """
113
        self.assert_container()
114

    
115
        if withHashFile:
116
            data = f.read()
117
            try:
118
                import json
119
                data = json.dumps(json.loads(data))
120
            except ValueError:
121
                raise ClientError(message='"%s" is not json-formated' % f.name,
122
                    status=1)
123
            except SyntaxError:
124
                raise ClientError(message='"%s" is not a valid hashmap file'\
125
                % f.name, status=1)
126
            f = StringIO(data)
127
        data = f.read(size) if size is not None else f.read()
128
        r = self.object_put(obj,
129
            data=data,
130
            etag=etag,
131
            content_encoding=content_encoding,
132
            content_disposition=content_disposition,
133
            content_type=content_type,
134
            permissions=sharing,
135
            public=public,
136
            success=201)
137
        r.release()
138

    
139
    def create_object_by_manifestation(self, obj,
140
        etag=None,
141
        content_encoding=None,
142
        content_disposition=None,
143
        content_type=None,
144
        sharing=None,
145
        public=None):
146
        """
147
        :param obj: (str) remote object path
148

149
        :param etag: (str)
150

151
        :param content_encoding: (str)
152

153
        :param content_disposition: (str)
154

155
        :param content_type: (str)
156

157
        :param sharing: {'read':[user and/or grp names],
158
            'write':[usr and/or grp names]}
159

160
        :param public: (bool)
161
        """
162
        self.assert_container()
163
        r = self.object_put(obj,
164
            content_length=0,
165
            etag=etag,
166
            content_encoding=content_encoding,
167
            content_disposition=content_disposition,
168
            content_type=content_type,
169
            permissions=sharing,
170
            public=public,
171
            manifest='%s/%s' % (self.container, obj))
172
        r.release()
173

    
174
    # upload_* auxiliary methods
175
    def _put_block_async(self, data, hash):
176
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
177
        event.start()
178
        return event
179

    
180
    def _put_block(self, data, hash):
181
        r = self.container_post(update=True,
182
            content_type='application/octet-stream',
183
            content_length=len(data),
184
            data=data,
185
            format='json')
186
        assert r.json[0] == hash, 'Local hash does not match server'
187

    
188
    def _get_file_block_info(self, fileobj, size=None):
189
        meta = self.get_container_info()
190
        blocksize = int(meta['x-container-block-size'])
191
        blockhash = meta['x-container-block-hash']
192
        size = size if size is not None else fstat(fileobj.fileno()).st_size
193
        nblocks = 1 + (size - 1) // blocksize
194
        return (blocksize, blockhash, size, nblocks)
195

    
196
    def _get_missing_hashes(self, obj, json,
197
        size=None,
198
        format='json',
199
        hashmap=True,
200
        content_type=None,
201
        etag=None,
202
        content_encoding=None,
203
        content_disposition=None,
204
        permissions=None,
205
        public=None,
206
        success=(201, 409)):
207
        r = self.object_put(obj,
208
            format='json',
209
            hashmap=True,
210
            content_type=content_type,
211
            json=json,
212
            etag=etag,
213
            content_encoding=content_encoding,
214
            content_disposition=content_disposition,
215
            permissions=permissions,
216
            public=public,
217
            success=success)
218
        if r.status_code == 201:
219
            r.release()
220
            return None
221
        return r.json
222

    
223
    def _caclulate_uploaded_blocks(self,
224
        blocksize,
225
        blockhash,
226
        size,
227
        nblocks,
228
        hashes,
229
        hmap,
230
        fileobj,
231
        hash_cb=None):
232
        offset = 0
233
        if hash_cb:
234
            hash_gen = hash_cb(nblocks)
235
            hash_gen.next()
236

    
237
        for i in range(nblocks):
238
            block = fileobj.read(min(blocksize, size - offset))
239
            bytes = len(block)
240
            hash = _pithos_hash(block, blockhash)
241
            hashes.append(hash)
242
            hmap[hash] = (offset, bytes)
243
            offset += bytes
244
            if hash_cb:
245
                hash_gen.next()
246
        assert offset == size
247

    
248
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
249
        """upload missing blocks asynchronously.
250
        """
251
        if upload_cb:
252
            upload_gen = upload_cb(len(missing))
253
            upload_gen.next()
254

    
255
        self._init_thread_limit()
256

    
257
        flying = []
258
        for hash in missing:
259
            offset, bytes = hmap[hash]
260
            fileobj.seek(offset)
261
            data = fileobj.read(bytes)
262
            r = self._put_block_async(data, hash)
263
            flying.append(r)
264
            unfinished = []
265
            for i, thread in enumerate(flying):
266

    
267
                unfinished = self._watch_thread_limit(unfinished)
268

    
269
                if thread.isAlive() or thread.exception:
270
                    unfinished.append(thread)
271
                else:
272
                    if upload_cb:
273
                        upload_gen.next()
274
            flying = unfinished
275

    
276
        for thread in flying:
277
            thread.join()
278

    
279
        failures = [r for r in flying if r.exception]
280
        if len(failures):
281
            details = ', '.join([' (%s).%s' % (i, r.exception)\
282
                for i, r in enumerate(failures)])
283
            raise ClientError(message="Block uploading failed",
284
                status=505,
285
                details=details)
286

    
287
        while upload_cb:
288
            try:
289
                upload_gen.next()
290
            except StopIteration:
291
                break
292

    
293
    def upload_object(self, obj, f,
294
        size=None,
295
        hash_cb=None,
296
        upload_cb=None,
297
        etag=None,
298
        content_encoding=None,
299
        content_disposition=None,
300
        content_type=None,
301
        sharing=None,
302
        public=None):
303
        """Upload an object using multiple connections (threads)
304

305
        :param obj: (str) remote object path
306

307
        :param f: open file descriptor (rb)
308

309
        :param hash_cb: optional progress.bar object for calculating hashes
310

311
        :param upload_cb: optional progress.bar object for uploading
312

313
        :param etag: (str)
314

315
        :param content_encoding: (str)
316

317
        :param content_disposition: (str)
318

319
        :param content_type: (str)
320

321
        :param sharing: {'read':[user and/or grp names],
322
            'write':[usr and/or grp names]}
323

324
        :param public: (bool)
325
        """
326
        self.assert_container()
327

    
328
        #init
329
        block_info = (blocksize, blockhash, size, nblocks) =\
330
            self._get_file_block_info(f, size)
331
        (hashes, hmap, offset) = ([], {}, 0)
332
        if content_type is None:
333
            content_type = 'application/octet-stream'
334

    
335
        self._caclulate_uploaded_blocks(*block_info,
336
            hashes=hashes,
337
            hmap=hmap,
338
            fileobj=f,
339
            hash_cb=hash_cb)
340

    
341
        hashmap = dict(bytes=size, hashes=hashes)
342
        missing = self._get_missing_hashes(obj, hashmap,
343
            content_type=content_type,
344
            size=size,
345
            etag=etag,
346
            content_encoding=content_encoding,
347
            content_disposition=content_disposition,
348
            permissions=sharing,
349
            public=public)
350

    
351
        if missing is None:
352
            return
353
        try:
354
            self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
355
        except KeyboardInterrupt:
356
            print('- - - wait for threads to finish')
357
            for thread in activethreads():
358
                thread.join()
359
            raise
360

    
361
        r = self.object_put(
362
            obj,
363
            format='json',
364
            hashmap=True,
365
            content_type=content_type,
366
            json=hashmap,
367
            success=201)
368
        r.release()
369

    
370
    # download_* auxiliary methods
371
    def _get_remote_blocks_info(self, obj, **restargs):
372
        #retrieve object hashmap
373
        myrange = restargs.pop('data_range', None)
374
        hashmap = self.get_object_hashmapp(obj, **restargs)
375
        restargs['data_range'] = myrange
376
        blocksize = int(hashmap['block_size'])
377
        blockhash = hashmap['block_hash']
378
        total_size = hashmap['bytes']
379
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
380
        map_dict = {}
381
        for i, h in enumerate(hashmap['hashes']):
382
            map_dict[h] = i
383
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
384

    
385
    def _dump_blocks_sync(self,
386
        obj,
387
        remote_hashes,
388
        blocksize,
389
        total_size,
390
        dst,
391
        range,
392
        **restargs):
393
        for blockid, blockhash in enumerate(remote_hashes):
394
            if blockhash == None:
395
                continue
396
            start = blocksize * blockid
397
            end = total_size - 1 if start + blocksize > total_size\
398
                else start + blocksize - 1
399
            (start, end) = _range_up(start, end, range)
400
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
401
            r = self.object_get(obj, success=(200, 206), **restargs)
402
            self._cb_next()
403
            dst.write(r.content)
404
            dst.flush()
405

    
406
    def _get_block_async(self, obj, **restargs):
407
        event = SilentEvent(self.object_get,
408
            obj,
409
            success=(200, 206),
410
            **restargs)
411
        event.start()
412
        return event
413

    
414
    def _hash_from_file(self, fp, start, size, blockhash):
415
        fp.seek(start)
416
        block = fp.read(size)
417
        h = newhashlib(blockhash)
418
        h.update(block.strip('\x00'))
419
        return hexlify(h.digest())
420

    
421
    def _thread2file(self,
422
        flying,
423
        local_file,
424
        offset=0,
425
        **restargs):
426
        """write the results of a greenleted rest call to a file
427
        @offset: the offset of the file up to blocksize
428
            - e.g. if the range is 10-100, all
429
        blocks will be written to normal_position - 10"""
430
        finished = []
431
        for i, (start, g) in enumerate(flying.items()):
432
            if not g.isAlive():
433
                if g.exception:
434
                    raise g.exception
435
                block = g.value.content
436
                local_file.seek(start - offset)
437
                local_file.write(block)
438
                self._cb_next()
439
                finished.append(flying.pop(start))
440
        local_file.flush()
441
        return finished
442

    
443
    def _dump_blocks_async(self,
444
        obj,
445
        remote_hashes,
446
        blocksize,
447
        total_size,
448
        local_file,
449
        blockhash=None,
450
        resume=False,
451
        filerange=None,
452
        **restargs):
453

    
454
        file_size = fstat(local_file.fileno()).st_size if resume else 0
455
        flying = {}
456
        finished = []
457
        offset = 0
458
        if filerange is not None:
459
            rstart = int(filerange.split('-')[0])
460
            offset = rstart if blocksize > rstart else rstart % blocksize
461

    
462
        self._init_thread_limit()
463
        for block_hash, blockid in remote_hashes.items():
464
            start = blocksize * blockid
465
            if start < file_size\
466
            and block_hash == self._hash_from_file(
467
                    local_file,
468
                    start,
469
                    blocksize,
470
                    blockhash):
471
                self._cb_next()
472
                continue
473
            self._watch_thread_limit(flying.values())
474
            finished += self._thread2file(
475
                flying,
476
                local_file,
477
                offset,
478
                **restargs)
479
            end = total_size - 1 if start + blocksize > total_size\
480
                else start + blocksize - 1
481
            (start, end) = _range_up(start, end, filerange)
482
            if start == end:
483
                self._cb_next()
484
                continue
485
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
486
            flying[start] = self._get_block_async(obj, **restargs)
487

    
488
        for thread in flying.values():
489
            thread.join()
490
        finished += self._thread2file(flying, local_file, offset, **restargs)
491

    
492
    def download_object(self,
493
        obj,
494
        dst,
495
        download_cb=None,
496
        version=None,
497
        resume=False,
498
        range=None,
499
        if_match=None,
500
        if_none_match=None,
501
        if_modified_since=None,
502
        if_unmodified_since=None):
503
        """Download an object using multiple connections (threads) and
504
            writing to random parts of the file
505

506
        :param obj: (str) remote object path
507

508
        :param dst: open file descriptor (wb+)
509

510
        :param download_cb: optional progress.bar object for downloading
511

512
        :param version: (str) file version
513

514
        :param resume: (bool) if set, preserve already downloaded file parts
515

516
        :param range: (str) from-to where from and to are integers denoting
517
            file positions in bytes
518

519
        :param if_match: (str)
520

521
        :param if_none_match: (str)
522

523
        :param if_modified_since: (str) formated date
524

525
        :param if_unmodified_since: (str) formated date
526
        """
527

    
528
        restargs = dict(version=version,
529
            data_range=None if range is None else 'bytes=%s' % range,
530
            if_match=if_match,
531
            if_none_match=if_none_match,
532
            if_modified_since=if_modified_since,
533
            if_unmodified_since=if_unmodified_since)
534

    
535
        (blocksize,
536
            blockhash,
537
            total_size,
538
            hash_list,
539
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
540
        assert total_size >= 0
541

    
542
        if download_cb:
543
            self.progress_bar_gen = download_cb(len(remote_hashes))
544
            self._cb_next()
545

    
546
        if dst.isatty():
547
            self._dump_blocks_sync(obj,
548
                hash_list,
549
                blocksize,
550
                total_size,
551
                dst,
552
                range,
553
                **restargs)
554
        else:
555
            self._dump_blocks_async(obj,
556
                remote_hashes,
557
                blocksize,
558
                total_size,
559
                dst,
560
                blockhash,
561
                resume,
562
                range,
563
                **restargs)
564
            if range is None:
565
                dst.truncate(total_size)
566

    
567
        self._complete_cb()
568

    
569
    #Command Progress Bar method
570
    def _cb_next(self):
571
        if hasattr(self, 'progress_bar_gen'):
572
            try:
573
                self.progress_bar_gen.next()
574
            except:
575
                pass
576

    
577
    def _complete_cb(self):
578
        while True:
579
            try:
580
                self.progress_bar_gen.next()
581
            except:
582
                break
583

    
584
    def get_object_hashmapp(self, obj,
585
        version=None,
586
        if_match=None,
587
        if_none_match=None,
588
        if_modified_since=None,
589
        if_unmodified_since=None,
590
        data_range=None):
591
        """
592
        :param obj: (str) remote object path
593

594
        :param if_match: (str)
595

596
        :param if_none_match: (str)
597

598
        :param if_modified_since: (str) formated date
599

600
        :param if_unmodified_since: (str) formated date
601

602
        :param data_range: (str) from-to where from and to are integers
603
            denoting file positions in bytes
604

605
        :returns: (list)
606
        """
607
        try:
608
            r = self.object_get(obj,
609
                hashmap=True,
610
                version=version,
611
                if_etag_match=if_match,
612
                if_etag_not_match=if_none_match,
613
                if_modified_since=if_modified_since,
614
                if_unmodified_since=if_unmodified_since,
615
                data_range=data_range)
616
        except ClientError as err:
617
            if err.status == 304 or err.status == 412:
618
                return {}
619
            raise
620
        return r.json
621

    
622
    def set_account_group(self, group, usernames):
623
        """
624
        :param group: (str)
625

626
        :param usernames: (list)
627
        """
628
        r = self.account_post(update=True, groups={group: usernames})
629
        r.release()
630

    
631
    def del_account_group(self, group):
632
        """
633
        :param group: (str)
634
        """
635
        r = self.account_post(update=True, groups={group: []})
636
        r.release()
637

    
638
    def get_account_info(self, until=None):
639
        """
640
        :param until: (str) formated date
641

642
        :returns: (dict)
643
        """
644
        r = self.account_head(until=until)
645
        if r.status_code == 401:
646
            raise ClientError("No authorization")
647
        return r.headers
648

    
649
    def get_account_quota(self):
650
        """
651
        :returns: (dict)
652
        """
653
        return filter_in(self.get_account_info(),
654
            'X-Account-Policy-Quota',
655
            exactMatch=True)
656

    
657
    def get_account_versioning(self):
658
        """
659
        :returns: (dict)
660
        """
661
        return filter_in(self.get_account_info(),
662
            'X-Account-Policy-Versioning',
663
            exactMatch=True)
664

    
665
    def get_account_meta(self, until=None):
666
        """
667
        :meta until: (str) formated date
668

669
        :returns: (dict)
670
        """
671
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
672

    
673
    def get_account_group(self):
674
        """
675
        :returns: (dict)
676
        """
677
        return filter_in(self.get_account_info(), 'X-Account-Group-')
678

    
679
    def set_account_meta(self, metapairs):
680
        """
681
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
682
        """
683
        assert(type(metapairs) is dict)
684
        r = self.account_post(update=True, metadata=metapairs)
685
        r.release()
686

    
687
    def del_account_meta(self, metakey):
688
        """
689
        :param metakey: (str) metadatum key
690
        """
691
        r = self.account_post(update=True, metadata={metakey: ''})
692
        r.release()
693

    
694
    def set_account_quota(self, quota):
695
        """
696
        :param quota: (int)
697
        """
698
        r = self.account_post(update=True, quota=quota)
699
        r.release()
700

    
701
    def set_account_versioning(self, versioning):
702
        """
703
        "param versioning: (str)
704
        """
705
        r = self.account_post(update=True, versioning=versioning)
706
        r.release()
707

    
708
    def list_containers(self):
709
        """
710
        :returns: (dict)
711
        """
712
        r = self.account_get()
713
        return r.json
714

    
715
    def del_container(self, until=None, delimiter=None):
716
        """
717
        :param until: (str) formated date
718

719
        :param delimiter: (str)
720

721
        :raises ClientError: 404 Container does not exist
722

723
        :raises ClientError: 409 Container is not empty
724
        """
725
        self.assert_container()
726
        r = self.container_delete(until=until,
727
            delimiter=delimiter,
728
            success=(204, 404, 409))
729
        r.release()
730
        if r.status_code == 404:
731
            raise ClientError('Container "%s" does not exist' % self.container,
732
                r.status_code)
733
        elif r.status_code == 409:
734
            raise ClientError('Container "%s" is not empty' % self.container,
735
                r.status_code)
736

    
737
    def get_container_versioning(self, container):
738
        """
739
        :param container: (str)
740

741
        :returns: (dict)
742
        """
743
        self.container = container
744
        return filter_in(self.get_container_info(),
745
            'X-Container-Policy-Versioning')
746

    
747
    def get_container_quota(self, container):
748
        """
749
        :param container: (str)
750

751
        :returns: (dict)
752
        """
753
        self.container = container
754
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
755

    
756
    def get_container_info(self, until=None):
757
        """
758
        :param until: (str) formated date
759

760
        :returns: (dict)
761
        """
762
        r = self.container_head(until=until)
763
        return r.headers
764

    
765
    def get_container_meta(self, until=None):
766
        """
767
        :param until: (str) formated date
768

769
        :returns: (dict)
770
        """
771
        return filter_in(self.get_container_info(until=until),
772
            'X-Container-Meta')
773

    
774
    def get_container_object_meta(self, until=None):
775
        """
776
        :param until: (str) formated date
777

778
        :returns: (dict)
779
        """
780
        return filter_in(self.get_container_info(until=until),
781
            'X-Container-Object-Meta')
782

    
783
    def set_container_meta(self, metapairs):
784
        """
785
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
786
        """
787
        assert(type(metapairs) is dict)
788
        r = self.container_post(update=True, metadata=metapairs)
789
        r.release()
790

    
791
    def del_container_meta(self, metakey):
792
        """
793
        :param metakey: (str) metadatum key
794
        """
795
        r = self.container_post(update=True, metadata={metakey: ''})
796
        r.release()
797

    
798
    def set_container_quota(self, quota):
799
        """
800
        :param quota: (int)
801
        """
802
        r = self.container_post(update=True, quota=quota)
803
        r.release()
804

    
805
    def set_container_versioning(self, versioning):
806
        """
807
        :param versioning: (str)
808
        """
809
        r = self.container_post(update=True, versioning=versioning)
810
        r.release()
811

    
812
    def del_object(self, obj, until=None, delimiter=None):
813
        """
814
        :param obj: (str) remote object path
815

816
        :param until: (str) formated date
817

818
        :param delimiter: (str)
819
        """
820
        self.assert_container()
821
        r = self.object_delete(obj, until=until, delimiter=delimiter)
822
        r.release()
823

    
824
    def set_object_meta(self, obj, metapairs):
825
        """
826
        :param obj: (str) remote object path
827

828
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
829
        """
830
        assert(type(metapairs) is dict)
831
        r = self.object_post(obj, update=True, metadata=metapairs)
832
        r.release()
833

    
834
    def del_object_meta(self, metakey, obj):
835
        """
836
        :param metakey: (str) metadatum key
837

838
        :param obj: (str) remote object path
839
        """
840
        r = self.object_post(object, update=True, metadata={metakey: ''})
841
        r.release()
842

    
843
    def publish_object(self, object):
844
        r = self.object_post(object, update=True, public=True)
845
        r.release()
846

    
847
    def unpublish_object(self, object):
848
        r = self.object_post(object, update=True, public=False)
849
        r.release()
850

    
851
    def get_object_info(self, obj, version=None):
852
        r = self.object_head(obj, version=version)
853
        return r.headers
854

    
855
    def get_object_meta(self, obj, version=None):
856
        return filter_in(self.get_object_info(obj, version=version),
857
            'X-Object-Meta')
858

    
859
    def get_object_sharing(self, object):
860
        r = filter_in(self.get_object_info(object),
861
            'X-Object-Sharing',
862
            exactMatch=True)
863
        reply = {}
864
        if len(r) > 0:
865
            perms = r['x-object-sharing'].split(';')
866
            for perm in perms:
867
                try:
868
                    perm.index('=')
869
                except ValueError:
870
                    raise ClientError('Incorrect reply format')
871
                (key, val) = perm.strip().split('=')
872
                reply[key] = val
873
        return reply
874

    
875
    def set_object_sharing(self, object,
876
        read_permition=False,
877
        write_permition=False):
878
        """Give read/write permisions to an object.
879
           @param object is the object to change sharing permissions onto
880
           @param read_permition is a list of users and user groups that get
881
           read permition for this object False means all previous read
882
           permissions will be removed
883
           @param write_perimition is a list of users and user groups to get
884
           write permition for this object False means all previous read
885
           permissions will be removed
886
        """
887

    
888
        perms = dict(read='' if not read_permition else read_permition,
889
            write='' if not write_permition else write_permition)
890
        r = self.object_post(object, update=True, permissions=perms)
891
        r.release()
892

    
893
    def del_object_sharing(self, object):
894
        self.set_object_sharing(object)
895

    
896
    def append_object(self, object, source_file, upload_cb=None):
897
        """@param upload_db is a generator for showing progress of upload
898
            to caller application, e.g. a progress bar. Its next is called
899
            whenever a block is uploaded
900
        """
901

    
902
        self.assert_container()
903
        meta = self.get_container_info()
904
        blocksize = int(meta['x-container-block-size'])
905
        filesize = fstat(source_file.fileno()).st_size
906
        nblocks = 1 + (filesize - 1) // blocksize
907
        offset = 0
908
        if upload_cb is not None:
909
            upload_gen = upload_cb(nblocks)
910
        for i in range(nblocks):
911
            block = source_file.read(min(blocksize, filesize - offset))
912
            offset += len(block)
913
            r = self.object_post(object,
914
                update=True,
915
                content_range='bytes */*',
916
                content_type='application/octet-stream',
917
                content_length=len(block),
918
                data=block)
919
            r.release()
920

    
921
            if upload_cb is not None:
922
                upload_gen.next()
923

    
924
    def truncate_object(self, object, upto_bytes):
925
        r = self.object_post(object,
926
            update=True,
927
            content_range='bytes 0-%s/*' % upto_bytes,
928
            content_type='application/octet-stream',
929
            object_bytes=upto_bytes,
930
            source_object=path4url(self.container, object))
931
        r.release()
932

    
933
    def overwrite_object(self,
934
        object,
935
        start,
936
        end,
937
        source_file,
938
        upload_cb=None):
939
        """Overwrite a part of an object with given source file
940
           @start the part of the remote object to start overwriting from, in
941
           bytes
942
           @end the part of the remote object to stop overwriting to, in bytes
943
        """
944

    
945
        self.assert_container()
946
        meta = self.get_container_info()
947
        blocksize = int(meta['x-container-block-size'])
948
        filesize = fstat(source_file.fileno()).st_size
949
        datasize = int(end) - int(start) + 1
950
        nblocks = 1 + (datasize - 1) // blocksize
951
        offset = 0
952
        if upload_cb is not None:
953
            upload_gen = upload_cb(nblocks)
954
        for i in range(nblocks):
955
            block = source_file.read(min(blocksize,
956
                filesize - offset,
957
                datasize - offset))
958
            offset += len(block)
959
            r = self.object_post(object,
960
                update=True,
961
                content_type='application/octet-stream',
962
                content_length=len(block),
963
                content_range='bytes %s-%s/*' % (start, end),
964
                data=block)
965
            r.release()
966

    
967
            if upload_cb is not None:
968
                upload_gen.next()
969

    
970
    def copy_object(self, src_container, src_object, dst_container,
971
        dst_object=False,
972
        source_version=None,
973
        public=False,
974
        content_type=None,
975
        delimiter=None):
976
        self.assert_account()
977
        self.container = dst_container
978
        dst_object = dst_object or src_object
979
        src_path = path4url(src_container, src_object)
980
        r = self.object_put(dst_object,
981
            success=201,
982
            copy_from=src_path,
983
            content_length=0,
984
            source_version=source_version,
985
            public=public,
986
            content_type=content_type,
987
            delimiter=delimiter)
988
        r.release()
989

    
990
    def move_object(self, src_container, src_object, dst_container,
991
        dst_object=False,
992
        source_version=None,
993
        public=False,
994
        content_type=None,
995
        delimiter=None):
996
        self.assert_account()
997
        self.container = dst_container
998
        dst_object = dst_object or src_object
999
        src_path = path4url(src_container, src_object)
1000
        r = self.object_put(dst_object,
1001
            success=201,
1002
            move_from=src_path,
1003
            content_length=0,
1004
            source_version=source_version,
1005
            public=public,
1006
            content_type=content_type,
1007
            delimiter=delimiter)
1008
        r.release()
1009

    
1010
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1011
        """Get accounts that share with self.account"""
1012
        self.assert_account()
1013

    
1014
        self.set_param('format', 'json')
1015
        self.set_param('limit', limit, iff=limit is not None)
1016
        self.set_param('marker', marker, iff=marker is not None)
1017

    
1018
        path = ''
1019
        success = kwargs.pop('success', (200, 204))
1020
        r = self.get(path, *args, success=success, **kwargs)
1021
        return r.json
1022

    
1023
    def get_object_versionlist(self, path):
1024
        self.assert_container()
1025
        r = self.object_get(path, format='json', version='list')
1026
        return r.json['versions']