Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 402b6f48

History | View | Annotate | Download (35.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
    """Return the pithos-style hex digest of a data block.

    Trailing null bytes are stripped before hashing, matching the
    server-side block hash computation.
    """
    digest = newhashlib(blockhash)
    digest.update(block.rstrip('\x00'))
    return digest.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
    """GRNet Pithos API client"""

    # class-level list shared by all instances; collects exceptions raised
    # inside worker threads
    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self, container=None):
        """Remove an empty container together with its whole history.

        :param container: (str) container to purge; defaults to the
            currently selected one, which is restored afterwards
        """
        backup = self.container
        try:
            self.container = container or backup
            reply = self.container_delete(until=unicode(time()))
        finally:
            # always restore the originally selected container
            self.container = backup
        reply.release()
86

    
87
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object with a single PUT request (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f holds a json hashmap to be
            validated and uploaded as-is

        :param size: (int) size of data to upload; everything if falsy

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :raises ClientError: if withHashFile is set and f is not valid json
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                # round-trip through json to validate and normalize
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            # bug fix: this used to be f.read(size or ()), which raises
            # TypeError whenever size is None or 0 because () is not a
            # valid read() argument - read the whole file in that case
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()
145

    
146
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length object carrying an X-Object-Manifest header.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        reply = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        reply.release()
182

    
183
    # upload_* auxiliary methods
184
    def _put_block_async(self, data, hash, upload_gen=None):
        """Spawn and return a thread that uploads a single block.

        :returns: (SilentEvent) the started worker thread
        """
        # upload_gen is accepted for call compatibility only; progress is
        # advanced by the caller once the thread finishes
        worker = SilentEvent(method=self._put_block, data=data, hash=hash)
        worker.start()
        return worker
188

    
189
    def _put_block(self, data, hash):
        """Upload one block and check it against the locally computed hash.

        :param data: (str) raw block contents

        :param hash: (str) locally computed pithos hash of the block
        """
        reply = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        # the server answers with a json list of hashes for the posted data
        assert reply.json[0] == hash, 'Local hash does not match server'
197

    
198
    def _get_file_block_info(self, fileobj, size=None):
        """Fetch the container's block parameters and size up a local file.

        :param size: (int) bytes to consider; the file's full size if None

        :returns: (blocksize, blockhash, size, nblocks)
        """
        info = self.get_container_info()
        blocksize = int(info['x-container-block-size'])
        blockhash = info['x-container-block-hash']
        if size is None:
            size = fstat(fileobj.fileno()).st_size
        # ceiling division: blocks needed to hold `size` bytes
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
205

    
206
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        # PUT the full hashmap: a 201 means the server could assemble the
        # object from blocks it already has; a 409 carries the list of
        # hashes that still need to be uploaded.
        # NOTE(review): the size, format and hashmap parameters are accepted
        # but not forwarded - format and hashmap are hard-coded below.
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            # nothing missing - the object is already created
            r.release()
            return None
        return r.json
234

    
235
    def _culculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Read a file block by block, computing pithos hashes in-place.

        :param blocksize: (int) container block size

        :param blockhash: (str) hash algorithm name

        :param size: (int) total number of bytes to read

        :param nblocks: (int) number of blocks

        :param hashes: (list) filled in-place with block hashes, in order

        :param hmap: (dict) filled in-place with hash -> (offset, length)

        :param fileobj: open file descriptor, positioned at the start

        :param hash_cb: optional progress.bar generator factory
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        # bug fix: the message used to be two separate statements, so its
        # second half was a no-op expression and never reached the assert
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
255

    
256
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) hashes of the blocks the server lacks

        :param hmap: (dict) hash -> (offset, length) into fileobj

        :param fileobj: open file descriptor to read block data from

        :param upload_gen: optional progress generator, one step per block

        :returns: (list) hashes of the blocks whose upload failed
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            # blocks until enough in-flight threads have terminated
            unfinished = self._watch_thread_limit(flying)
            # harvest the threads that are no longer in the unfinished set
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        # bad gateway: clamp the pool to the current limit
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        # wait out the remaining threads and record their failures
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
297

    
298
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) bytes to upload; the whole file if None

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # init: block geometry and hash bookkeeping containers
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._culculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        # send the hashmap; the server answers with the hashes it lacks
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        # None means the server had every block - object already created
        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # fast-forward the bar past the blocks already on the server
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        # retry while each pass makes progress; give up after 7 consecutive
        # passes that upload nothing
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # every block is on the server - finalize the object with its map
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
407

    
408
    # download_* auxiliary methods
