root / kamaki / clients / pithos.py @ 6d192774

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
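
# A quick sanity check of the helper above (hedged: 'sha256' merely stands in
# for whatever X-Container-Block-Hash algorithm the server actually reports):
#
#   _pithos_hash('data' + '\x00' * 16, 'sha256') == _pithos_hash('data', 'sha256')
#
# i.e. trailing NUL padding never changes a block's hash.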


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
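
# Worked example: with a 4MB blocksize, clipping the first block against a
# requested range '10-100' keeps only bytes 10..100, while a block lying
# entirely outside the range collapses to (0, 0):
#
#   _range_up(0, 4194303, '10-100')        # -> (10, 100)
#   _range_up(4194304, 8388607, '10-100')  # -> (0, 0)
#   _range_up(0, 4194303, None)            # -> (0, 4194303)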


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self):
        """Delete an empty container and destroy associated blocks
        """
        r = self.container_delete(until=unicode(time()))
        r.release()

    def upload_object_unchunked(self, obj, f,
        withHashFile=False,
        size=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(
                    message='"%s" is not JSON-formatted' % f.name,
                    status=1)
            except SyntaxError:
                raise ClientError(
                    message='"%s" is not a valid hashmap file' % f.name,
                    status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

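    # A minimal usage sketch (hedged: the URL, token, account and container
    # below are hypothetical placeholders, not values defined by this module):
    #
    #   client = PithosClient('https://example.com/v1', TOKEN, 'user@example.com')
    #   client.container = 'pithos'
    #   with open('data.bin', 'rb') as f:
    #       client.upload_object_unchunked('remote/data.bin', f)
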
    def create_object_by_manifestation(self, obj,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        r = self.object_put(obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()

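    # Note: the zero-length object created above carries a manifest of
    # '<container>/<obj>', so GETting it is expected to concatenate every
    # object whose name starts with that prefix, e.g. parts named
    # 'video.mp4/1', 'video.mp4/2', ... (hypothetical names, Swift-style
    # dynamic large object semantics).
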
    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

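    # Worked example of the arithmetic above: a 10MiB file in a container
    # with a 4MiB block size needs 1 + (10485760 - 1) // 4194304 = 3 blocks,
    # while an exact multiple (8MiB) maps to 2 blocks rather than 3.
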
    def _get_missing_hashes(self, obj, json,
        size=None,
        format='json',
        hashmap=True,
        content_type=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        permissions=None,
        public=None,
        success=(201, 409)):
        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json

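    # The json payload sent above is the Pithos hashmap description, e.g.
    # {'bytes': 11000000, 'hashes': ['7828f...', 'f328e...']} (digests
    # abbreviated). A 201 means the server already knows every block, so
    # there is nothing to upload; a 409 body lists the hashes still missing.
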
    def _calculate_uploaded_blocks(self,
        blocksize,
        blockhash,
        size,
        nblocks,
        hashes,
        hmap,
        fileobj,
        hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size, \
            "Failed to calculate uploaded blocks: " \
            "Offset and object size do not match"

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(thread.exception, ClientError)\
                    and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    unfinished.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except StopIteration:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except StopIteration:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(self, obj, f,
        size=None,
        hash_cb=None,
        upload_cb=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._calculate_uploaded_blocks(*block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except StopIteration:
                    upload_gen = None
                    break
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()

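    # A minimal usage sketch (hedged: the endpoint, token and progress bars
    # are hypothetical; any generator that yields once per block will do):
    #
    #   def bar(nblocks):
    #       for i in range(nblocks + 1):
    #           print '%s of %s blocks' % (i, nblocks)
    #           yield
    #
    #   client = PithosClient('https://example.com/v1', TOKEN, 'user@example.com')
    #   client.container = 'pithos'
    #   with open('big.iso', 'rb') as f:
    #       client.upload_object('backups/big.iso', f, hash_cb=bar, upload_cb=bar)
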
    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        dst,
        range,
        **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        event = SilentEvent(self.object_get,
            obj,
            success=(200, 206),
            **restargs)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self,
        flying,
        local_file,
        offset=0,
        **restargs):
        """Write the results of a greenleted REST call to a file

        :param offset: the offset of the file up to blocksize
            - e.g., if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        local_file,
        blockhash=None,
        resume=False,
        filerange=None,
        **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size\
            and block_hash == self._hash_from_file(
                    local_file,
                    start,
                    blocksize,
                    blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(self,
        obj,
        dst,
        download_cb=None,
        version=None,
        resume=False,
        range=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None):
        """Download an object using multiple connections (threads) and
            writing to random parts of the file

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range: (str) from-to where from and to are integers denoting
            file positions in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """

        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range,
                **restargs)
        else:
            self._dump_blocks_async(obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range,
                **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()

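    # A minimal usage sketch (hedged: endpoint and token are hypothetical).
    # A seekable file opened 'wb+' takes the threaded path above; a tty gets
    # the sequential one:
    #
    #   client = PithosClient('https://example.com/v1', TOKEN, 'user@example.com')
    #   client.container = 'pithos'
    #   with open('big.iso', 'wb+') as dst:
    #       client.download_object('backups/big.iso', dst, resume=True)
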
    # Command progress bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        if not hasattr(self, 'progress_bar_gen'):
            return
        while True:
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                break

    def get_object_hashmap(self, obj,
        version=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None,
        data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status in (304, 412):
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

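    # The metadata calls above are incremental POSTs: update=True keeps any
    # existing keys, and posting an empty value removes a key. For example
    # (hedged sketch):
    #
    #   client.set_account_meta({'color': 'blue'})  # sets X-Account-Meta-Color
    #   client.del_account_meta('color')            # posts {'color': ''}
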
    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), the container contents
            are deleted as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        r = self.object_post(obj, update=True, public=True)
        r.release()
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        newurl = path4url('%s%s%s' % (pref, sep, base),
            info['x-object-public'])
        return newurl[1:]

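    # Worked example of the URL assembly above (hypothetical values): with
    # base_url 'https://example.com/v1/user' and an x-object-public header of
    # '/pithos/a/b.txt', the method keeps only scheme and host and returns
    # roughly 'https://example.com/pithos/a/b.txt'.
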
    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object not found', status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(self, obj,
        read_permition=False,
        write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(obj, update=True, permissions=perms)
        r.release()

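    # A minimal usage sketch (hedged: user and group names are hypothetical):
    #
    #   client.set_object_sharing('notes.txt',
    #       read_permition=['a_user', 'an_account:a_group'],
    #       write_permition=['a_user'])
    #
    # Calling it with the defaults (False, False) clears both lists, which is
    # exactly what del_object_sharing below does.
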
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        r = self.object_post(obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        r.release()

    def overwrite_object(self,
        obj,
        start,
        end,
        source_file,
        upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
                filesize - offset,
                datasize - offset))
            r = self.object_post(obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def copy_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

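    # A minimal usage sketch (hedged: container and object names are
    # hypothetical). Copying keeps the source object; move_object below is
    # the same call with move_from in place of copy_from:
    #
    #   client.copy_object('pithos', 'a/src.txt', 'backup', 'a/dst.txt')
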
    def move_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1156
        """Get accounts that share with self.account
1157

1158
        :param limit: (str)
1159

1160
        :param marker: (str)
1161

1162
        :returns: (dict)
1163
        """
1164
        self._assert_account()
1165

    
1166
        self.set_param('format', 'json')
1167
        self.set_param('limit', limit, iff=limit is not None)
1168
        self.set_param('marker', marker, iff=marker is not None)
1169

    
1170
        path = ''
1171
        success = kwargs.pop('success', (200, 204))
1172
        r = self.get(path, *args, success=success, **kwargs)
1173
        return r.json
1174

    
1175
    def get_object_versionlist(self, obj):
1176
        """
1177
        :param obj: (str) remote object path
1178

1179
        :returns: (list)
1180
        """
1181
        self._assert_container()
1182
        r = self.object_get(obj, format='json', version='list')
1183
        return r.json['versions']