Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 3a066af4

History | View | Annotate | Download (35.6 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39

    
40
from binascii import hexlify
41

    
42
from kamaki.clients import SilentEvent, sendlog
43
from kamaki.clients.pithos_rest_api import PithosRestAPI
44
from kamaki.clients.storage import ClientError
45
from kamaki.clients.utils import path4url, filter_in
46
from StringIO import StringIO
47

    
48

    
49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
51
    h.update(block.rstrip('\x00'))
52
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, a_range):
56
    if a_range:
57
        (rstart, rend) = a_range.split('-')
58
        (rstart, rend) = (int(rstart), int(rend))
59
        if rstart > end or rend < start:
60
            return (0, 0)
61
        if rstart > start:
62
            start = rstart
63
        if rend < end:
64
            end = rend
65
    return (start, end)
66

    
67

    
68
class PithosClient(PithosRestAPI):
69
    """GRNet Pithos API client"""
70

    
71
    _thread_exceptions = []
72

    
73
    def __init__(self, base_url, token, account=None, container=None):
74
        super(PithosClient, self).__init__(base_url, token, account, container)
75

    
76
    def purge_container(self):
77
        """Delete an empty container and destroy associated blocks
78
        """
79
        r = self.container_delete(until=unicode(time()))
80
        r.release()
81

    
82
    def upload_object_unchunked(
83
            self, obj, f,
84
            withHashFile=False,
85
            size=None,
86
            etag=None,
87
            content_encoding=None,
88
            content_disposition=None,
89
            content_type=None,
90
            sharing=None,
91
            public=None):
92
        """
93
        :param obj: (str) remote object path
94

95
        :param f: open file descriptor
96

97
        :param withHashFile: (bool)
98

99
        :param size: (int) size of data to upload
100

101
        :param etag: (str)
102

103
        :param content_encoding: (str)
104

105
        :param content_disposition: (str)
106

107
        :param content_type: (str)
108

109
        :param sharing: {'read':[user and/or grp names],
110
            'write':[usr and/or grp names]}
111

112
        :param public: (bool)
113
        """
114
        self._assert_container()
115

    
116
        if withHashFile:
117
            data = f.read()
118
            try:
119
                import json
120
                data = json.dumps(json.loads(data))
121
            except ValueError:
122
                raise ClientError('"%s" is not json-formated' % f.name, 1)
123
            except SyntaxError:
124
                msg = '"%s" is not a valid hashmap file' % f.name
125
                raise ClientError(msg, 1)
126
            f = StringIO(data)
127
        data = f.read(size) if size is not None else f.read()
128
        r = self.object_put(
129
            obj,
130
            data=data,
131
            etag=etag,
132
            content_encoding=content_encoding,
133
            content_disposition=content_disposition,
134
            content_type=content_type,
135
            permissions=sharing,
136
            public=public,
137
            success=201)
138
        r.release()
139

    
140
    def create_object_by_manifestation(
141
            self, obj,
142
            etag=None,
143
            content_encoding=None,
144
            content_disposition=None,
145
            content_type=None,
146
            sharing=None,
147
            public=None):
148
        """
149
        :param obj: (str) remote object path
150

151
        :param etag: (str)
152

153
        :param content_encoding: (str)
154

155
        :param content_disposition: (str)
156

157
        :param content_type: (str)
158

159
        :param sharing: {'read':[user and/or grp names],
160
            'write':[usr and/or grp names]}
161

162
        :param public: (bool)
163
        """
164
        self._assert_container()
165
        r = self.object_put(
166
            obj,
167
            content_length=0,
168
            etag=etag,
169
            content_encoding=content_encoding,
170
            content_disposition=content_disposition,
171
            content_type=content_type,
172
            permissions=sharing,
173
            public=public,
174
            manifest='%s/%s' % (self.container, obj))
175
        r.release()
176

    
177
    # upload_* auxiliary methods
178
    def _put_block_async(self, data, hash, upload_gen=None):
179
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
180
        event.start()
181
        return event
182

    
183
    def _put_block(self, data, hash):
184
        from random import randint
185
        if not randint(0, 7):
186
            raise ClientError('BAD GATEWAY STUFF', 503)