409
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch an object's remote hashmap and unpack its parameters.

        :returns: (blocksize, blockhash, total_size, hashes,
            hash -> block-index dict)
        """
        # the hashmap request itself must not carry a data range header
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = dict((h, i) for i, h in enumerate(hashmap['hashes']))
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
422

    
423
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download blocks one by one, in order, writing them to dst."""
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # clip the block to the user-requested range, if any
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            reply = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(reply.content)
            dst.flush()
437

    
438
    def _get_block_async(self, obj, **args):
        """Spawn and return a thread that GETs a single block."""
        worker = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        worker.start()
        return worker
442

    
443
    def _hash_from_file(self, fp, start, size, blockhash):
        """Compute the pithos hash of a block read from a local file.

        :param fp: open file descriptor

        :param start: (int) file offset of the block

        :param size: (int) block length in bytes

        :param blockhash: (str) hash algorithm name

        :returns: (str) hex digest of the null-stripped block
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # NOTE(review): strip('\x00') trims nulls from BOTH ends, whereas
        # _pithos_hash uses rstrip('\x00') - confirm the intended server
        # semantics before unifying the two.
        h.update(block.strip('\x00'))
        return hexlify(h.digest())
449

    
450
    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) file-position -> SilentEvent of a pending GET

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10

        :returns: (list) the threads harvested in this pass
        """
        finished = []
        # NOTE(review): pops from `flying` while iterating .items() - safe
        # under Python 2, where items() returns a snapshot list
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                # shift by `offset` so ranged downloads start at position 0
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished
469

    
470
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently, writing each at its own offset.

        :param remote_hashes: (dict) block hash -> block index

        :param resume: (bool) if set, blocks already present in local_file
            (verified by hash) are not downloaded again

        :param filerange: (str) 'from-to' byte range restriction, or None
        """
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            # ranged download: data is written shifted so that the first
            # requested byte lands at position 0 of the local file
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            # resume support: skip blocks that already match on disk
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            # harvest any finished threads before launching a new one
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        # wait for and flush the remaining downloads
        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)
506

    
507
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        # a tty cannot seek, so stream blocks in order; otherwise fetch
        # blocks concurrently and write each at its own offset
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            # trim any leftover bytes from a previous (larger) download
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
582

    
583
    #Command Progress Bar method
584
    def _cb_next(self):
        """Advance the progress bar by one step, if one is active."""
        if not hasattr(self, 'progress_bar_gen'):
            return
        try:
            self.progress_bar_gen.next()
        except:
            # an exhausted generator is not an error - just stop counting
            pass
590

    
591
    def _complete_cb(self):
        """Drain the progress bar generator until it is exhausted."""
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                # StopIteration - or no progress_bar_gen at all - ends it
                break
597

    
598
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """Fetch the hashmap of a remote object.

        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (list) or an empty dict if the preconditions failed
        """
        try:
            reply = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            # 304 Not Modified / 412 Precondition Failed: nothing to return
            if err.status in (304, 412):
                return {}
            raise
        return reply.json
637

    
638
    def set_account_group(self, group, usernames):
        """Assign a list of users to an account group.

        :param group: (str) group name

        :param usernames: (list) members of the group
        """
        reply = self.account_post(update=True, groups={group: usernames})
        reply.release()
646

    
647
    def del_account_group(self, group):
        """Remove an account group by emptying its member list.

        :param group: (str) group name
        """
        reply = self.account_post(update=True, groups={group: []})
        reply.release()
653

    
654
    def get_account_info(self, until=None):
        """Fetch the account headers.

        :param until: (str) formated date

        :returns: (dict) account headers

        :raises ClientError: 401 on missing authorization
        """
        reply = self.account_head(until=until)
        if reply.status_code == 401:
            raise ClientError("No authorization", status=401)
        return reply.headers
664

    
665
    def get_account_quota(self):
        """
        :returns: (dict) the X-Account-Policy-Quota header, if present
        """
        info = self.get_account_info()
        return filter_in(info, 'X-Account-Policy-Quota', exactMatch=True)
673

    
674
    def get_account_versioning(self):
        """
        :returns: (dict) the X-Account-Policy-Versioning header, if present
        """
        info = self.get_account_info()
        return filter_in(info, 'X-Account-Policy-Versioning', exactMatch=True)
682

    
683
    def get_account_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Account-Meta-* headers
        """
        info = self.get_account_info(until=until)
        return filter_in(info, 'X-Account-Meta-')
