Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 723e9d47

History | View | Annotate | Download (35.6 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
    """Hash a data block the way Pithos servers do.

    :param block: (str) raw block data
    :param blockhash: (str) hash algorithm name, e.g. 'sha256'
    :returns: (str) hex digest of the block with trailing NUL padding removed
    """
    hasher = newhashlib(blockhash)
    hasher.update(block.rstrip('\x00'))
    return hasher.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
    """GRNet Pithos API client"""

    # Class-level (shared) list collecting exceptions raised in worker
    # threads spawned by the upload/download helpers.
    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        # Delegates all connection/account/container setup to PithosRestAPI.
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self):
        """Delete an empty container and destroy associated blocks
        """
        # "until now" makes the deletion destroy all historic versions too
        resp = self.container_delete(until=unicode(time()))
        resp.release()
81

    
82
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object with a single PUT (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f holds a json hashmap, not data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            # Validate the hashmap file by a json round-trip before sending
            raw = f.read()
            try:
                import json
                raw = json.dumps(json.loads(raw))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                raise ClientError(
                    '"%s" is not a valid hashmap file' % f.name, 1)
            f = StringIO(raw)
        payload = f.read(size) if size is not None else f.read()
        resp = self.object_put(
            obj,
            data=payload,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        resp.release()
139

    
140
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length object that manifests other objects
        sharing the '<container>/<obj>' prefix.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        resp = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        resp.release()
176

    
177
    # upload_* auxiliary methods
178
    def _put_block_async(self, data, hash, upload_gen=None):
        """Start a thread that uploads one block; return the running thread.

        :param upload_gen: accepted for call-site symmetry, unused here
        """
        thread = SilentEvent(method=self._put_block, data=data, hash=hash)
        thread.start()
        return thread
182

    
183
    def _put_block(self, data, hash):
        """Upload a single block and check the hash the server computed."""
        resp = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        # The server replies with the hash list of the uploaded data
        assert resp.json[0] == hash, 'Local hash does not match server'
191

    
192
    def _get_file_block_info(self, fileobj, size=None):
        """Return (blocksize, blockhash, size, nblocks) for uploading fileobj.

        Block size and hash algorithm come from the container policy;
        size defaults to the file's on-disk size.
        """
        info = self.get_container_info()
        blocksize = int(info['x-container-block-size'])
        blockhash = info['x-container-block-hash']
        if size is None:
            size = fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
199

    
200
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """PUT the hashmap and report which block hashes the server lacks.

        A 201 means the object was assembled entirely from existing blocks
        (returns None); otherwise the response body lists missing hashes.
        """
        resp = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if resp.status_code != 201:
            return resp.json
        resp.release()
        return None
228

    
229
    def _culculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Hash every block of fileobj, filling the caller's containers.

        :param hashes: (list) appended with each block hash, in file order
        :param hmap: (dict) hash -> (offset, length) for each block
        :param hash_cb: optional progress generator factory (one step/block)
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        # BUG FIX: the message used to be split into two statements, so the
        # second half was a dead no-op string and never shown in the assert.
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
249

    
250
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) block hashes the server does not have
        :param hmap: (dict) hash -> (offset, length) into fileobj
        :param upload_gen: optional progress generator, stepped per block
        :returns: (list) hashes of blocks whose upload thread raised
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            # _watch_thread_limit blocks until the pool has room and returns
            # the still-running subset of the threads it was given
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        # 502: server overloaded - shrink the pool
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    # NOTE(review): this append is discarded by the
                    # 'flying = unfinished' below - confirm intent
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        # best-effort progress: exhausted/broken bars ignored
                        pass
            flying = unfinished

        # Drain the remaining in-flight uploads
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        # Report failed hashes so the caller can retry them
        return [failure.kwargs['hash'] for failure in failures]
291

    
292
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) number of bytes to upload (defaults to file size)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        # Hash every local block (fills hashes and hmap in-place)
        self._culculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        # Send the hashmap first; the server replies with what it lacks
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        # None: all blocks were already on the server - object is complete
        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # Pre-advance the bar for the blocks that are already uploaded
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        # Retry while progress is made; give up after 7 stalled rounds
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        # no block got through this round
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # Final hashmap PUT assembles the object server-side
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
401

    
402
    # download_* auxiliary methods
