Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 5549c0ab

History | View | Annotate | Download (35.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
51
    h.update(block.rstrip('\x00'))
52
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
    """GRNet Pithos API client"""

    # Collected exceptions from worker threads.
    # NOTE(review): this is a mutable class-level list, shared by every
    # PithosClient instance - confirm the sharing is intentional.
    _thread_exceptions = []
72

    
73
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos client bound to an account/container.

        :param base_url: (str) the Pithos service endpoint

        :param token: (str) user authentication token

        :param account: (str) account name, optional

        :param container: (str) container name, optional
        """
        super(PithosClient, self).__init__(
            base_url, token, account, container)
75

    
76
    def purge_container(self):
        """Delete an empty container and destroy associated blocks"""
        self.container_delete(until=unicode(time())).release()
81

    
82
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object with a single PUT (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) f holds a json hashmap, not raw data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            raw = f.read()
            try:
                import json
                #  round-trip through json to validate the hashmap file
                raw = json.dumps(json.loads(raw))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(raw)
        data = f.read() if size is None else f.read(size)
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201).release()
139

    
140
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length object carrying a '<container>/<obj>'
        manifest.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj)).release()
176

    
177
    # upload_* auxiliary methods
178
    def _put_block_async(self, data, hash, upload_gen=None):
        """Spawn a thread that uploads one block via _put_block.

        :param upload_gen: accepted for caller symmetry; not used here

        :returns: (SilentEvent) the started worker thread
        """
        worker = SilentEvent(method=self._put_block, data=data, hash=hash)
        worker.start()
        return worker
182

    
183
    def _put_block(self, data, hash):
        """Upload a single block and verify the server-computed hash.

        :param data: (str) the block data

        :param hash: (str) the locally computed Pithos hash of the block

        :raises AssertionError: if the server reports a different hash
        """
        #  removed leftover fault-injection code that raised a fake 503
        #  ('BAD GATEWAY STUFF') on a random 1-in-8 basis
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
194

    
195
    def _get_file_block_info(self, fileobj, size=None):
        """Read the container block policy and size up the upload.

        :param fileobj: open file descriptor

        :param size: (int) bytes to upload; defaults to the whole file

        :returns: (blocksize, blockhash, size, nblocks)
        """
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        if size is None:
            size = fstat(fileobj.fileno()).st_size
        #  ceiling division: number of blocks needed to cover size bytes
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
202

    
203
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """PUT a hashmap and collect the hashes the server is missing.

        :param obj: (str) remote object path

        :param json: (dict) the hashmap, e.g. {'bytes': .., 'hashes': [..]}

        :param size: (int) unused; kept for interface compatibility

        :returns: (list) missing block hashes, or None if the object was
            fully created from blocks the server already had
        """
        #  fixed: format and hashmap parameters were accepted but ignored
        #  (the call re-hardcoded their default values)
        r = self.object_put(
            obj,
            format=format,
            hashmap=hashmap,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            #  201: object created, nothing left to upload
            r.release()
            return None
        #  409: the response body lists the hashes that must be uploaded
        return r.json
231

    
232
    def _caclulate_uploaded_blocks(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Hash the file block-by-block, filling hashes and hmap in place.

        hmap maps each block hash to its (offset, length) in the file.

        NOTE: the method name keeps its historical typo ('caclulate') so
        existing callers keep working.

        :param hash_cb: optional progress.bar generator factory
        """
        offset = 0
        progress = None
        if hash_cb:
            progress = hash_cb(nblocks)
            progress.next()

        for _ in range(nblocks):
            chunk = fileobj.read(min(blocksize, size - offset))
            chunk_hash = _pithos_hash(chunk, blockhash)
            hashes.append(chunk_hash)
            hmap[chunk_hash] = (offset, len(chunk))
            offset += len(chunk)
            if hash_cb:
                progress.next()
        msg = 'Failed to calculate uploaded blocks:'
        msg += ' Offset and object size do not match'
        assert offset == size, msg
252

    
253
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """Upload missing blocks asynchronously.

        :param missing: (list) hashes of the blocks the server lacks

        :param hmap: (dict) block hash --> (offset, length) in fileobj

        :param fileobj: open file descriptor to read block data from

        :param upload_gen: optional progress generator, advanced once per
            completed block

        :returns: (list) hashes of blocks whose upload thread failed
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            #  threads in flying but not in unfinished are treated as done
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        #  502 suggests the pool is too aggressive - shrink it
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    #  NOTE(review): this append is discarded by the
                    #  'flying = unfinished' below, so a still-alive thread
                    #  reaching here would be dropped - confirm intended
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        #  wait for the remaining workers and record any failures
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        #  each worker was started with its block hash in kwargs
        return [failure.kwargs['hash'] for failure in failures]
294

    
295
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) bytes to upload; defaults to the whole file

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        #  init: block geometry from the container policy, then local hashing
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        #  NOTE(review): offset is initialized here but never used
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._caclulate_uploaded_blocks(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        #  send the full hashmap; the server answers with the hashes it lacks
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            #  the object was created entirely from pre-existing blocks
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            #  fast-forward the bar over blocks already on the server
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        #  retry while progress is made; 7 stalled rounds mean give up
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        #  all blocks are on the server: bind them to the object
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
404

    
405
    # download_* auxiliary methods
406
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the remote hashmap and summarize its block layout.

        :param obj: (str) remote object path

        :returns: (blocksize, blockhash, total_size, hashes,
            hash --> block index map)
        """
        #  the hashmap request itself must not carry a data range
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        map_dict = dict(
            (blk_hash, i) for i, blk_hash in enumerate(hashmap['hashes']))
        return (
            int(hashmap['block_size']),
            hashmap['block_hash'],
            hashmap['bytes'],
            hashmap['hashes'],
            map_dict)
419

    
420
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        """Download blocks one by one, in order, and write them to dst.

        :param remote_hashes: (list) block hashes in object order

        :param range: (str) 'from-to' byte range restriction or None
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            first = blocksize * blockid
            #  the final block may be shorter than blocksize
            last = first + blocksize - 1
            if first + blocksize > total_size:
                last = total_size - 1
            (first, last) = _range_up(first, last, range)
            args['data_range'] = 'bytes=%s-%s' % (first, last)
            r = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(r.content)
            dst.flush()
434

    
435
    def _get_block_async(self, obj, **args):
        """Spawn a thread that GETs one block of obj.

        :returns: (SilentEvent) the started worker thread
        """
        worker = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        worker.start()
        return worker
439

    
440
    def _hash_from_file(self, fp, start, size, blockhash):
        """Hash a local file block the way Pithos hashes blocks.

        :param fp: open file descriptor

        :param start: (int) block offset in the file

        :param size: (int) block size in bytes

        :param blockhash: (str) hash algorithm name

        :returns: (str) hex digest of the block
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        #  strip only TRAILING zero-bytes, exactly like _pithos_hash does on
        #  upload; stripping both ends made blocks with leading zero-bytes
        #  hash differently from the server and forced needless re-downloads
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())
446

    
447
    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """Flush finished downloader threads into the local file.

        :param flying: (dict) file position --> downloader thread

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10

        :returns: (list) the threads that were finished and written out
        """
        finished = []
        #  Py2 items() snapshots the dict, so popping inside the loop is safe
        for position, thread in flying.items():
            if thread.isAlive():
                continue
            if thread.exception:
                raise thread.exception
            local_file.seek(position - offset)
            local_file.write(thread.value.content)
            self._cb_next()
            finished.append(flying.pop(position))
        local_file.flush()
        return finished