690

    
691
    def get_account_group(self):
        """
        :returns: (dict) all X-Account-Group-* headers
        """
        info = self.get_account_info()
        return filter_in(info, 'X-Account-Group-')
696

    
697
    def set_account_meta(self, metapairs):
        """Add or update account metadata.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        reply = self.account_post(update=True, metadata=metapairs)
        reply.release()
704

    
705
    def del_account_meta(self, metakey):
        """Remove a single account metadatum by blanking its value.

        :param metakey: (str) metadatum key
        """
        reply = self.account_post(update=True, metadata={metakey: ''})
        reply.release()
711

    
712
    def set_account_quota(self, quota):
        """Set the account quota policy.

        :param quota: (int)
        """
        reply = self.account_post(update=True, quota=quota)
        reply.release()
718

    
719
    def set_account_versioning(self, versioning):
        """Set the account versioning policy.

        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()
725

    
726
    def list_containers(self):
        """
        :returns: (dict) the json-decoded account listing
        """
        reply = self.account_get()
        return reply.json
732

    
733
    def del_container(self, until=None, delimiter=None):
        """Delete the currently selected container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        reply = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        reply.release()
        # translate the expected failure codes into descriptive errors
        if reply.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                reply.status_code)
        elif reply.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                reply.status_code)
757

    
758
    def get_container_versioning(self, container=None):
        """
        :param container: (str) defaults to the current container

        :returns: (dict) the X-Container-Policy-Versioning header
        """
        backup = self.container
        try:
            self.container = container or backup
            info = self.get_container_info()
            return filter_in(info, 'X-Container-Policy-Versioning')
        finally:
            # always restore the originally selected container
            self.container = backup
772

    
773
    def get_container_quota(self, container=None):
        """
        :param container: (str) defaults to the current container

        :returns: (dict) the X-Container-Policy-Quota header
        """
        backup = self.container
        try:
            self.container = container or backup
            info = self.get_container_info()
            return filter_in(info, 'X-Container-Policy-Quota')
        finally:
            # always restore the originally selected container
            self.container = backup
787

    
788
    def get_container_info(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) container headers

        :raises ClientError: 404 Container not found
        """
        try:
            reply = self.container_head(until=until)
        except ClientError as err:
            # enrich the error with the container name before re-raising
            err.details.append('for container %s' % self.container)
            raise err
        return reply.headers
802

    
803
    def get_container_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Container-Meta headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Meta')
812

    
813
    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Container-Object-Meta headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Object-Meta')
822

    
823
    def set_container_meta(self, metapairs):
        """Add or update container metadata.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        reply = self.container_post(update=True, metadata=metapairs)
        reply.release()
830

    
831
    def del_container_meta(self, metakey):
        """Remove a single container metadatum by blanking its value.

        :param metakey: (str) metadatum key
        """
        reply = self.container_post(update=True, metadata={metakey: ''})
        reply.release()
837

    
838
    def set_container_quota(self, quota):
        """Set the container quota policy.

        :param quota: (int)
        """
        reply = self.container_post(update=True, quota=quota)
        reply.release()
844

    
845
    def set_container_versioning(self, versioning):
        """Set the container versioning policy.

        :param versioning: (str)
        """
        reply = self.container_post(update=True, versioning=versioning)
        reply.release()
851

    
852
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        reply = self.object_delete(obj, until=until, delimiter=delimiter)
        reply.release()
863

    
864
    def set_object_meta(self, obj, metapairs):
        """Add or update object metadata.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        reply = self.object_post(obj, update=True, metadata=metapairs)
        reply.release()
