# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
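
# Illustrative sketch (assumes 'sha256' as the container's block hash
# algorithm): trailing zero-padding is stripped before hashing, so a short
# final block hashes the same as its unpadded content, e.g.
#
#   _pithos_hash('abc' + '\x00' * 5, 'sha256') == _pithos_hash('abc', 'sha256')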


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
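
# Illustrative sketch: _range_up intersects a block's byte span with a
# user-requested 'from-to' range, e.g. for a block covering bytes 4096-8191:
#
#   _range_up(4096, 8191, '0-6000')  # -> (4096, 6000)
#   _range_up(4096, 8191, '0-1000')  # -> (0, 0), block not needed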


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        r.release()

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size is not None else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
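
    # A quick, illustrative check of the ceiling division above, e.g. with a
    # hypothetical 4MB container block size:
    #
    #   size = 10 * 2 ** 20             # 10MB file
    #   blocksize = 4 * 2 ** 20         # 4MB blocks
    #   1 + (size - 1) // blocksize     # -> 3 blocks (4 + 4 + 2 MB)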

    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json
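
    # Illustrative payload shapes (values are placeholders): the json sent
    # here is the hashmap built by upload_object, and a 409 reply lists the
    # hashes the server does not yet have:
    #
    #   request json:  {"bytes": 8388608, "hashes": ["7d4e...", "a1b2..."]}
    #   409 response:  ["a1b2..."]  # blocks that still need uploading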

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
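
    # Usage sketch (illustrative; url, token and names are placeholders):
    #
    #   client = PithosClient('https://pithos.example.com/v1', 'my-token')
    #   client.account, client.container = 'user@example.com', 'pithos'
    #   with open('backup.tar', 'rb') as f:
    #       client.upload_object('backups/backup.tar', f)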

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of an asynchronous rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
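
    # Usage sketch (illustrative names):
    #
    #   with open('backup.tar', 'wb+') as f:
    #       client.download_object('backups/backup.tar', f)
    #
    #   # or only the first KiB, via range_str='from-to':
    #   with open('head.bin', 'wb+') as f:
    #       client.download_object('backups/backup.tar', f, range_str='0-1023')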

    # Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
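
    # Reply sketch (illustrative values), as consumed by
    # _get_remote_blocks_info above:
    #
    #   {"block_size": 4194304, "block_hash": "sha256",
    #    "bytes": 10485760, "hashes": ["7d4e...", "a1b2...", ...]}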

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), empty the container
            instead of deleting it

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(
            self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')
            'X-Container-Object-Meta')
812

    
813
    def set_container_meta(self, metapairs):
814
        """
815
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
816
        """
817
        assert(type(metapairs) is dict)
818
        r = self.container_post(update=True, metadata=metapairs)
819
        r.release()
820

    
821
    def del_container_meta(self, metakey):
822
        """
823
        :param metakey: (str) metadatum key
824
        """
825
        r = self.container_post(update=True, metadata={metakey: ''})
826
        r.release()
827

    
828
    def set_container_quota(self, quota):
829
        """
830
        :param quota: (int)
831
        """
832
        r = self.container_post(update=True, quota=quota)
833
        r.release()
834

    
835
    def set_container_versioning(self, versioning):
836
        """
837
        :param versioning: (str)
838
        """
839
        r = self.container_post(update=True, versioning=versioning)
840
        r.release()
841

    
842
    def del_object(self, obj, until=None, delimiter=None):
843
        """
844
        :param obj: (str) remote object path
845

846
        :param until: (str) formated date
847

848
        :param delimiter: (str)
849
        """
850
        self._assert_container()
851
        r = self.object_delete(obj, until=until, delimiter=delimiter)
852
        r.release()

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        r = self.object_post(obj, update=True, public=True)
        r.release()
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        newurl = path4url(
            '%s%s%s' % (pref, sep, base),
            info['x-object-public'])
        return newurl[1:]
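
    # Illustrative example: with base_url 'https://pithos.example.com/v1' and
    # an 'x-object-public' value of '/public/abc123', the returned access url
    # would be 'https://pithos.example.com/public/abc123' (modulo path4url's
    # slash normalization).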

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(
            read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(obj, update=True, permissions=perms)
        r.release()
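
    # Usage sketch (illustrative user names):
    #
    #   client.set_object_sharing(
    #       'docs/report.pdf',
    #       read_permition=['alice', 'bob'],
    #       write_permition=['alice'])
    #
    #   # calling with the defaults revokes all sharing
    #   # (see del_object_sharing below)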

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        r.release()

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()
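
    # Usage sketch (illustrative names): copy within the same account,
    # renaming the object on the way:
    #
    #   client.copy_object(
    #       'pithos', 'docs/report.pdf', 'backups',
    #       dst_object='report-2013.pdf')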

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()
1158

    
1159
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1160
        """Get accounts that share with self.account
1161

1162
        :param limit: (str)
1163

1164
        :param marker: (str)
1165

1166
        :returns: (dict)
1167
        """
1168
        self._assert_account()
1169

    
1170
        self.set_param('format', 'json')
1171
        self.set_param('limit', limit, iff=limit is not None)
1172
        self.set_param('marker', marker, iff=marker is not None)
1173

    
1174
        path = ''
1175
        success = kwargs.pop('success', (200, 204))
1176
        r = self.get(path, *args, success=success, **kwargs)
1177
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']