466

    
467
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks of obj concurrently, writing each at its place.

        :param remote_hashes: (dict) block hash --> block index

        :param blocksize: (int) the container block size

        :param total_size: (int) object size in bytes

        :param local_file: open file descriptor to write into

        :param blockhash: (str) hash algorithm for the resume check

        :param resume: (bool) if set, keep on-disk blocks that already match

        :param filerange: (str) 'from-to' byte range restriction or None
        """
        #  with resume, blocks already present on disk may be skipped
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            #  NOTE(review): offset shifts ranged downloads to file start;
            #  the 'blocksize > rstart' asymmetry looks odd - confirm
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            #  resume fast-path: identical block already on disk
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            #  drain any finished downloads before spawning a new one
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                #  empty intersection with the requested range - skip block
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        #  wait for the last workers and write their blocks out
        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)
503

    
504
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        #  a tty must receive blocks in order; a seekable file can take
        #  them concurrently, each written at its final position
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                #  drop stale bytes past the end (e.g. from a resumed file)
                dst.truncate(total_size)

        self._complete_cb()
579

    
580
    #Command Progress Bar method
581
    def _cb_next(self):
582
        if hasattr(self, 'progress_bar_gen'):
583
            try:
584
                self.progress_bar_gen.next()
585
            except:
586
                pass
587

    
588
    def _complete_cb(self):
        """Drain the progress bar generator so the bar finishes cleanly."""
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                #  StopIteration or a missing/broken bar both end the drain
                #  (bare except also trapped KeyboardInterrupt)
                break
594

    
595
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """Get the hashmap of a remote object.

        :param obj: (str) remote object path

        :param version: (str)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (list)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            #  not-modified / precondition-failed mean "no hashmap to get"
            if err.status in (304, 412):
                return {}
            raise
        return r.json