873

    
874
    def del_object_meta(self, obj, metakey):
        """Remove a single object metadatum by blanking its value.

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        reply = self.object_post(obj, update=True, metadata={metakey: ''})
        reply.release()
882

    
883
    def publish_object(self, obj):
        """Make an object publicly accessible and return its public URL.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        reply = self.object_post(obj, update=True, public=True)
        reply.release()
        info = self.get_object_info(obj)
        # rebuild the public url on the same scheme and host as base_url
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
895

    
896
    def unpublish_object(self, obj):
        """Revoke public access to an object.

        :param obj: (str) remote object path
        """
        reply = self.object_post(obj, update=True, public=False)
        reply.release()
902

    
903
    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) object headers

        :raises ClientError: 404 if the object does not exist
        """
        try:
            return self.object_head(obj, version=version).headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise
918

    
919
    def get_object_meta(self, obj, version=None):
        """Fetch only the X-Object-Meta headers of a remote object.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        info = self.get_object_info(obj, version=version)
        return filter_in(info, 'X-Object-Meta')
    def get_object_sharing(self, obj):
        """Get the sharing settings of a remote object.

        :param obj: (str) remote object path

        :returns: (dict) parsed from X-Object-Sharing,
            e.g. {'read': 'user1,group1', 'write': 'user2'}

        :raises ClientError: if the X-Object-Sharing header is malformed
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                if '=' not in perm:
                    raise ClientError('Incorrect reply format')
                # Split on the first '=' only: a value that itself contains
                # '=' must not make the 2-tuple unpacking blow up
                (key, val) = perm.strip().split('=', 1)
                reply[key] = val
        return reply
    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups to get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups to get
            write permission for this object - False means all previous
            write permissions will be removed
        """
        # False (or any falsy value) maps to '' which clears the permission
        permissions = dict(
            read=read_permition or '',
            write=write_permition or '')
        self.object_post(obj, update=True, permissions=permissions).release()
    def del_object_sharing(self, obj):
        """Remove all sharing permissions from a remote object.

        :param obj: (str) remote object path
        """
        # Calling set_object_sharing with its defaults (False, False)
        # clears both read and write permissions
        self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        container_info = self.get_container_info()
        blocksize = int(container_info['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Ceiling division: number of container-sized blocks to send
        nblocks = 1 + (filesize - 1) // blocksize
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        offset = 0
        for _ in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # content_range 'bytes */*' means: append at the current end
            r = self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()
            if upload_cb:
                upload_gen.next()
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object down to a given size.

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        # Server-side truncation: update the object from itself, keeping
        # only the first upto_bytes bytes
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj)).release()
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from a local source file.

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :raises ClientError: 416 if start or end exceed the remote file size
        """
        # Normalize once up front: the bounds check below already accepted
        # str input via int(start)/int(end), but 'start + offset' needs
        # integer arithmetic and would raise TypeError for a str start
        start, end = int(start), int(end)
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < start:
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < end:
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        # Ceiling division: number of container-sized blocks to send
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            # Never read past the local file nor past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                upload_gen.next()
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side copy of an object, possibly across containers.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        # Destination defaults to the source object's name
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side move of an object, possibly across containers.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        # Destination defaults to the source object's name
        self.object_put(
            dst_object or src_object,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter).release()
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        if limit is not None:
            self.set_param('limit', limit)
        if marker is not None:
            self.set_param('marker', marker)

        # Listing is a GET on the account root (empty path)
        success = kwargs.pop('success', (200, 204))
        return self.get('', *args, success=success, **kwargs).json
    def get_object_versionlist(self, obj):
        """List all stored versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        reply = self.object_get(obj, format='json', version='list')
        return reply.json['versions']