187
        r = self.container_post(
188
            update=True,
189
            content_type='application/octet-stream',
190
            content_length=len(data),
191
            data=data,
192
            format='json')
193
        assert r.json[0] == hash, 'Local hash does not match server'
194

    
195
    def _get_file_block_info(self, fileobj, size=None):
196
        meta = self.get_container_info()
197
        blocksize = int(meta['x-container-block-size'])
198
        blockhash = meta['x-container-block-hash']
199
        size = size if size is not None else fstat(fileobj.fileno()).st_size
200
        nblocks = 1 + (size - 1) // blocksize
201
        return (blocksize, blockhash, size, nblocks)
202

    
203
    def _get_missing_hashes(
204
            self, obj, json,
205
            size=None,
206
            format='json',
207
            hashmap=True,
208
            content_type=None,
209
            etag=None,
210
            content_encoding=None,
211
            content_disposition=None,
212
            permissions=None,
213
            public=None,
214
            success=(201, 409)):
215
        r = self.object_put(
216
            obj,
217
            format='json',
218
            hashmap=True,
219
            content_type=content_type,
220
            json=json,
221
            etag=etag,
222
            content_encoding=content_encoding,
223
            content_disposition=content_disposition,
224
            permissions=permissions,
225
            public=public,
226
            success=success)
227
        if r.status_code == 201:
228
            r.release()
229
            return None
230
        return r.json
231

    
232
    def _caclulate_uploaded_blocks(
233
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
234
            hash_cb=None):
235
        offset = 0
236
        if hash_cb:
237
            hash_gen = hash_cb(nblocks)
238
            hash_gen.next()
239

    
240
        for i in range(nblocks):
241
            block = fileobj.read(min(blocksize, size - offset))
242
            bytes = len(block)
243
            hash = _pithos_hash(block, blockhash)
244
            hashes.append(hash)
245
            hmap[hash] = (offset, bytes)
246
            offset += bytes
247
            if hash_cb:
248
                hash_gen.next()
249
        msg = 'Failed to calculate uploaded blocks:'
250
        msg += ' Offset and object size do not match'
251
        assert offset == size, msg
252

    
253
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
254
        """upload missing blocks asynchronously"""
255

    
256
        self._init_thread_limit()
257

    
258
        flying = []
259
        failures = []
260
        for hash in missing:
261
            offset, bytes = hmap[hash]
262
            fileobj.seek(offset)
263
            data = fileobj.read(bytes)
264
            r = self._put_block_async(data, hash, upload_gen)
265
            flying.append(r)
266
            unfinished = self._watch_thread_limit(flying)
267
            for thread in set(flying).difference(unfinished):
268
                if thread.exception:
269
                    failures.append(thread)
270
                    if isinstance(
271
                            thread.exception,
272
                            ClientError) and thread.exception.status == 502:
273
                        self.POOLSIZE = self._thread_limit
274
                elif thread.isAlive():
275
                    flying.append(thread)
276
                elif upload_gen:
277
                    try:
278
                        upload_gen.next()
279
                    except:
280
                        pass
281
            flying = unfinished
282

    
283
        for thread in flying:
284
            thread.join()
285
            if thread.exception:
286
                failures.append(thread)
287
            elif upload_gen:
288
                try:
289
                    upload_gen.next()
290
                except:
291
                    pass
292

    
293
        return [failure.kwargs['hash'] for failure in failures]
294

    
295
    def upload_object(
296
            self, obj, f,
297
            size=None,
298
            hash_cb=None,
299
            upload_cb=None,
300
            etag=None,
301
            content_encoding=None,
302
            content_disposition=None,
303
            content_type=None,
304
            sharing=None,
305
            public=None):
306
        """Upload an object using multiple connections (threads)
307

308
        :param obj: (str) remote object path
309

310
        :param f: open file descriptor (rb)
311

312
        :param hash_cb: optional progress.bar object for calculating hashes
313

314
        :param upload_cb: optional progress.bar object for uploading
315

316
        :param etag: (str)
317

318
        :param content_encoding: (str)
319

320
        :param content_disposition: (str)
321

322
        :param content_type: (str)
323

324
        :param sharing: {'read':[user and/or grp names],
325
            'write':[usr and/or grp names]}
326

327
        :param public: (bool)
328
        """
