# kamaki/clients/pithos.py @ 3e7d1e0e
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
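# Illustration (not part of the original module): because trailing NUL
# padding is stripped before hashing, a zero-padded block hashes the same
# as its unpadded content, e.g. with blockhash='sha256':
#   _pithos_hash('abc' + '\x00' * 5, 'sha256') == _pithos_hash('abc', 'sha256')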


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
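# A sketch of the clamping behaviour above (illustrative, not original code):
#   _range_up(0, 99, '50-200')   returns (50, 99), the overlap of both ranges
#   _range_up(0, 99, '150-200')  returns (0, 0), since the ranges are disjoint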


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self):
        """Delete an empty container and destroy associated blocks"""
        r = self.container_delete(until=unicode(time()))
        r.release()

    def upload_object_unchunked(self, obj, f,
        withHashFile=False,
        size=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not JSON-formatted' % f.name,
                    status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file'\
                    % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

    def create_object_by_manifestation(self, obj,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()
        r = self.object_put(obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()

    
174
    # upload_* auxiliary methods
175
    def _put_block_async(self, data, hash, upload_gen=None):
176
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
177
        event.start()
178
        return event
179

    
180
    def _put_block(self, data, hash):
181
        #from random import randint
182
        #if not randint(0, 3):
183
        #    raise ClientError('BAD GATEWAY STUFF', 503)
184
        r = self.container_post(update=True,
185
            content_type='application/octet-stream',
186
            content_length=len(data),
187
            data=data,
188
            format='json')
189
        assert r.json[0] == hash, 'Local hash does not match server'
190

    
191
    def _get_file_block_info(self, fileobj, size=None):
192
        meta = self.get_container_info()
193
        blocksize = int(meta['x-container-block-size'])
194
        blockhash = meta['x-container-block-hash']
195
        size = size if size is not None else fstat(fileobj.fileno()).st_size
196
        nblocks = 1 + (size - 1) // blocksize
197
        return (blocksize, blockhash, size, nblocks)
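
    # Block arithmetic sanity check (illustrative): with a 4 MiB container
    # block size and a 10 MiB file,
    #   nblocks = 1 + (10485760 - 1) // 4194304 = 3
    # i.e. two full blocks plus one partial block.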

    def _get_missing_hashes(self, obj, json,
        size=None,
        format='json',
        hashmap=True,
        content_type=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        permissions=None,
        public=None,
        success=(201, 409)):
        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json
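
    # Protocol sketch, as used by upload_object below: the hashmap is PUT
    # first; 201 means every block already exists server-side, while a 409
    # body lists the block hashes that must still be uploaded, e.g.:
    #   ['e3b0c44298fc1c149afb...', '7d793037a0760186574b...']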

    def _calculate_uploaded_blocks(self,
        blocksize,
        blockhash,
        size,
        nblocks,
        hashes,
        hmap,
        fileobj,
        hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size, \
            "Failed to calculate uploaded blocks: " \
            "offset and object size do not match"

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = []
            for thread in flying:
                unfinished = self._watch_thread_limit(unfinished)
                if upload_gen:
                    for i in range(len(unfinished), len(flying)):
                        try:
                            upload_gen.next()
                        except:
                            pass
                if thread.exception:
                    failures.append(thread)
                    if isinstance(thread.exception, ClientError)\
                    and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    unfinished.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(self, obj, f,
        size=None,
        hash_cb=None,
        upload_cb=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._calculate_uploaded_blocks(*block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # advance the bar over the blocks that are already on the server
            for i in range(len(missing), len(hashes)):
                upload_gen.next()
        else:
            upload_gen = None

        retries = 3
        try:
            while retries:
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
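
    # Usage sketch (endpoint, token and paths are made up):
    #
    #   client = PithosClient('https://pithos.example.com/v1', TOKEN,
    #       account='user@example.com', container='pithos')
    #   with open('backup.tar.gz', 'rb') as f:
    #       client.upload_object('backups/backup.tar.gz', f)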

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        dst,
        range,
        **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        event = SilentEvent(self.object_get,
            obj,
            success=(200, 206),
            **restargs)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # rstrip, not strip: only trailing NUL padding is ignored, in line
        # with _pithos_hash above
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self,
        flying,
        local_file,
        offset=0,
        **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        local_file,
        blockhash=None,
        resume=False,
        filerange=None,
        **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size\
            and block_hash == self._hash_from_file(
                    local_file,
                    start,
                    blocksize,
                    blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(self,
        obj,
        dst,
        download_cb=None,
        version=None,
        resume=False,
        range=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None):
        """Download an object using multiple connections (threads), writing
            each block at its proper position in the file

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range: (str) from-to where from and to are integers denoting
            file positions in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """

        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range,
                **restargs)
        else:
            self._dump_blocks_async(obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range,
                **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()
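
    # Usage sketch (endpoint, token and paths are made up; client as in the
    # upload_object sketch above):
    #
    #   with open('backup.tar.gz', 'wb+') as f:
    #       client.download_object('backups/backup.tar.gz', f, resume=True)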

    # Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(self, obj,
        version=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None,
        data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
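
    # The returned hashmap is a dict shaped roughly as follows (a sketch,
    # inferred from how _get_remote_blocks_info consumes it):
    #
    #   {'block_size': 4194304,
    #    'block_hash': 'sha256',
    #    'bytes': 10485760,
    #    'hashes': ['7d793037a076...', 'e3b0c44298fc...']}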

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the container contents are deleted
            as well, leaving the container empty

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        r = self.object_post(obj, update=True, public=True)
        r.release()
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        newurl = path4url('%s%s%s' % (pref, sep, base),
            info['x-object-public'])
        return newurl[1:]
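
    # For example (a sketch with made-up values): with
    #   base_url = 'https://pithos.example.com/v1'
    # and an 'x-object-public' value of 'public/abc123', the method returns
    # something like 'https://pithos.example.com/public/abc123'.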

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply
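
    # The X-Object-Sharing header is a ';'-separated list of key=value pairs;
    # e.g. 'read=user1,group1;write=user2' is parsed by the loop above into
    #   {'read': 'user1,group1', 'write': 'user2'}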

    def set_object_sharing(self, obj,
        read_permition=False,
        write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(obj, update=True, permissions=perms)
        r.release()
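
    # Usage sketch (user and group names are made up):
    #
    #   client.set_object_sharing('docs/report.pdf',
    #       read_permition=['user1', 'group1'],
    #       write_permition=['user2'])
    #
    # Calling it with the defaults clears both permission sets, which is
    # what del_object_sharing below relies on.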

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        r = self.object_post(obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        r.release()

    def overwrite_object(self,
        obj,
        start,
        end,
        source_file,
        upload_cb=None):
        """Overwrite a part of an object with data from a local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
                filesize - offset,
                datasize - offset))
            offset += len(block)
            r = self.object_post(obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (start, end),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()
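
    # Usage sketch (names are made up): replace bytes 1024-2047 of a remote
    # object with the first KiB of a local file:
    #
    #   with open('patch.bin', 'rb') as f:
    #       client.overwrite_object('data.bin', 1024, 2047, f)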

    def copy_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path (defaults to
            src_object)

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def move_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path (defaults to
            src_object)

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']
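
    # A sketch of the expected reply (version ids and timestamps are made
    # up; the exact format is server-defined):
    #   [[85, 1339443591.275], [86, 1339443640.736]]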