Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 2005b18e

History | View | Annotate | Download (35.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
    """Compute the Pithos hash of one data block.

    :param block: (str) raw block data

    :param blockhash: (str) hash algorithm name, as accepted by hashlib.new

    :returns: (str) hex digest of the block with trailing zero-bytes
        stripped (padding nulls do not affect the hash)
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
69
    """GRNet Pithos API client"""
70

    
71
    #  exceptions collected from worker threads
    #  NOTE(review): class-level mutable list - shared by all instances;
    #  confirm this sharing is intentional
    _thread_exceptions = []
72

    
73
    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) Pithos service endpoint

        :param token: (str) user authentication token

        :param account: (str) account to operate on

        :param container: (str) default container for object operations
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self):
        """Delete an empty container and destroy associated blocks

        Deleting with until=<now> also removes the container history,
        so the blocks of its past objects are destroyed as well.
        """
        r = self.container_delete(until=unicode(time()))
        r.release()
81

    
82
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object in a single PUT request (no block splitting)

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f contains a json hashmap
            rather than raw object data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :raises ClientError: if withHashFile is set and f is not a valid
            json file
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                #  round-trip through json to validate the hashmap format
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()
139

    
140
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length object carrying a manifest header

        The object is created with content_length=0 and a manifest of
        <container>/<obj> (presumably so the server serves the parts
        stored under that prefix as one object - confirm against the
        Pithos API docs).

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()
176

    
177
    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        """Spawn a thread uploading one block through _put_block.

        :param data: (str) block data

        :param hash: (str) locally computed pithos hash of the block

        :param upload_gen: accepted for call-site compatibility but unused

        :returns: (SilentEvent) the started thread
        """
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event
182

    
183
    def _put_block(self, data, hash):
        """Upload one block and verify the hash reported by the server.

        :param data: (str) block data

        :param hash: (str) locally computed pithos hash of the block

        :raises AssertionError: if the server-side hash differs from the
            locally computed one
        """
        #  NOTE: removed leftover fault-injection code that raised a random
        #  503 ClientError ('BAD GATEWAY STUFF') on roughly 1 in 8 calls -
        #  clearly a debugging artifact, it randomly failed real uploads
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
194

    
195
    def _get_file_block_info(self, fileobj, size=None):
        """Collect container block settings and the upload dimensions.

        :param fileobj: open file descriptor (its fstat size is used when
            the size argument is not given)

        :param size: (int) number of bytes to consider

        :returns: (blocksize, blockhash, size, nblocks)
        """
        container_info = self.get_container_info()
        blocksize = int(container_info['x-container-block-size'])
        blockhash = container_info['x-container-block-hash']
        if size is None:
            size = fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
202

    
203
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """PUT the hashmap and collect the hashes the server lacks.

        :param obj: (str) remote object path

        :param json: (dict) the hashmap, e.g. dict(bytes=..., hashes=[...])

        :returns: None if the server has every block (201 Created),
            otherwise the json response listing the missing hashes

        NOTE(review): the size, format and hashmap parameters are accepted
        but not forwarded - the request hard-codes format='json' and
        hashmap=True; confirm before relying on them
        """
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json
231

    
232
    def _caclulate_uploaded_blocks(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Hash the file block by block, filling hashes and hmap in place.

        :param blocksize: (int) container block size

        :param blockhash: (str) hash algorithm name

        :param size: (int) total number of bytes to hash

        :param nblocks: (int) number of blocks the data splits into

        :param hashes: (list) filled with the block hashes, in order

        :param hmap: (dict) filled with hash -> (offset, length) pairs

        :param fileobj: open file descriptor, read from its current position

        :param hash_cb: optional progress.bar generator factory
        """
        offset = 0
        hash_gen = hash_cb(nblocks) if hash_cb else None
        if hash_gen:
            hash_gen.next()

        for _ in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            nbytes = len(block)
            block_hash = _pithos_hash(block, blockhash)
            hashes.append(block_hash)
            hmap[block_hash] = (offset, nbytes)
            offset += nbytes
            if hash_gen:
                hash_gen.next()
        msg = 'Failed to calculate uploaded blocks:'
        msg += ' Offset and object size do not match'
        assert offset == size, msg
253

    
254
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) hashes of blocks the server reported absent

        :param hmap: (dict) hash -> (offset, length) mapping into fileobj

        :param fileobj: open file descriptor to read block data from

        :param upload_gen: optional progress.bar generator

        :returns: (list) hashes of the blocks that failed to upload
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        #  502 from the server: shrink the pool to the
                        #  current thread limit to back off
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    #  NOTE(review): this append is discarded when flying is
                    #  rebound to unfinished below - verify that still-alive
                    #  threads are not lost from tracking here
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        #  best-effort progress bar: never fail the upload
                        pass
            flying = unfinished

        #  wait for the remaining threads and account their results
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
295

    
296
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :raises ClientError: (800) if blocks are still missing after all
            upload retries
        """
        self._assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._caclulate_uploaded_blocks(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        #  the server already had every block - nothing left to send
        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            #  fast-forward the bar over the blocks the server already has
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        #  retry as long as progress is made; spend a retry only when a
        #  full pass leaves the same number of blocks missing
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            #  let the flying threads finish before propagating the interrupt
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        #  finalize: register the complete hashmap as the object
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
405

    
406
    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        """Retrieve the remote object hashmap and derive block information.

        :param obj: (str) remote object path

        :returns: (blocksize, blockhash, total_size, hashes,
            hash -> block index map)
        """
        #  the hashmap request must not be range-limited, so data_range is
        #  held aside and restored afterwards
        data_range = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = data_range
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        map_dict = dict((h, i) for i, h in enumerate(hashmap['hashes']))
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
420

    
421
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download the blocks one by one, in order, writing them to dst.

        :param obj: (str) remote object path

        :param remote_hashes: (list) block hashes in object order

        :param blocksize: (int) remote block size

        :param total_size: (int) object size in bytes

        :param dst: open file descriptor to write to

        :param range: (str) 'from-to' byte limit or None
            (shadows the range builtin; kept for call-site compatibility)
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                #  clip the block span against the user-requested range
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()
435

    
436
    def _get_block_async(self, obj, **args):
        """Spawn a thread GETting one (possibly ranged) part of the object.

        :param obj: (str) remote object path

        :returns: (SilentEvent) the started thread
        """
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event
440

    
441
    def _hash_from_file(self, fp, start, size, blockhash):
        """Compute the pithos hash of a block read from a local file.

        :param fp: open file descriptor

        :param start: (int) offset of the block inside the file

        :param size: (int) block size in bytes

        :param blockhash: (str) hash algorithm name

        :returns: (str) hex digest, comparable to server block hashes
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        #  rstrip, not strip: _pithos_hash strips only TRAILING zero-bytes;
        #  strip('\x00') also dropped leading nulls, so a block starting
        #  with zero-bytes hashed differently from the server's hash and
        #  defeated the resume comparison in _dump_blocks_async
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())
447

    
448
    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) file offset -> SilentEvent of a pending GET

        :param local_file: open file descriptor to write into

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all
            blocks will be written to normal_position - 10

        :returns: (list) the finished threads that were consumed

        :raises: the exception of any failed download thread
        """
        finished = []
        #  .items() returns a copy in python 2, so popping inside the loop
        #  is safe
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished
465

    
466
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks with parallel threads, writing each in place.

        :param obj: (str) remote object path

        :param remote_hashes: (dict) block hash -> block index

        :param blocksize: (int) remote block size

        :param total_size: (int) object size in bytes

        :param local_file: open, seekable file descriptor to write into

        :param blockhash: (str) hash algorithm name, used when resuming to
            compare local data against remote hashes

        :param resume: (bool) if set, blocks already on disk with the
            correct hash are not downloaded again

        :param filerange: (str) 'from-to' byte limit or None
        """
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            #  blocks are written at normal_position - offset
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            #  resume: skip blocks that are already correct on disk
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            #  drain any finished downloads before launching another
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)
502

    
503
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object using multiple connections (threads) and
            writing to random parts of the file

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to where from and to are integers
        denoting file positions in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date
        """

        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        #  a tty gets blocks sequentially (presumably because it cannot
        #  seek - confirm); regular files are filled by parallel threads
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                #  drop leftover bytes from a previous, larger local file
                dst.truncate(total_size)

        self._complete_cb()
582

    
583
    #Command Progress Bar method
    def _cb_next(self):
        """Advance the progress bar one step, if one is registered."""
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                #  a finished or broken progress bar must never abort the
                #  operation it decorates
                pass
590

    
591
    def _complete_cb(self):
        """Exhaust the progress bar generator, drawing it to completion."""
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break
597

    
598
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the json-decoded object hashmap, or {} if the
            preconditions were not met (304 / 412)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
637

    
638
    def set_account_group(self, group, usernames):
        """Create or update an account group.

        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames}).release()