634

    
635
    def set_account_group(self, group, usernames):
        """Create or replace an account group.

        :param group: (str) group name

        :param usernames: (list) the members of the group
        """
        self.account_post(update=True, groups={group: usernames}).release()
643

    
644
    def del_account_group(self, group):
        """Remove an account group.

        :param group: (str) group name
        """
        self.account_post(update=True, groups={group: []}).release()
650

    
651
    def get_account_info(self, until=None):
        """Fetch the account headers.

        :param until: (str) formated date

        :returns: (dict) the response headers

        :raises ClientError: 401 No authorization
        """
        r = self.account_head(until=until)
        if r.status_code != 401:
            return r.headers
        raise ClientError("No authorization", status=401)
661

    
662
    def get_account_quota(self):
        """:returns: (dict) the account quota policy headers"""
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Quota', exactMatch=True)
670

    
671
    def get_account_versioning(self):
        """:returns: (dict) the account versioning policy headers"""
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Versioning', exactMatch=True)
679

    
680
    def get_account_meta(self, until=None):
        """:param until: (str) formated date

        :returns: (dict) the account metadata headers
        """
        account_info = self.get_account_info(until=until)
        return filter_in(account_info, 'X-Account-Meta-')
687

    
688
    def get_account_group(self):
        """:returns: (dict) the account group headers"""
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Group-')
693

    
694
    def set_account_meta(self, metapairs):
        """Update account metadata.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        #  isinstance accepts dict subclasses (e.g. OrderedDict), which the
        #  previous 'type(...) is dict' check wrongly rejected
        assert isinstance(metapairs, dict), 'metapairs must be a dict'
        r = self.account_post(update=True, metadata=metapairs)
        r.release()
701

    
702
    def del_account_meta(self, metakey):
        """Delete an account metadatum.

        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''}).release()
708

    
709
    def set_account_quota(self, quota):
        """Set the account quota.

        :param quota: (int)
        """
        self.account_post(update=True, quota=quota).release()
715

    
716
    def set_account_versioning(self, versioning):
        """Set the account versioning policy.

        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning).release()
722

    
723
    def list_containers(self):
        """List the containers of the account.

        :returns: (dict) the response json
        """
        return self.account_get().json
729

    
730
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        failure_msg = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if r.status_code in failure_msg:
            raise ClientError(failure_msg[r.status_code], r.status_code)
754

    
755
    def get_container_versioning(self, container):
        """Get the versioning policy of a container.

        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        info = self.get_container_info()
        return filter_in(info, 'X-Container-Policy-Versioning')
765

    
766
    def get_container_quota(self, container):
        """Get the quota policy of a container.

        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        info = self.get_container_info()
        return filter_in(info, 'X-Container-Policy-Quota')
774

    
775
    def get_container_info(self, until=None):
        """Fetch the current container's headers.

        :param until: (str) formated date

        :returns: (dict) the response headers

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            #  removed stray debug print ('Buh?') that polluted stdout on
            #  every failed HEAD request
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers
790

    
791
    def get_container_meta(self, until=None):
        """:param until: (str) formated date

        :returns: (dict) the container metadata headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Meta')
800

    
801
    def get_container_object_meta(self, until=None):
        """:param until: (str) formated date

        :returns: (dict) the container object-metadata headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Object-Meta')
810

    
811
    def set_container_meta(self, metapairs):
        """Update the metadata of the current container.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        #  isinstance accepts dict subclasses (e.g. OrderedDict), which the
        #  previous 'type(...) is dict' check wrongly rejected
        assert isinstance(metapairs, dict), 'metapairs must be a dict'
        r = self.container_post(update=True, metadata=metapairs)
        r.release()
818

    
819
    def del_container_meta(self, metakey):
        """Delete a container metadatum.

        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''}).release()
825

    
826
    def set_container_quota(self, quota):
        """Set the container quota.

        :param quota: (int)
        """
        self.container_post(update=True, quota=quota).release()
