kamaki / clients / pithos / __init__.py @ 326a79b9

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
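
# A worked example of the convention above (a sketch; 'sha256' is an
# illustrative choice - the real algorithm comes from the container policy):
#
#   _pithos_hash('abc\x00\x00', 'sha256') == hashlib.sha256('abc').hexdigest()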


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
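
# For example, _range_up(0, 99, '50-150') == (50, 99): the block span [0, 99]
# is clipped to the user range, while a block entirely outside it yields (0, 0).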


class PithosClient(PithosRestClient):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
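
    # Typical construction (a sketch; the URL and token values are
    # illustrative, not real endpoints):
    #
    #   client = PithosClient(
    #       'https://pithos.example.com/v1', '<user-token>',
    #       account='<account-uuid>', container='pithos')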

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if true, f is a json-formatted hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
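
    # Sketch of a single-request upload (local/remote paths are illustrative):
    #
    #   with open('notes.txt', 'rb') as f:
    #       client.upload_object_unchunked('docs/notes.txt', f)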

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

    
178
    # upload_* auxiliary methods
179
    def _put_block_async(self, data, hash, upload_gen=None):
180
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
181
        event.start()
182
        return event
183

    
184
    def _put_block(self, data, hash):
185
        r = self.container_post(
186
            update=True,
187
            content_type='application/octet-stream',
188
            content_length=len(data),
189
            data=data,
190
            format='json')
191
        assert r.json[0] == hash, 'Local hash does not match server'
192

    
193
    def _get_file_block_info(self, fileobj, size=None):
194
        meta = self.get_container_info()
195
        blocksize = int(meta['x-container-block-size'])
196
        blockhash = meta['x-container-block-hash']
197
        size = size if size is not None else fstat(fileobj.fileno()).st_size
198
        nblocks = 1 + (size - 1) // blocksize
199
        return (blocksize, blockhash, size, nblocks)
200

    
201
    def _get_missing_hashes(
202
            self, obj, json,
203
            size=None,
204
            format='json',
205
            hashmap=True,
206
            content_type=None,
207
            content_encoding=None,
208
            content_disposition=None,
209
            permissions=None,
210
            public=None,
211
            success=(201, 409)):
212
        r = self.object_put(
213
            obj,
214
            format='json',
215
            hashmap=True,
216
            content_type=content_type,
217
            json=json,
218
            content_encoding=content_encoding,
219
            content_disposition=content_disposition,
220
            permissions=permissions,
221
            public=public,
222
            success=success)
223
        return None if r.status_code == 201 else r.json
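
    # A 201 means the server could assemble the object from blocks it already
    # stores; a 409 response carries the hashes of the blocks that must still
    # be uploaded.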

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
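
    # Sketch of a chunked, multi-threaded upload (paths are illustrative):
    #
    #   with open('backup.tar', 'rb') as f:
    #       client.upload_object('backups/backup.tar', f)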

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in more than one block, so map each
            # hash to the list of block indices where it occurs
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # strip only trailing NULs, as _pithos_hash does, so local and remote
        # block hashes are computed over the same bytes
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
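
    # Sketch of a resumable download (paths are illustrative):
    #
    #   with open('backup.tar', 'wb+') as dst:
    #       client.download_object('backups/backup.tar', dst, resume=True)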

    # Command progress bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    
692
    def get_account_meta(self, until=None):
693
        """
694
        :meta until: (str) formated date
695

696
        :returns: (dict)
697
        """
698
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
699

    
700
    def get_account_group(self):
701
        """
702
        :returns: (dict)
703
        """
704
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
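
    # Sketch of an account-metadata round trip (the key is illustrative):
    #
    #   client.set_account_meta({'lang': 'en'})
    #   client.get_account_meta()   # e.g. {'x-account-meta-lang': 'en'}
    #   client.del_account_meta('lang')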

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) use '/' to also delete the container contents
            (i.e. empty the container first)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    
762
    def get_container_versioning(self, container=None):
763
        """
764
        :param container: (str)
765

766
        :returns: (dict)
767
        """
768
        cnt_back_up = self.container
769
        try:
770
            self.container = container or cnt_back_up
771
            return filter_in(
772
                self.get_container_info(),
773
                'X-Container-Policy-Versioning')
774
        finally:
775
            self.container = cnt_back_up
776

    
777
    def get_container_quota(self, container=None):
778
        """
779
        :param container: (str)
780

781
        :returns: (dict)
782
        """
783
        cnt_back_up = self.container
784
        try:
785
            self.container = container or cnt_back_up
786
            return filter_in(
787
                self.get_container_info(),
788
                'X-Container-Policy-Quota')
789
        finally:
790
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)
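
    # Sketch: publishing returns a world-readable URL (values illustrative):
    #
    #   url = client.publish_object('docs/notes.txt')
    #   # e.g. 'https://pithos.example.com/public/Ab12Cd34'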

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """
        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
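
    # Sketch: grant read access to two users and a group (names illustrative):
    #
    #   client.set_object_sharing(
    #       'docs/notes.txt', read_permition=['user1', 'user2', 'mygroup'])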

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the file
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()
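
    # Sketch: replace bytes 1024-2047 of a remote object with local data
    # (paths and offsets are illustrative):
    #
    #   with open('patch.bin', 'rb') as f:
    #       client.overwrite_object('data.bin', 1024, 2047, f)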

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
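
    # Sketch of server-side copy and move (container/object names are
    # illustrative):
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.bak')
    #   client.move_object('pithos', 'a.txt', 'archive')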

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']
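
    # Sketch (path illustrative); each entry of the returned list is,
    # roughly, a (version id, timestamp) pair:
    #
    #   versions = client.get_object_versionlist('docs/notes.txt')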