646

    
647
    def del_account_group(self, group):
        """Remove a group from the account.

        :param group: (str)
        """
        self.account_post(update=True, groups={group: []}).release()
653

    
654
    def get_account_info(self, until=None):
        """Fetch the account headers.

        :param until: (str) formated date

        :returns: (dict)

        :raises ClientError: if the request is not authorized (401)
        """
        resp = self.account_head(until=until)
        if resp.status_code == 401:
            raise ClientError("No authorization")
        return resp.headers
664

    
665
    def get_account_quota(self):
        """
        :returns: (dict) the X-Account-Policy-Quota header, exact match
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Quota', exactMatch=True)
673

    
674
    def get_account_versioning(self):
        """
        :returns: (dict) the X-Account-Policy-Versioning header, exact match
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Versioning', exactMatch=True)
682

    
683
    def get_account_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Account-Meta-* headers
        """
        account_info = self.get_account_info(until=until)
        return filter_in(account_info, 'X-Account-Meta-')
690

    
691
    def get_account_group(self):
        """
        :returns: (dict) all X-Account-Group-* headers
        """
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Group-')
696

    
697
    def set_account_meta(self, metapairs):
        """Assign metadata key/value pairs to the account.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs).release()
704

    
705
    def del_account_meta(self, metakey):
        """Remove a metadatum from the account (by setting it to '').

        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''}).release()
711

    
712
    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota).release()
718

    
719
    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning).release()
725

    
726
    def list_containers(self):
        """
        :returns: (dict) json-decoded listing of the account's containers
        """
        return self.account_get().json
732

    
733
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        #  404 and 409 are accepted above so a meaningful error message can
        #  be raised here instead of a generic one
        errors = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if r.status_code in errors:
            raise ClientError(errors[r.status_code], r.status_code)
757

    
758
    def get_container_versioning(self, container):
        """
        :param container: (str) container to switch to and inspect

        :returns: (dict) the X-Container-Policy-Versioning headers
        """
        self.container = container
        container_info = self.get_container_info()
        return filter_in(container_info, 'X-Container-Policy-Versioning')
768

    
769
    def get_container_quota(self, container):
        """
        :param container: (str) container to switch to and inspect

        :returns: (dict) the X-Container-Policy-Quota headers
        """
        self.container = container
        container_info = self.get_container_info()
        return filter_in(container_info, 'X-Container-Policy-Quota')
777

    
778
    def get_container_info(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) the container headers

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            #  enrich the error with the container name before re-raising
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers
792

    
793
    def get_container_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Container-Meta* headers
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Meta')
802

    
803
    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) all X-Container-Object-Meta* headers
        """
        container_info = self.get_container_info(until=until)
        return filter_in(container_info, 'X-Container-Object-Meta')