403
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the remote hashmap and derive block bookkeeping data.

        :returns: (blocksize, blockhash, total_size, hash list,
            {hash: block index})
        """
        #retrieve object hashmap
        # the hashmap request itself must not carry a data range
        saved_range = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = saved_range
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = dict((h, i) for i, h in enumerate(hashmap['hashes']))
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
416

    
417
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download blocks sequentially and write them to dst in order.

        Used when dst cannot seek (e.g. a tty).
        NOTE: parameter 'range' shadows the builtin; it is the optional
        'from-to' string restricting the download.
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                # clip the block to the user-requested range, if any
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()
431

    
432
    def _get_block_async(self, obj, **args):
        """Start a thread downloading one block; return the running thread."""
        thread = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        thread.start()
        return thread
436

    
437
    def _hash_from_file(self, fp, start, size, blockhash):
        """Compute the pithos hash of a block read from a local file.

        Used during resumed downloads to check whether a local block
        already matches the remote one.
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # BUG FIX: use rstrip (trailing NULs only) to match _pithos_hash
        # and server-side hashing; strip() also removed *leading* '\x00'
        # bytes, causing false mismatches for blocks starting with NULs.
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())
443

    
444
    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) file position -> download thread; completed
            entries are popped from it
        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        :returns: (list) the threads that completed and were written out
        """
        finished = []
        # In python 2, .items() snapshots the dict, so popping inside the
        # loop is safe. (Dropped an unused enumerate() index.)
        for start, g in flying.items():
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished
463

    
464
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently and write each at its file offset.

        :param remote_hashes: (dict) block hash -> block index
        :param resume: (bool) skip blocks whose local hash already matches
        :param filerange: (str) optional 'from-to' restriction in bytes
        """
        # When resuming, existing local data may already hold valid blocks
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            # ranged downloads are written shifted so the range starts at 0
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                # local block already valid - skip (resume support)
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            # flush any completed downloads before spawning a new one
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                # block entirely outside the requested range
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        # Drain remaining in-flight downloads
        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)
500

    
501
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        # A tty cannot seek, so blocks must arrive in order (sync path);
        # otherwise download blocks concurrently in arbitrary order.
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                # drop any stale bytes beyond the object size (resume case)
                dst.truncate(total_size)

        self._complete_cb()
576

    
577
    #Command Progress Bar method
578
    #Command Progress Bar method
    def _cb_next(self):
        # Advance the progress bar one step, if one is installed.
        # The bare except deliberately makes progress best-effort: an
        # exhausted or broken bar generator must never abort a transfer.
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass
584

    
585
    def _complete_cb(self):
        # Drain the progress bar generator to its end (fills the bar).
        # The bare except doubles as the loop terminator: StopIteration
        # (or a missing progress_bar_gen attribute) breaks out.
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break
591

    
592
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """Retrieve the hashmap of a remote object.

        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (list) the hashmap; {} if preconditions failed (304/412)
        """
        try:
            resp = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            # Not modified / precondition failed: report an empty hashmap
            if err.status in (304, 412):
                return {}
            raise
        return resp.json
631

    
632
    def set_account_group(self, group, usernames):
        """Create or replace an account group.

        :param group: (str)

        :param usernames: (list)
        """
        resp = self.account_post(update=True, groups={group: usernames})
        resp.release()
640

    
641
    def del_account_group(self, group):
        """Delete an account group (by setting it to an empty member list).

        :param group: (str)
        """
        resp = self.account_post(update=True, groups={group: []})
        resp.release()
647

    
648
    def get_account_info(self, until=None):
        """HEAD the account and return its headers.

        :param until: (str) formated date

        :returns: (dict)

        :raises ClientError: 401 if the token is not authorized
        """
        resp = self.account_head(until=until)
        if resp.status_code == 401:
            raise ClientError("No authorization", status=401)
        return resp.headers
658

    
659
    def get_account_quota(self):
        """Return the account quota policy headers.

        :returns: (dict)
        """
        headers = self.get_account_info()
        return filter_in(headers, 'X-Account-Policy-Quota', exactMatch=True)
667

    
668
    def get_account_versioning(self):
        """Return the account versioning policy headers.

        :returns: (dict)
        """
        headers = self.get_account_info()
        return filter_in(
            headers, 'X-Account-Policy-Versioning', exactMatch=True)