329
        self._assert_container()
330

    
331
        #init
332
        block_info = (blocksize, blockhash, size, nblocks) =\
333
            self._get_file_block_info(f, size)
334
        (hashes, hmap, offset) = ([], {}, 0)
335
        if content_type is None:
336
            content_type = 'application/octet-stream'
337

    
338
        self._caclulate_uploaded_blocks(
339
            *block_info,
340
            hashes=hashes,
341
            hmap=hmap,
342
            fileobj=f,
343
            hash_cb=hash_cb)
344

    
345
        hashmap = dict(bytes=size, hashes=hashes)
346
        missing = self._get_missing_hashes(
347
            obj, hashmap,
348
            content_type=content_type,
349
            size=size,
350
            etag=etag,
351
            content_encoding=content_encoding,
352
            content_disposition=content_disposition,
353
            permissions=sharing,
354
            public=public)
355

    
356
        if missing is None:
357
            return
358

    
359
        if upload_cb:
360
            upload_gen = upload_cb(len(missing))
361
            for i in range(len(missing), len(hashmap['hashes']) + 1):
362
                try:
363
                    upload_gen.next()
364
                except:
365
                    upload_gen = None
366
        else:
367
            upload_gen = None
368

    
369
        retries = 7
370
        try:
371
            while retries:
372
                sendlog.info('%s blocks missing' % len(missing))
373
                num_of_blocks = len(missing)
374
                missing = self._upload_missing_blocks(
375
                    missing,
376
                    hmap,
377
                    f,
378
                    upload_gen)
379
                if missing:
380
                    if num_of_blocks == len(missing):
381
                        retries -= 1
382
                    else:
383
                        num_of_blocks = len(missing)
384
                else:
385
                    break
386
            if missing:
387
                raise ClientError(
388
                    '%s blocks failed to upload' % len(missing),
389
                    status=800)
390
        except KeyboardInterrupt:
391
            sendlog.info('- - - wait for threads to finish')
392
            for thread in activethreads():
393
                thread.join()
394
            raise
395

    
396
        r = self.object_put(
397
            obj,
398
            format='json',
399
            hashmap=True,
400
            content_type=content_type,
401
            json=hashmap,
402
            success=201)
403
        r.release()
404

    
405
    # download_* auxiliary methods
406
    def _get_remote_blocks_info(self, obj, **restargs):
407
        #retrieve object hashmap
408
        myrange = restargs.pop('data_range', None)
409
        hashmap = self.get_object_hashmap(obj, **restargs)
410
        restargs['data_range'] = myrange
411
        blocksize = int(hashmap['block_size'])
412
        blockhash = hashmap['block_hash']
413
        total_size = hashmap['bytes']
414
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
415
        map_dict = {}
416
        for i, h in enumerate(hashmap['hashes']):
417
            map_dict[h] = i
418
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
419

    
420
    def _dump_blocks_sync(
421
            self, obj, remote_hashes, blocksize, total_size, dst, range,
422
            **args):
423
        for blockid, blockhash in enumerate(remote_hashes):
424
            if blockhash:
425
                start = blocksize * blockid
426
                is_last = start + blocksize > total_size
427
                end = (total_size - 1) if is_last else (start + blocksize - 1)
428
                (start, end) = _range_up(start, end, range)
429
                args['data_range'] = 'bytes=%s-%s' % (start, end)
430
                r = self.object_get(obj, success=(200, 206), **args)
431
                self._cb_next()
432
                dst.write(r.content)
433
                dst.flush()
434

    
435
    def _get_block_async(self, obj, **args):
436
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
437
        event.start()
438
        return event
439

    
440
    def _hash_from_file(self, fp, start, size, blockhash):
441
        fp.seek(start)
442
        block = fp.read(size)
443
        h = newhashlib(blockhash)
444
        h.update(block.strip('\x00'))
445
        return hexlify(h.digest())
446

    
447
    def _thread2file(self, flying, local_file, offset=0, **restargs):
448
        """write the results of a greenleted rest call to a file
449

450
        :param offset: the offset of the file up to blocksize
451
        - e.g. if the range is 10-100, all blocks will be written to
452
        normal_position - 10
453
        """
