kamaki/clients/pithos/__init__.py @ c2b5da2f

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    """Compute the Pithos hash of a block: trailing NUL bytes are stripped
    before hashing, so a zero-padded and an unpadded block hash the same."""
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    """Intersect the block span [start, end] with a user range 'from-to';
    return (0, 0) if the two are disjoint."""
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
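
# Example (illustrative): for a 100-byte block starting at offset 0,
# _range_up(0, 99, '50-149') yields (50, 99), while a disjoint request
# such as _range_up(0, 99, '200-300') collapses to (0, 0).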


class PithosClient(PithosRestClient):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f is a json-formatted hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
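
    # Usage sketch (hedged; endpoint, token, account and container names are
    # illustrative, not part of this module):
    #
    #   client = PithosClient(
    #       'https://example.org/pithos', TOKEN,
    #       account='user@example.org', container='pithos')
    #   with open('local.iso', 'rb') as f:
    #       client.upload_object_unchunked('images/local.iso', f)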

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize  # ceil(size / blocksize)
        return (blocksize, blockhash, size, nblocks)

    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return None if r.status_code == 201 else r.json

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except StopIteration:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except StopIteration:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                if upload_gen:
                    try:
                        upload_gen.next()
                    except StopIteration:
                        upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
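
    # Usage sketch (hedged; 'client' as constructed in the example after
    # upload_object_unchunked above). The method hashes local blocks, asks
    # the server which are missing, and PUTs only those, in parallel:
    #
    #   with open('backup.tar', 'rb') as f:
    #       client.upload_object('backups/backup.tar', f)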

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, a_range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, a_range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
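
    # Usage sketch (hedged; 'client' as in the upload examples above). Blocks
    # are fetched in parallel when dst is a regular file, sequentially when
    # it is a tty:
    #
    #   with open('backup.tar', 'wb+') as f:
    #       client.download_object('backups/backup.tar', f)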

    # Command Progress Bar method
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except (AttributeError, StopIteration):
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status in (304, 412):
                return {}
            raise
        return r.json
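
    # The returned hashmap is a dict of this shape (values illustrative):
    #
    #   {'block_size': 4194304,
    #    'block_hash': 'sha256',
    #    'bytes': 8388608,
    #    'hashes': ['7c4e...', '1a2b...']}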

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert isinstance(metapairs, dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if set to '/', also remove the container
            contents

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_quota(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert isinstance(metapairs, dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        self.container_post(update=True, quota=quota)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert isinstance(metapairs, dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
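
    # Usage sketch (hedged; user names are illustrative):
    #
    #   client.set_object_sharing(
    #       'docs/report.pdf',
    #       read_permition=['user1@example.org', 'user2@example.org'],
    #       write_permition=['user1@example.org'])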

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()
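
    # Note: the 'bytes */*' Content-Range above is what the Pithos update
    # call interprets as "append at the current end of the object", which is
    # why the blocks are posted strictly in order.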

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the object
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
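
    # Usage sketch (hedged; names are illustrative): keep only the first
    # 1024 bytes of a remote object.
    #
    #   client.truncate_object('logs/app.log', 1024)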

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    int(start) + offset,
                    int(start) + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()
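
    # Usage sketch (hedged; names are illustrative): overwrite bytes 0-1023
    # of a remote object with the first KiB of a local file.
    #
    #   with open('patch.bin', 'rb') as f:
    #       client.overwrite_object('data/blob', 0, 1023, f)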

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
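
    # Usage sketch (hedged; container and object names are illustrative):
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.bak')
    #
    # move_object (below) takes the same arguments but removes the source.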

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']