# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
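# Note (added, not in the original module): the helper above strips trailing
# zero bytes (sparse-block padding) before hashing, so a padded and an
# unpadded block get the same Pithos hash. Illustrative use, assuming a
# container configured with a 'sha256' block hash:
#
#   digest = _pithos_hash('data' + 1024 * '\x00', 'sha256')  # sha256 of 'data'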


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
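# Illustrative note (added): _range_up clips a block's [start, end] byte span
# to a user-requested 'from-to' range, e.g.
#
#   _range_up(0, 4194303, '1000-8388607')    # -> (1000, 4194303)
#   _range_up(0, 4194303, '8388608-9000000')  # -> (0, 0), block not needed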


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
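    # Note (added for clarity, not in the original source): blocks are pushed
    # with an update POST on the container; with format='json' the server
    # replies with the list of hashes it computed for the uploaded data, so
    # r.json[0] must equal the locally computed Pithos hash of the block.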

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
        avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
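    # Worked example (added, hypothetical numbers): with a 4MiB block size, a
    # 10MiB file yields nblocks = 1 + (10485760 - 1) // 4194304 = 3, i.e.
    # blocks of 4MiB, 4MiB and 2MiB.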

    def _create_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This also covers the case where an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        # init
        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
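    # Usage sketch (added; endpoint, token and names are hypothetical):
    #
    #   client = PithosClient('https://example.org/pithos', TOKEN,
    #                         account=ACCOUNT_UUID, container='pithos')
    #   with open('data.bin', 'rb') as f:
    #       headers = client.upload_object('backups/data.bin', f)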

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # group block indices by hash (the same hash may appear more
            # than once, e.g. for identical or zero-filled blocks)
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of an asynchronous (threaded) rest call to a file

        :param offset: the offset of the file up to blocksize
            e.g., if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
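    # Usage sketch (added; names are hypothetical, client as in the
    # upload_object example above):
    #
    #   with open('data.bin', 'wb+') as dst:
    #       client.download_object('backups/data.bin', dst)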

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections)

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        ret = ''
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            (start, end) = _range_up(start, end, range_str)
            if start == end:
                continue
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            ret += r.content
            self._cb_next()

        self._complete_cb()
        return ret

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
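    # Note (added): as consumed by _get_remote_blocks_info above, the returned
    # hashmap carries 'block_size', 'block_hash', 'bytes' and the list of
    # block 'hashes'.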

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) with delimiter '/', the container contents are
            also deleted

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
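    # Example (added, hypothetical values): with base_url
    # 'https://example.org/pithos' and an 'x-object-public' value of
    # 'public/abc123', the returned url is 'https://example.org/public/abc123'.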

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()
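    # Worked example (added, hypothetical numbers): overwriting bytes 10-19
    # gives datasize = 19 - 10 + 1 = 10, a single block with the usual 4MiB
    # block size, posted with Content-Range: bytes 10-19/*.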

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
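    # Usage sketch (added; container and object names are hypothetical):
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.txt')
    #   client.move_object('pithos', 'tmp.txt', 'trash')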

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']