454
        finished = []
455
        for i, (start, g) in enumerate(flying.items()):
456
            if not g.isAlive():
457
                if g.exception:
458
                    raise g.exception
459
                block = g.value.content
460
                local_file.seek(start - offset)
461
                local_file.write(block)
462
                self._cb_next()
463
                finished.append(flying.pop(start))
464
        local_file.flush()
465
        return finished
466

    
467
    def _dump_blocks_async(
468
            self, obj, remote_hashes, blocksize, total_size, local_file,
469
            blockhash=None, resume=False, filerange=None, **restargs):
470
        file_size = fstat(local_file.fileno()).st_size if resume else 0
471
        flying = {}
472
        finished = []
473
        offset = 0
474
        if filerange is not None:
475
            rstart = int(filerange.split('-')[0])
476
            offset = rstart if blocksize > rstart else rstart % blocksize
477

    
478
        self._init_thread_limit()
479
        for block_hash, blockid in remote_hashes.items():
480
            start = blocksize * blockid
481
            if start < file_size and block_hash == self._hash_from_file(
482
                    local_file, start, blocksize, blockhash):
483
                self._cb_next()
484
                continue
485
            self._watch_thread_limit(flying.values())
486
            finished += self._thread2file(
487
                flying,
488
                local_file,
489
                offset,
490
                **restargs)
491
            end = total_size - 1 if start + blocksize > total_size\
492
                else start + blocksize - 1
493
            (start, end) = _range_up(start, end, filerange)
494
            if start == end:
495
                self._cb_next()
496
                continue
497
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
498
            flying[start] = self._get_block_async(obj, **restargs)
499

    
500
        for thread in flying.values():
501
            thread.join()
502
        finished += self._thread2file(flying, local_file, offset, **restargs)
503

    
504
    def download_object(
505
            self, obj, dst,
506
            download_cb=None,
507
            version=None,
508
            resume=False,
509
            range_str=None,
510
            if_match=None,
511
            if_none_match=None,
512
            if_modified_since=None,
513
            if_unmodified_since=None):
514
        """Download an object (multiple connections, random blocks)
515

516
        :param obj: (str) remote object path
517

518
        :param dst: open file descriptor (wb+)
519

520
        :param download_cb: optional progress.bar object for downloading
521

522
        :param version: (str) file version
523

524
        :param resume: (bool) if set, preserve already downloaded file parts
525

526
        :param range_str: (str) from, to are file positions (int) in bytes
527

528
        :param if_match: (str)
529

530
        :param if_none_match: (str)
531

532
        :param if_modified_since: (str) formated date
533

534
        :param if_unmodified_since: (str) formated date"""
535
        restargs = dict(
536
            version=version,
537
            data_range=None if range_str is None else 'bytes=%s' % range_str,
538
            if_match=if_match,
539
            if_none_match=if_none_match,
540
            if_modified_since=if_modified_since,
541
            if_unmodified_since=if_unmodified_since)
