# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
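
# Note: blocks are hashed with trailing NUL padding stripped, so a short
# final block produces the same digest as its zero-padded form. A minimal
# sketch with hypothetical values (assuming a container whose block hash
# algorithm is 'sha256'):
#
#     _pithos_hash('data\x00\x00', 'sha256') == _pithos_hash('data', 'sha256')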


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
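
# _range_up clips a block's [start, end] byte span to a user-requested
# 'from-to' range. Illustrative values (hypothetical):
#
#     _range_up(0, 99, '40-200')   # -> (40, 99): the overlap is kept
#     _range_up(0, 99, '150-200')  # -> (0, 0):   disjoint ranges
#     _range_up(0, 99, None)       # -> (0, 99):  no range requested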


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
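
    # nblocks above is a ceiling division: with hypothetical values of
    # blocksize = 4 MiB and a 9 MiB file, nblocks = 1 + (9 MiB - 1) // 4 MiB
    # = 3 blocks, while an empty file yields 0 blocks.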

    def _create_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return None if r.status_code == 201 else r.json
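
    # A 201 response means the server already had every block of the hashmap
    # and created the object outright, so there is nothing left to upload; a
    # 409 response carries, as JSON, the hashes the server is missing.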

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
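
    # The hashes of failed blocks are returned so the caller can retry them;
    # upload_object below loops on this until nothing is missing or its
    # retry budget runs out.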

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._create_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
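
    # Hedged usage sketch (hypothetical endpoint, token, account and paths):
    #
    #     client = PithosClient('https://example.org/pithos', TOKEN,
    #                           account=ACCOUNT_UUID, container='pithos')
    #     with open('data.bin', 'rb') as f:
    #         client.upload_object('backups/data.bin', f)
    #
    # Only blocks the server does not already have are transferred.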

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object's hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in more than one block, so map each
            # hash to the list of block indices where it occurs
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
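
    # Hedged usage sketch (hypothetical paths; assumes the client object from
    # the upload example above):
    #
    #     with open('local_copy.bin', 'wb+') as dst:
    #         client.download_object('backups/data.bin', dst)
    #
    # Blocks are fetched in parallel unless dst is a terminal, in which case
    # they are streamed sequentially, in order.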

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} if a precondition failed
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (list) of container dicts
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), the container contents
            are deleted as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
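
    # The public URL is assembled from the scheme and host of base_url plus
    # the x-object-public path. With hypothetical values, a base_url of
    # 'https://example.org/pithos' and an x-object-public of 'public/abc123'
    # yield 'https://example.org/public/abc123'.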

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
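
    # Illustrative call (hypothetical object path and user names): grant read
    # access to two users and, by omission, drop any write permissions:
    #
    #     client.set_object_sharing(
    #         'docs/report.pdf', read_permition=['user1', 'user2'])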

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the object
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
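
    # Hedged example (hypothetical container and object names): server-side
    # move, which also switches self.container to the destination:
    #
    #     client.move_object('pithos', 'old/name.txt', 'trash')
    #
    # Both copy_object and move_object send a zero-length PUT with a
    # copy_from/move_from source, so no object data passes through the client.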

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']