832

    
833
    def set_container_versioning(self, versioning):
        """Set the container versioning policy.

        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning).release()
839

    
840
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter).release()
851

    
852
    def set_object_meta(self, obj, metapairs):
        """Update the metadata of a remote object.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        #  isinstance accepts dict subclasses (e.g. OrderedDict), which the
        #  previous 'type(...) is dict' check wrongly rejected
        assert isinstance(metapairs, dict), 'metapairs must be a dict'
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()
861

    
862
    def del_object_meta(self, obj, metakey):
        """Delete an object metadatum.

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''}).release()
870

    
871
    def publish_object(self, obj):
        """Make an object publicly accessible and return its public URL.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True).release()
        info = self.get_object_info(obj)
        #  rebuild scheme://host from base_url and append the public path
        scheme, sep, rest = self.base_url.partition('//')
        host = rest.split('/')[0]
        public_url = path4url(
            '%s%s%s' % (scheme, sep, host),
            info['x-object-public'])
        #  path4url output starts with a slash that must be dropped here
        return public_url[1:]
886

    
887
    def unpublish_object(self, obj):
        """Withdraw the public access of an object.

        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False).release()
893

    
894
    def get_object_info(self, obj, version=None):
        """Fetch the headers of a remote object.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)

        :raises ClientError: 404 Object not found
        """
        try:
            return self.object_head(obj, version=version).headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object not found', status=404)
            raise
909

    
910
    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) only the X-Object-Meta headers
        """
        info = self.get_object_info(obj, version=version)
        return filter_in(info, 'X-Object-Meta')
921

    
922
    def get_object_sharing(self, obj):
        """Parse the X-Object-Sharing header of a remote object.

        :param obj: (str) remote object path

        :returns: (dict) e.g. {'read': 'user1,group1', 'write': 'user2'}

        :raises ClientError: if a sharing entry lacks a '=' separator
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                #  partition (vs. split + unpack) tolerates '=' characters
                #  inside the value part instead of crashing with a raw
                #  ValueError, while still rejecting entries with no '='
                (key, sep, val) = perm.strip().partition('=')
                if not sep:
                    raise ClientError('Incorrect reply format')
                reply[key] = val
        return reply
943

    
944
    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) of users and user groups to get
            write permission for this object - False means all previous write
            permissions will be removed
        """
        #  a falsy argument maps to '' which clears that permission class
        perms = dict(
            read=read_permition or '',
            write=write_permition or '')
        resp = self.object_post(obj, update=True, permissions=perms)
        resp.release()
965

    
966
    def del_object_sharing(self, obj):
        """Strip all sharing permissions from a remote object.

        :param obj: (str) remote object path
        """
        #  explicit False arguments clear both permission classes
        self.set_object_sharing(
            obj, read_permition=False, write_permition=False)
971

    
972
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object, block by
        block.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        #  ceiling division: number of container-sized blocks to send
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            #  next(gen) instead of gen.next(): works on Python 2.6+ and 3
            next(upload_gen)
        for _ in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            #  'bytes */*' content-range means "append at current end"
            r = self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb:
                next(upload_gen)
1004

    
1005
    def truncate_object(self, obj, upto_bytes):
        """Cut a remote object down to a maximum size.

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        #  the object is overwritten by a 0..upto_bytes range of itself
        resp = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        resp.release()
1019

    
1020
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :raises ClientError: 416 if the range exceeds the remote file size
        """
        #  normalize once, so string arguments (the original only int()-ed
        #  them in the size checks, then crashed on "start + offset" below)
        start, end = int(start), int(end)
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < start:
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < end:
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        #  the range is inclusive at both ends
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            #  next(gen) instead of gen.next(): works on Python 2.6+ and 3
            next(upload_gen)
        for _ in range(nblocks):
            #  never read past the local file nor past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                next(upload_gen)
1071

    
1072
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side copy of an object, possibly across containers and
        accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        #  destination defaults to the source object's own path
        target = dst_object or src_object
        resp = self.object_put(
            target,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        resp.release()
1114

    
1115
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side move of an object, possibly across containers and
        accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        #  destination defaults to the source object's own path
        target = dst_object or src_object
        resp = self.object_put(
            target,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        resp.release()
1157

    
1158
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        #  only forward pagination params the caller actually supplied
        if limit is not None:
            self.set_param('limit', limit)
        if marker is not None:
            self.set_param('marker', marker)

        success = kwargs.pop('success', (200, 204))
        resp = self.get('', *args, success=success, **kwargs)
        return resp.json
1177

    
1178
    def get_object_versionlist(self, obj):
        """List all versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        resp = self.object_get(obj, format='json', version='list')
        return resp.json['versions']