812

    
813
    def set_container_meta(self, metapairs):
        """Assign metadata key/value pairs to the container.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs).release()
820

    
821
    def del_container_meta(self, metakey):
        """Remove a metadatum from the container (by setting it to '').

        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''}).release()
827

    
828
    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        self.container_post(update=True, quota=quota).release()
834

    
835
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning).release()
841

    
842
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter).release()
853

    
854
    def set_object_meta(self, obj, metapairs):
        """Assign metadata key/value pairs to an object.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs).release()
863

    
864
    def del_object_meta(self, obj, metakey):
        """Remove a metadatum from an object (by setting it to '').

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''}).release()
872

    
873
    def publish_object(self, obj):
        """Make an object publicly accessible.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True).release()
        info = self.get_object_info(obj)
        #  rebuild the service root (scheme + host, path dropped) and join
        #  it with the public path reported by the server
        scheme, sep, rest = self.base_url.partition('//')
        host = rest.split('/')[0]
        public_url = path4url(
            '%s%s%s' % (scheme, sep, host),
            info['x-object-public'])
        return public_url[1:]
888

    
889
    def unpublish_object(self, obj):
        """Revoke public access to an object.

        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False).release()
895

    
896
    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) the object headers

        :raises ClientError: 404 Object not found
        """
        try:
            return self.object_head(obj, version=version).headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object not found', status=404)
            raise
911

    
912
    def get_object_meta(self, obj, version=None):
        """Collect the X-Object-Meta-* headers of a remote object.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        info = self.get_object_info(obj, version=version)
        return filter_in(info, 'X-Object-Meta')
