# kamaki/clients/pithos/__init__.py @ 14c72dbd

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    """Hash a block the way Pithos+ does: trailing zero bytes are stripped"""
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    """Clip a (start, end) block span to an optional 'from-to' range string

    e.g., _range_up(0, 100, '10-20') gives (10, 20), while a span that is
    disjoint from the range gives (0, 0)
    """
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
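
        A minimal usage sketch (assumes a PithosClient instance 'client' with
        a valid token; the container and file names are hypothetical)::

            client.container = 'pithos'
            with open('local.iso', 'rb') as f:
                headers = client.upload_object_unchunked('remote.iso', f)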
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
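
        A sketch: the created object acts as a manifest mapping to the
        objects whose names are prefixed with '<self.container>/<obj>' (the
        names below are hypothetical)::

            client.create_object_by_manifestation(
                'image.iso', content_type='application/octet-stream')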
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache the container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _create_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """PUT the hashmap and return (missing hashes, response headers);
        missing is None if the object was fully created (status 201)
        """
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:  # progress generator is exhausted
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:  # progress generator is exhausted
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This also covers the case where an object with the same path is
            created while the upload is in progress.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
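
        A minimal usage sketch (assumes a PithosClient instance 'client'; the
        container and file names are hypothetical)::

            client.container = 'pithos'
            cache = dict()
            with open('large.iso', 'rb') as f:
                headers = client.upload_object(
                    'backups/large.iso', f, container_info_cache=cache)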
        """
        self._assert_container()

        # init
        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # fast-forward the progress bar over blocks the server already has
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:  # progress generator is exhausted
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # a hash may occur in more than one block, so map each hash to
            # the list of block indices it appears in
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # strip only trailing zero bytes, to match _pithos_hash
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of an asynchronous REST call to a file

        :param offset: the offset of the file up to blocksize
            e.g., if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = (total_size - 1 if key + blocksize > total_size
                       else key + blocksize - 1)
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are byte
            positions (int) in the file

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
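
        A minimal usage sketch (assumes a PithosClient instance 'client'; the
        container and file names are hypothetical)::

            client.container = 'pithos'
            with open('local.iso', 'wb+') as dst:
                client.download_object('backups/large.iso', dst)
        """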
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()

    # command progress bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:  # progress generator is exhausted
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:  # progress generator is exhausted
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param version: (str) object version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} if a precondition fails
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
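
        A sketch (the group name and user ids are hypothetical)::

            client.set_account_group(
                'developers', ['user-uuid-1', 'user-uuid-2'])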
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), delete the container
            contents, emptying the container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
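
        A sketch (the object name is hypothetical)::

            url = client.publish_object('images/logo.png')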
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
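
        A sketch (the object name and user ids are hypothetical)::

            client.set_object_sharing(
                'docs/report.pdf', read_permition=['user-uuid-1'])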
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
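
        A sketch (the file and object names are hypothetical)::

            with open('extra.log', 'rb') as f:
                client.append_object('logs/app.log', f)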
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the file
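
        A sketch: keep only the first kilobyte (the object name is
        hypothetical)::

            client.truncate_object('logs/app.log', 1024)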
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
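
        A sketch: overwrite bytes 0-99 with local content (the names are
        hypothetical)::

            with open('patch.bin', 'rb') as f:
                client.overwrite_object('data.bin', 0, 99, f)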
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
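
        A sketch (the container and object names are hypothetical)::

            client.copy_object(
                'pithos', 'images/logo.png', 'backups',
                dst_object='images/logo-copy.png')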
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
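
        A sketch: rename an object within the same container (the names are
        hypothetical)::

            client.move_object(
                'pithos', 'tmp/draft.txt', 'pithos',
                dst_object='docs/final.txt')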
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']