# kamaki/clients/pithos/__init__.py @ 0fbc8a52

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
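
# Note: block hashes are computed with trailing NUL padding stripped, so a
# partially filled final block hashes the same as its unpadded content.
# Illustrative sketch (any standard hashlib algorithm name works):
#
#     _pithos_hash('abc' + '\x00' * 5, 'sha256') == \
#         _pithos_hash('abc', 'sha256')  # True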


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
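
# _range_up clips the block span [start, end] to the user-requested byte
# range 'from-to', returning (0, 0) when they do not overlap. For example,
# with a_range='10-100' a block spanning bytes 0-63 yields (10, 63), while
# one spanning bytes 128-191 yields (0, 0).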


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
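
    # Typical construction (illustrative values; the endpoint URL, token,
    # account and container below are placeholders, not real credentials):
    #
    #     client = PithosClient(
    #         'https://example.synnefo.org/pithos', 'my-token',
    #         account='user@example.org', container='pithos')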

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
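
    # nblocks is a ceiling division: 1 + (size - 1) // blocksize. E.g. a
    # 10 MiB file with a 4 MiB container block size gives
    # 1 + (10485760 - 1) // 4194304 = 3 blocks (two full, one partial).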

    def _create_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
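
    # The hashmap PUT replies 201 when the server already holds every block
    # (the object is created at once), or 409 together with a JSON list of
    # the block hashes it is missing, which the caller must upload next.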

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while this object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        #init
        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
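
    # Illustrative use (assumes an initialized client as above; the paths
    # are placeholders):
    #
    #     with open('backup.tar', 'rb') as f:
    #         headers = client.upload_object('backups/backup.tar', f)
    #
    # Only blocks the server reports missing are transmitted, so
    # re-uploading an unchanged file costs little more than hashing it.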

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  was: map_dict[h] = i -- changed to keep every index, since
            #  the same block (hash) may occur at multiple positions
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
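
    # Illustrative use (assumes an initialized client; with resume=True,
    # blocks already present and hash-matching in dst are skipped):
    #
    #     with open('backup.tar', 'wb+') as dst:
    #         client.download_object('backups/backup.tar', dst, resume=True)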

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            (start, end) = _range_up(start, end, range_str)
            if start < end:
                self._watch_thread_limit(flying.values())
                flying[blockid] = self._get_block_async(obj, **restargs)
            for runid, thread in flying.items():
                if (blockid + 1) == num_of_blocks:
                    thread.join()
                elif thread.isAlive():
                    continue
                if thread.exception:
                    raise thread.exception
                ret[runid] = thread.value.content
                self._cb_next()
                flying.pop(runid)
        return ''.join(ret)
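
    # Memory caveat: unlike download_object, the whole object is buffered in
    # RAM, so this suits small objects, e.g.:
    #
    #     text = client.download_to_string('notes/readme.txt')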

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) pass '/' to also delete the container
            contents (i.e., empty the container first)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
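
    # The public URL is rebuilt from the scheme and host of base_url plus
    # the server-issued x-object-public path. E.g. (illustrative values)
    # base_url='https://example.org/pithos' with
    # x-object-public='public/abc123' yields
    # 'https://example.org/public/abc123'.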

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
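
    # Illustrative call (user names and group below are placeholders):
    #
    #     client.set_object_sharing(
    #         'docs/report.pdf',
    #         read_permition=['alice@example.org', 'devs'],
    #         write_permition=['bob@example.org'])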

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()
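
    # Illustrative call: replace bytes 1024-2047 of a remote object with
    # the first KiB of a local file (paths are placeholders):
    #
    #     with open('patch.bin', 'rb') as src:
    #         client.overwrite_object('disks/image.raw', 1024, 2047, src)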

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']