923

    
924
    def get_object_sharing(self, obj):
        """Get the read/write sharing permissions of a remote object.

        :param obj: (str) remote object path

        :returns: (dict) e.g. {'read': 'user1,group1', 'write': 'user2'}

        :raises ClientError: if the X-Object-Sharing header is malformed
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                # Split on the first '=' only: the value part may itself
                # contain '=' characters (plain split would then raise
                # "too many values to unpack")
                (key, val) = perm.strip().split('=', 1)
                reply[key] = val
        return reply
945

    
946
    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Set read/write permissions on a remote object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups granted
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups granted
            write permission for this object - False means all previous
            write permissions will be removed
        """
        # A falsy argument maps to '' which clears that permission set
        perms = {}
        perms['read'] = read_permition if read_permition else ''
        perms['write'] = write_permition if write_permition else ''
        self.object_post(obj, update=True, permissions=perms).release()
967

    
968
    def del_object_sharing(self, obj):
        """Strip all sharing permissions from a remote object.

        :param obj: (str) remote object path
        """
        # Calling set_object_sharing with defaults clears read and write
        self.set_object_sharing(obj)
973

    
974
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        container_meta = self.get_container_info()
        blocksize = int(container_meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Ceiling division; 0 blocks for an empty file
        nblocks = 1 + (filesize - 1) // blocksize
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        offset = 0
        for _ in range(nblocks):
            chunk = source_file.read(min(blocksize, filesize - offset))
            offset += len(chunk)
            # content_range 'bytes */*' means: append at end of object
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(chunk),
                data=chunk).release()

            if upload_cb:
                upload_gen.next()
1006

    
1007
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object down to a given size.

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        # Overwrite range [0, upto_bytes) with the object's own prefix,
        # discarding everything past upto_bytes
        own_path = path4url(self.container, obj)
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=own_path).release()
1021

    
1022
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from a local source file.

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :raises ClientError: 416 if the range exceeds the object size
        """
        # Normalize once: the size checks below accepted numeric strings
        # via int(start)/int(end), but the content_range arithmetic later
        # would raise TypeError on a str start - convert up front
        start, end = int(start), int(end)
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < start:
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < end:
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            # Never read past the local file nor past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                upload_gen.next()
1073

    
1074
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Copy an object to (another) container, server-side.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        # Destination defaults to the same name as the source object
        target = dst_object or src_object
        self.object_put(
            target,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter).release()
1112

    
1113
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Move an object to (another) container, server-side.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        # Destination defaults to the same name as the source object
        target = dst_object or src_object
        self.object_put(
            target,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter).release()
1151

    
1152
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """List the accounts that share objects with self.account.

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        # Default to the usual success codes unless the caller overrides
        success = kwargs.pop('success', (200, 204))
        response = self.get('', *args, success=success, **kwargs)
        return response.json
1171

    
1172
    def get_object_versionlist(self, obj):
        """List all stored versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        response = self.object_get(obj, format='json', version='list')
        return response.json['versions']