542

    
543
        (
544
            blocksize,
545
            blockhash,
546
            total_size,
547
            hash_list,
548
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
549
        assert total_size >= 0
550

    
551
        if download_cb:
552
            self.progress_bar_gen = download_cb(len(remote_hashes))
553
            self._cb_next()
554

    
555
        if dst.isatty():
556
            self._dump_blocks_sync(
557
                obj,
558
                hash_list,
559
                blocksize,
560
                total_size,
561
                dst,
562
                range_str,
563
                **restargs)
564
        else:
565
            self._dump_blocks_async(
566
                obj,
567
                remote_hashes,
568
                blocksize,
569
                total_size,
570
                dst,
571
                blockhash,
572
                resume,
573
                range_str,
574
                **restargs)
575
            if not range_str:
576
                dst.truncate(total_size)
577

    
578
        self._complete_cb()
579

    
580
    #Command Progress Bar method
581
    def _cb_next(self):
582
        if hasattr(self, 'progress_bar_gen'):
583
            try:
584
                self.progress_bar_gen.next()
585
            except:
586
                pass
587

    
588
    def _complete_cb(self):
589
        while True:
590
            try:
591
                self.progress_bar_gen.next()
592
            except:
593
                break
594

    
595
    def get_object_hashmap(
596
            self, obj,
597
            version=None,
598
            if_match=None,
599
            if_none_match=None,
600
            if_modified_since=None,
601
            if_unmodified_since=None,
602
            data_range=None):
603
        """
604
        :param obj: (str) remote object path
605

606
        :param if_match: (str)
607

608
        :param if_none_match: (str)
609

610
        :param if_modified_since: (str) formated date
611

612
        :param if_unmodified_since: (str) formated date
613

614
        :param data_range: (str) from-to where from and to are integers
615
            denoting file positions in bytes
616

617
        :returns: (list)
618
        """
619
        try:
620
            r = self.object_get(
621
                obj,
622
                hashmap=True,
623
                version=version,
624
                if_etag_match=if_match,
625
                if_etag_not_match=if_none_match,
626
                if_modified_since=if_modified_since,
627
                if_unmodified_since=if_unmodified_since,
628
                data_range=data_range)
629
        except ClientError as err:
630
            if err.status == 304 or err.status == 412:
631
                return {}
632
            raise
633
        return r.json
634

    
635
    def set_account_group(self, group, usernames):
636
        """
637
        :param group: (str)
638

639
        :param usernames: (list)
640
        """
641
        r = self.account_post(update=True, groups={group: usernames})
642
        r.release()
643

    
644
    def del_account_group(self, group):
645
        """
646
        :param group: (str)
647
        """
648
        r = self.account_post(update=True, groups={group: []})
649
        r.release()
650

    
651
    def get_account_info(self, until=None):
652
        """
653
        :param until: (str) formated date
654

655
        :returns: (dict)
656
        """
657
        r = self.account_head(until=until)
658
        if r.status_code == 401:
659
            raise ClientError("No authorization")
660
        return r.headers
661

    
662
    def get_account_quota(self):
663
        """
664
        :returns: (dict)
665
        """
666
        return filter_in(
667
            self.get_account_info(),
668
            'X-Account-Policy-Quota',
669
            exactMatch=True)
670

    
671
    def get_account_versioning(self):
672
        """
673
        :returns: (dict)
674
        """
675
        return filter_in(
676
            self.get_account_info(),
677
            'X-Account-Policy-Versioning',
678
            exactMatch=True)
679

    
680
    def get_account_meta(self, until=None):
681
        """
682
        :meta until: (str) formated date
683

684
        :returns: (dict)
685
        """
686
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
687

    
688
    def get_account_group(self):
689
        """
690
        :returns: (dict)
691
        """
692
        return filter_in(self.get_account_info(), 'X-Account-Group-')
693

    
694
    def set_account_meta(self, metapairs):
695
        """
696
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
697
        """
698
        assert(type(metapairs) is dict)
699
        r = self.account_post(update=True, metadata=metapairs)
700
        r.release()
701

    
702
    def del_account_meta(self, metakey):
703
        """
704
        :param metakey: (str) metadatum key
705
        """
706
        r = self.account_post(update=True, metadata={metakey: ''})
707
        r.release()
708

    
709
    def set_account_quota(self, quota):
710
        """
711
        :param quota: (int)
712
        """
713
        r = self.account_post(update=True, quota=quota)
714
        r.release()
715

    
716
    def set_account_versioning(self, versioning):
717
        """
718
        "param versioning: (str)
719
        """
720
        r = self.account_post(update=True, versioning=versioning)
721
        r.release()
722

    
723
    def list_containers(self):
724
        """
725
        :returns: (dict)
726
        """
727
        r = self.account_get()
728
        return r.json
729

    
730
    def del_container(self, until=None, delimiter=None):
731
        """
732
        :param until: (str) formated date
733

734
        :param delimiter: (str) with / empty container
735

736
        :raises ClientError: 404 Container does not exist
737

738
        :raises ClientError: 409 Container is not empty
739
        """
740
        self._assert_container()
741
        r = self.container_delete(
742
            until=until,
743
            delimiter=delimiter,
744
            success=(204, 404, 409))
745
        r.release()
746
        if r.status_code == 404:
747
            raise ClientError(
748
                'Container "%s" does not exist' % self.container,
749
                r.status_code)
750
        elif r.status_code == 409:
751
            raise ClientError(
752
                'Container "%s" is not empty' % self.container,
753
                r.status_code)
754

    
755
    def get_container_versioning(self, container):
756
        """
757
        :param container: (str)
758

759
        :returns: (dict)
760
        """
761
        self.container = container
762
        return filter_in(
763
            self.get_container_info(),
764
            'X-Container-Policy-Versioning')
765

    
766
    def get_container_quota(self, container):
767
        """
768
        :param container: (str)
769

770
        :returns: (dict)
771
        """
772
        self.container = container
773
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
774

    
775
    def get_container_info(self, until=None):
776
        """
777
        :param until: (str) formated date
778

779
        :returns: (dict)
780

781
        :raises ClientError: 404 Container not found
782
        """
783
        try:
784
            r = self.container_head(until=until)
785
        except ClientError as err:
786
            err.details.append('for container %s' % self.container)
787
            raise err
788
        return r.headers
789

    
790
    def get_container_meta(self, until=None):
791
        """
792
        :param until: (str) formated date
793

794
        :returns: (dict)
795
        """
796
        return filter_in(
797
            self.get_container_info(until=until),
798
            'X-Container-Meta')
799

    
800
    def get_container_object_meta(self, until=None):
801
        """
802
        :param until: (str) formated date
803

804
        :returns: (dict)
805
        """
806
        return filter_in(
807
            self.get_container_info(until=until),
808
            'X-Container-Object-Meta')
809

    
810
    def set_container_meta(self, metapairs):
811
        """
812
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
813
        """
814
        assert(type(metapairs) is dict)
815
        r = self.container_post(update=True, metadata=metapairs)
816
        r.release()
817

    
818
    def del_container_meta(self, metakey):
819
        """
820
        :param metakey: (str) metadatum key
821
        """
822
        r = self.container_post(update=True, metadata={metakey: ''})
823
        r.release()
824

    
825
    def set_container_quota(self, quota):
826
        """
827
        :param quota: (int)
828
        """
829
        r = self.container_post(update=True, quota=quota)
830
        r.release()
831

    
832
    def set_container_versioning(self, versioning):
833
        """
834
        :param versioning: (str)
835
        """
836
        r = self.container_post(update=True, versioning=versioning)
837
        r.release()
838

    
839
    def del_object(self, obj, until=None, delimiter=None):
840
        """
841
        :param obj: (str) remote object path
842

843
        :param until: (str) formated date
844

845
        :param delimiter: (str)
846
        """
847
        self._assert_container()
848
        r = self.object_delete(obj, until=until, delimiter=delimiter)
849
        r.release()
850

    
851
    def set_object_meta(self, obj, metapairs):
852
        """
853
        :param obj: (str) remote object path
854

855
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
856
        """
857
        assert(type(metapairs) is dict)
858
        r = self.object_post(obj, update=True, metadata=metapairs)
859
        r.release()
860

    
861
    def del_object_meta(self, obj, metakey):
862
        """
863
        :param obj: (str) remote object path
864

865
        :param metakey: (str) metadatum key
866
        """
867
        r = self.object_post(obj, update=True, metadata={metakey: ''})
868
        r.release()
869

    
870
    def publish_object(self, obj):
871
        """
872
        :param obj: (str) remote object path
873

874
        :returns: (str) access url
875
        """
876
        r = self.object_post(obj, update=True, public=True)
877
        r.release()
878
        info = self.get_object_info(obj)
879
        pref, sep, rest = self.base_url.partition('//')
880
        base = rest.split('/')[0]
881
        newurl = path4url(
882
            '%s%s%s' % (pref, sep, base),
883
            info['x-object-public'])
884
        return newurl[1:]
885

    
886
    def unpublish_object(self, obj):
887
        """
888
        :param obj: (str) remote object path
889
        """
890
        r = self.object_post(obj, update=True, public=False)
891
        r.release()
892

    
893
    def get_object_info(self, obj, version=None):
894
        """
895
        :param obj: (str) remote object path
896

897
        :param version: (str)
898

899
        :returns: (dict)
900
        """
901
        try:
902
            r = self.object_head(obj, version=version)
903
            return r.headers
904
        except ClientError as ce:
905
            if ce.status == 404:
906
                raise ClientError('Object not found', status=404)
907
            raise
908

    
909
    def get_object_meta(self, obj, version=None):
910
        """
911
        :param obj: (str) remote object path
912

913
        :param version: (str)
914

915
        :returns: (dict)
916
        """
917
        return filter_in(
918
            self.get_object_info(obj, version=version),
919
            'X-Object-Meta')
920

    
921
    def get_object_sharing(self, obj):
922
        """
923
        :param obj: (str) remote object path
924

925
        :returns: (dict)
926
        """
927
        r = filter_in(
928
            self.get_object_info(obj),
929
            'X-Object-Sharing',
930
            exactMatch=True)
931
        reply = {}
932
        if len(r) > 0:
933
            perms = r['x-object-sharing'].split(';')
934
            for perm in perms:
935
                try:
936
                    perm.index('=')
937
                except ValueError:
938
                    raise ClientError('Incorrect reply format')
939
                (key, val) = perm.strip().split('=')
940
                reply[key] = val
941
        return reply
942

    
943
    def set_object_sharing(
944
            self, obj,
945
            read_permition=False, write_permition=False):
946
        """Give read/write permisions to an object.
947

948
        :param obj: (str) remote object path
949

950
        :param read_permition: (list - bool) users and user groups that get
951
            read permition for this object - False means all previous read
952
            permissions will be removed
953

954
        :param write_perimition: (list - bool) of users and user groups to get
955
           write permition for this object - False means all previous write
956
           permissions will be removed
957
        """
958

    
959
        perms = dict(
960
            read='' if not read_permition else read_permition,
961
            write='' if not write_permition else write_permition)
962
        r = self.object_post(obj, update=True, permissions=perms)
963
        r.release()
964

    
965
    def del_object_sharing(self, obj):
966
        """
967
        :param obj: (str) remote object path
968
        """
969
        self.set_object_sharing(obj)
970

    
971
    def append_object(self, obj, source_file, upload_cb=None):
972
        """
973
        :param obj: (str) remote object path
974

975
        :param source_file: open file descriptor
976

977
        :param upload_db: progress.bar for uploading
978
        """
979

    
980
        self._assert_container()
981
        meta = self.get_container_info()
982
        blocksize = int(meta['x-container-block-size'])
983
        filesize = fstat(source_file.fileno()).st_size
984
        nblocks = 1 + (filesize - 1) // blocksize
985
        offset = 0
986
        if upload_cb:
987
            upload_gen = upload_cb(nblocks)
988
            upload_gen.next()
989
        for i in range(nblocks):
990
            block = source_file.read(min(blocksize, filesize - offset))
991
            offset += len(block)
992
            r = self.object_post(
993
                obj,
994
                update=True,
995
                content_range='bytes */*',
996
                content_type='application/octet-stream',
997
                content_length=len(block),
998
                data=block)
999
            r.release()
1000

    
1001
            if upload_cb:
1002
                upload_gen.next()
1003

    
1004
    def truncate_object(self, obj, upto_bytes):
1005
        """
1006
        :param obj: (str) remote object path
1007

1008
        :param upto_bytes: max number of bytes to leave on file
1009
        """
1010
        r = self.object_post(
1011
            obj,
1012
            update=True,
1013
            content_range='bytes 0-%s/*' % upto_bytes,
1014
            content_type='application/octet-stream',
1015
            object_bytes=upto_bytes,
1016
            source_object=path4url(self.container, obj))
1017
        r.release()
1018

    
1019
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1020
        """Overwrite a part of an object from local source file
1021

1022
        :param obj: (str) remote object path
1023

1024
        :param start: (int) position in bytes to start overwriting from
1025

1026
        :param end: (int) position in bytes to stop overwriting at
1027

1028
        :param source_file: open file descriptor
1029

1030
        :param upload_db: progress.bar for uploading
1031
        """
1032

    
1033
        r = self.get_object_info(obj)
1034
        rf_size = int(r['content-length'])
1035
        if rf_size < int(start):
1036
            raise ClientError(
1037
                'Range start exceeds file size',
1038
                status=416)
1039
        elif rf_size < int(end):
1040
            raise ClientError(
1041
                'Range end exceeds file size',
1042
                status=416)
1043
        self._assert_container()
1044
        meta = self.get_container_info()
1045
        blocksize = int(meta['x-container-block-size'])
1046
        filesize = fstat(source_file.fileno()).st_size
1047
        datasize = int(end) - int(start) + 1
1048
        nblocks = 1 + (datasize - 1) // blocksize
1049
        offset = 0
1050
        if upload_cb:
1051
            upload_gen = upload_cb(nblocks)
1052
            upload_gen.next()
1053
        for i in range(nblocks):
1054
            read_size = min(blocksize, filesize - offset, datasize - offset)
1055
            block = source_file.read(read_size)
1056
            r = self.object_post(
1057
                obj,
1058
                update=True,
1059
                content_type='application/octet-stream',
1060
                content_length=len(block),
1061
                content_range='bytes %s-%s/*' % (
1062
                    start + offset,
1063
                    start + offset + len(block) - 1),
1064
                data=block)
1065
            offset += len(block)
1066
            r.release()
1067

    
1068
            if upload_cb:
1069
                upload_gen.next()
1070

    
1071
    def copy_object(
1072
            self, src_container, src_object, dst_container,
1073
            dst_object=False,
1074
            source_version=None,
1075
            source_account=None,
1076
            public=False,
1077
            content_type=None,
1078
            delimiter=None):
1079
        """
1080
        :param src_container: (str) source container
1081

1082
        :param src_object: (str) source object path
1083

1084
        :param dst_container: (str) destination container
1085

1086
        :param dst_object: (str) destination object path
1087

1088
        :param source_version: (str) source object version
1089

1090
        :param source_account: (str) account to copy from
1091

1092
        :param public: (bool)
1093

1094
        :param content_type: (str)
1095

1096
        :param delimiter: (str)
1097
        """
1098
        self._assert_account()
1099
        self.container = dst_container
1100
        dst_object = dst_object or src_object
1101
        src_path = path4url(src_container, src_object)
1102
        r = self.object_put(
1103
            dst_object,
1104
            success=201,
1105
            copy_from=src_path,
1106
            content_length=0,
1107
            source_version=source_version,
1108
            source_account=source_account,
1109
            public=public,
1110
            content_type=content_type,
1111
            delimiter=delimiter)
1112
        r.release()
1113

    
1114
    def move_object(
1115
            self, src_container, src_object, dst_container,
1116
            dst_object=False,
1117
            source_version=None,
1118
            public=False,
1119
            content_type=None,
1120
            delimiter=None):
1121
        """
1122
        :param src_container: (str) source container
1123

1124
        :param src_object: (str) source object path
1125

1126
        :param dst_container: (str) destination container
1127

1128
        :param dst_object: (str) destination object path
1129

1130
        :param source_version: (str) source object version
1131

1132
        :param public: (bool)
1133

1134
        :param content_type: (str)
1135

1136
        :param delimiter: (str)
1137
        """
1138
        self._assert_account()
1139
        self.container = dst_container
1140
        dst_object = dst_object or src_object
1141
        src_path = path4url(src_container, src_object)
1142
        r = self.object_put(
1143
            dst_object,
1144
            success=201,
1145
            move_from=src_path,
1146
            content_length=0,
1147
            source_version=source_version,
1148
            public=public,
1149
            content_type=content_type,
1150
            delimiter=delimiter)
1151
        r.release()
1152

    
1153
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1154
        """Get accounts that share with self.account
1155

1156
        :param limit: (str)
1157

1158
        :param marker: (str)
1159

1160
        :returns: (dict)
1161
        """
1162
        self._assert_account()
1163

    
1164
        self.set_param('format', 'json')
1165
        self.set_param('limit', limit, iff=limit is not None)
1166
        self.set_param('marker', marker, iff=marker is not None)
1167

    
1168
        path = ''
1169
        success = kwargs.pop('success', (200, 204))
1170
        r = self.get(path, *args, success=success, **kwargs)
1171
        return r.json
1172

    
1173
    def get_object_versionlist(self, obj):
1174
        """
1175
        :param obj: (str) remote object path
1176

1177
        :returns: (list)
1178
        """
1179
        self._assert_container()
1180
        r = self.object_get(obj, format='json', version='list')
1181
        return r.json['versions']