# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
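
# A minimal sketch of the block-hash convention above (hypothetical values;
# Pithos containers report the algorithm, e.g. 'sha256', in the
# x-container-block-hash header):
#
#   _pithos_hash('some data' + '\x00' * 100, 'sha256')
#
# Trailing NUL padding is stripped before hashing, so a partially filled
# final block hashes the same as its unpadded contents.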


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
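
# Sketch: how a block's [start, end] byte span is clipped against a
# user-requested range (illustrative numbers). For the first 4MB block and a
# requested range '1024-2047':
#
#   _range_up(0, 4194303, '1024-2047')  # returns (1024, 2047)
#
# Blocks that fall entirely outside the range collapse to (0, 0).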


class PithosClient(PithosRestClient):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
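
    # Usage sketch (hypothetical endpoint, token and names; kamaki normally
    # wires these up from its config file):
    #
    #   client = PithosClient(
    #       'https://pithos.example.com/object-store/v1', TOKEN,
    #       account='user-uuid', container='pithos')
    #   client.purge_container()  # also destroys the deleted objects' blocks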

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if true, f is a json-formatted hashmap
            file rather than raw data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
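
    # Sketch: a small file can be pushed in a single request (hypothetical
    # paths; larger files are better served by upload_object below):
    #
    #   with open('notes.txt') as f:
    #       client.upload_object_unchunked(
    #           'docs/notes.txt', f, content_type='text/plain')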

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return None if r.status_code == 201 else r.json

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
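
    # Sketch of the block arithmetic used above (illustrative numbers): a
    # 10MB file against a 4MB container block size yields
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3
    # i.e. two full 4MB blocks plus one 2MB tail block, each hashed with
    # _pithos_hash to build the object's hashmap.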

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
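
    # Sketch: hashmap-based upload (hypothetical paths). Only the blocks the
    # server reports as missing travel over the wire, so re-running after an
    # interrupted upload skips the blocks that already reached the server:
    #
    #   with open('disk.img', 'rb') as f:
    #       client.upload_object('images/disk.img', f, if_not_exist=True)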

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  a hash may appear in multiple blocks, so map each hash to the
            #  list of block indices that carry it
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        #  strip only trailing NULs, matching the _pithos_hash convention
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) "from-to", where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
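
    # Sketch: parallel download with resume (hypothetical paths):
    #
    #   with open('disk.img', 'wb+') as dst:
    #       client.download_object('images/disk.img', dst, resume=True)
    #
    # With resume, blocks already present in dst (matched by block hash) are
    # not fetched again; a tty destination falls back to sequential dumping.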

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
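
    # The returned hashmap is a dict shaped roughly like this (illustrative
    # values; 'hashes' holds one hex digest per block):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 10485760, 'hashes': ['7c5a...', '9b1e...', '03fd...']}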

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})
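
    # Sketch: account metadata round-trip (hypothetical key; returned keys
    # are header-style, e.g. 'x-account-meta-color'):
    #
    #   client.set_account_meta({'color': 'blue'})
    #   client.get_account_meta()
    #   client.del_account_meta('color')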

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if set to '/', the container contents are
            deleted, emptying the container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
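
    # Sketch: publishing returns a world-readable URL composed of the API
    # host and the x-object-public header (illustrative value):
    #
    #   url = client.publish_object('docs/notes.txt')
    #   # e.g. 'https://pithos.example.com/public/abc123'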

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
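
    # Sketch: grant read access to two users and clear write access
    # (hypothetical user names):
    #
    #   client.set_object_sharing(
    #       'docs/notes.txt', read_permition=['user1', 'user2'])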

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from a local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()
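
    # Sketch: replace bytes 1024-2047 of a remote object with local data
    # (hypothetical paths; datasize here is 2047 - 1024 + 1 = 1024 bytes):
    #
    #   with open('patch.bin', 'rb') as src:
    #       client.overwrite_object('images/disk.img', 1024, 2047, src)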

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
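
    # Sketch: server-side copy and move (hypothetical names). Note that both
    # methods leave self.container pointing at the destination container:
    #
    #   client.copy_object('pithos', 'a.txt', 'backups')
    #   client.move_object('pithos', 'b.txt', 'trash', dst_object='b.bak')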

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (list)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']
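
    # Sketch: each entry in the returned list pairs a version id with a
    # timestamp (illustrative values):
    #
    #   client.get_object_versionlist('docs/notes.txt')
    #   # e.g. [[85, 1362399150.8057751], [86, 1362399238.105323]]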