676

    
677
    def get_account_meta(self, until=None):
        """Return the account's user-defined metadata headers.

        :param until: (str) formated date

        :returns: (dict)
        """
        headers = self.get_account_info(until=until)
        return filter_in(headers, 'X-Account-Meta-')
684

    
685
    def get_account_group(self):
        """Return the account group headers.

        :returns: (dict)
        """
        headers = self.get_account_info()
        return filter_in(headers, 'X-Account-Group-')
690

    
691
    def set_account_meta(self, metapairs):
        """Set user-defined metadata on the account.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert type(metapairs) is dict
        resp = self.account_post(update=True, metadata=metapairs)
        resp.release()
698

    
699
    def del_account_meta(self, metakey):
        """Delete one account metadatum (by setting it to empty).

        :param metakey: (str) metadatum key
        """
        resp = self.account_post(update=True, metadata={metakey: ''})
        resp.release()
705

    
706
    def set_account_quota(self, quota):
        """Set the account quota.

        :param quota: (int)
        """
        resp = self.account_post(update=True, quota=quota)
        resp.release()
712

    
713
    def set_account_versioning(self, versioning):
        """Set the account versioning policy.

        :param versioning: (str)
        """
        resp = self.account_post(update=True, versioning=versioning)
        resp.release()
719

    
720
    def list_containers(self):
        """List the account's containers.

        :returns: (dict)
        """
        resp = self.account_get()
        return resp.json
726

    
727
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        resp = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        resp.release()
        # Map the known failure codes to descriptive errors
        errors = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if resp.status_code in errors:
            raise ClientError(errors[resp.status_code], resp.status_code)
751

    
752
    def get_container_versioning(self, container):
        """Return a container's versioning policy headers.

        :param container: (str) switches the client to this container

        :returns: (dict)
        """
        self.container = container
        headers = self.get_container_info()
        return filter_in(headers, 'X-Container-Policy-Versioning')
762

    
763
    def get_container_quota(self, container):
        """Return a container's quota policy headers.

        :param container: (str) switches the client to this container

        :returns: (dict)
        """
        self.container = container
        headers = self.get_container_info()
        return filter_in(headers, 'X-Container-Policy-Quota')
771

    
772
    def get_container_info(self, until=None):
        """HEAD the current container and return its headers.

        :param until: (str) formated date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            resp = self.container_head(until=until)
        except ClientError as ce:
            # enrich the error with the container name before re-raising
            ce.details.append('for container %s' % self.container)
            raise ce
        return resp.headers
786

    
787
    def get_container_meta(self, until=None):
        """Return the container's user-defined metadata headers.

        :param until: (str) formated date

        :returns: (dict)
        """
        headers = self.get_container_info(until=until)
        return filter_in(headers, 'X-Container-Meta')
796

    
797
    def get_container_object_meta(self, until=None):
        """Return the container's object metadata headers.

        :param until: (str) formated date

        :returns: (dict)
        """
        headers = self.get_container_info(until=until)
        return filter_in(headers, 'X-Container-Object-Meta')
806

    
807
    def set_container_meta(self, metapairs):
        """Set user-defined metadata on the current container.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert type(metapairs) is dict
        resp = self.container_post(update=True, metadata=metapairs)
        resp.release()
814

    
815
    def del_container_meta(self, metakey):
        """Delete one container metadatum (by setting it to empty).

        :param metakey: (str) metadatum key
        """
        resp = self.container_post(update=True, metadata={metakey: ''})
        resp.release()
821

    
822
    def set_container_quota(self, quota):
        """Set the current container's quota.

        :param quota: (int)
        """
        resp = self.container_post(update=True, quota=quota)
        resp.release()
828

    
829
    def set_container_versioning(self, versioning):
        """Set the current container's versioning policy.

        :param versioning: (str)
        """
        resp = self.container_post(update=True, versioning=versioning)
        resp.release()
835

    
836
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        resp = self.object_delete(obj, until=until, delimiter=delimiter)
        resp.release()
847

    
848
    def set_object_meta(self, obj, metapairs):
        """Set user-defined metadata on a remote object.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert type(metapairs) is dict
        resp = self.object_post(obj, update=True, metadata=metapairs)
        resp.release()
857

    
858
    def del_object_meta(self, obj, metakey):
        """Delete one object metadatum (by setting it to empty).

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        resp = self.object_post(obj, update=True, metadata={metakey: ''})
        resp.release()
866

    
867
    def publish_object(self, obj):
        """Make an object publicly accessible and return its public URL.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        resp = self.object_post(obj, update=True, public=True)
        resp.release()
        info = self.get_object_info(obj)
        # rebuild '<scheme>//<host>' from base_url and append the public path
        scheme, sep, remainder = self.base_url.partition('//')
        host = remainder.split('/')[0]
        public_url = path4url(
            '%s%s%s' % (scheme, sep, host),
            info['x-object-public'])
        return public_url[1:]
882

    
883
    def unpublish_object(self, obj):
        """Revoke public access to an object.

        :param obj: (str) remote object path
        """
        resp = self.object_post(obj, update=True, public=False)
        resp.release()
889

    
890
    def get_object_info(self, obj, version=None):
        """HEAD an object and return its headers.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)

        :raises ClientError: 404 if the object does not exist
        """
        try:
            resp = self.object_head(obj, version=version)
            return resp.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise
905

    
906
    def get_object_meta(self, obj, version=None):
        """Return an object's user-defined metadata headers.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        headers = self.get_object_info(obj, version=version)
        return filter_in(headers, 'X-Object-Meta')
917

    
918
    def get_object_sharing(self, obj):
        """Return the sharing permissions of a remote object.

        :param obj: (str) remote object path

        :returns: (dict) parsed from the X-Object-Sharing header,
            e.g. {'read': 'user1,group1', 'write': 'user2'}

        :raises ClientError: if a header entry contains no '=' separator
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                # Split on the FIRST '=' only: the old split('=') raised an
                # unhandled ValueError whenever the value contained '='
                key, sep, val = perm.strip().partition('=')
                if not sep:
                    raise ClientError('Incorrect reply format')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permisions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permition for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) of users and user groups to get
           write permition for this object - False means all previous write
           permissions will be removed
        """
        # A falsy argument maps to '' which clears that permission list
        permissions = dict(
            read=read_permition or '',
            write=write_permition or '')
        resp = self.object_post(obj, update=True, permissions=permissions)
        resp.release()

962
    def del_object_sharing(self, obj):
        """Remove all sharing permissions from a remote object.

        :param obj: (str) remote object path
        """
        # Default (False) arguments clear both read and write lists
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar generator factory; called as
            upload_cb(nblocks), and the resulting generator is advanced
            once up front and once per uploaded block
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            # next() builtin instead of gen.next(): works on py2.6+ AND py3
            next(upload_gen)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # 'bytes */*' content-range means: append at the current end
            r = self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb:
                next(upload_gen)

1001
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object down to a given size.

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        # Server-side truncation: copy bytes [0, upto_bytes) of the object
        # onto itself
        resp = self.object_post(
            obj,
            update=True,
            content_type='application/octet-stream',
            content_range='bytes 0-%s/*' % upto_bytes,
            source_object=path4url(self.container, obj),
            object_bytes=upto_bytes)
        resp.release()

1016
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar generator factory; called as
            upload_cb(nblocks), and the resulting generator is advanced
            once up front and once per uploaded block

        :raises ClientError: (416) if start or end exceed remote file size
        """
        # Normalize once: the original only cast with int() inside the range
        # checks, so a str start crashed later at 'start + offset'
        start, end = int(start), int(end)
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < start:
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < end:
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            # next() builtin instead of gen.next(): works on py2.6+ AND py3
            next(upload_gen)
        for i in range(nblocks):
            # Never read past the local file or past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                next(upload_gen)

1068
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Copy an object, possibly across containers and accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path; defaults to the
            source object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        resp = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        resp.release()

1111
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Move an object, possibly across containers and accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path; defaults to the
            source object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        target = dst_object if dst_object else src_object
        resp = self.object_put(
            target,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        resp.release()

1154
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        resp = self.get(
            '',
            *args,
            success=kwargs.pop('success', (200, 204)),
            **kwargs)
        return resp.json

    def get_object_versionlist(self, obj):
        """List all versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        resp = self.object_get(obj, format='json', version='list')
        return resp.json['versions']