# kamaki/clients/pithos/__init__.py @ ae4585f5

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
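
# The block-hash convention, illustrated (values below are hypothetical):
# the hash covers the block with trailing NUL padding stripped, so a
# zero-padded final block hashes the same as its unpadded content, e.g.
#
#     _pithos_hash('abc' + 13 * '\x00', 'sha256') == _pithos_hash('abc', 'sha256')
#
# The algorithm name comes from the container's x-container-block-hash policy.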


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
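
# _range_up clips a block's [start, end] span to a user-supplied 'from-to'
# range, returning (0, 0) for blocks that fall entirely outside it. Two
# illustrative calls (the numbers are arbitrary):
#
#     _range_up(0, 4194303, '1000000-2000000')  # -> (1000000, 2000000)
#     _range_up(8388608, 12582911, '0-100')     # -> (0, 0)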


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        r.release()
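
    # A minimal setup sketch; the URL, token and names are hypothetical:
    #
    #     client = PithosClient(
    #         'https://pithos.example.com/v1', 'user-token',
    #         account='user@example.com', container='pithos')
    #     client.purge_container('trash')  # purge 'trash' and its history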

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()
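
    # Usage sketch for this single-request upload (hypothetical names;
    # assumes client.container is set). The whole file is read into memory
    # and sent as one PUT, so it suits small objects:
    #
    #     with open('notes.txt', 'rb') as f:
    #         client.upload_object_unchunked(
    #             'docs/notes.txt', f,
    #             content_type='text/plain',
    #             sharing={'read': ['user1', 'group1']})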

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        r.release()
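
    # Manifestation sketch: the object itself stores no data; with
    # manifest='<container>/<obj>' the server serves the concatenation of
    # all objects whose paths start with that prefix. Hypothetical example,
    # assuming parts were uploaded as 'video.avi/1', 'video.avi/2', ...:
    #
    #     client.create_object_by_manifestation(
    #         'video.avi', content_type='video/x-msvideo')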

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
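
    # nblocks above is a ceiling division: 1 + (size - 1) // blocksize.
    # E.g. for a 4 MiB (4194304 byte) blocksize and a 10 MiB file:
    # 1 + (10485760 - 1) // 4194304 = 3 blocks (two full, one partial).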

    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
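
    # Usage sketch for the chunked, multi-threaded upload. The names are
    # hypothetical; hash_cb/upload_cb may be any generator factory that
    # yields once per step (progress.bar objects work the same way):
    #
    #     def progress_gen(n):
    #         for i in range(n + 1):
    #             yield
    #
    #     with open('backup.tar', 'rb') as f:
    #         client.upload_object(
    #             'backups/backup.tar', f,
    #             hash_cb=progress_gen, upload_cb=progress_gen)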

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # strip only trailing NULs, to match the _pithos_hash convention
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
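
    # Download sketch (hypothetical names). Blocks are fetched one by one
    # when dst is a terminal, and in parallel threads otherwise; with
    # resume=True (and a file opened so existing data is kept, e.g. 'rb+'),
    # blocks whose local hash already matches are skipped:
    #
    #     with open('backup.tar', 'wb+') as dst:
    #         client.download_object('backups/backup.tar', dst)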

    # Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} if a precondition failed
            with status 304 or 412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
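
    # The hashmap is a dict of this shape (hash values shortened and
    # purely illustrative):
    #
    #     {'block_size': 4194304,
    #      'block_hash': 'sha256',
    #      'bytes': 10485760,
    #      'hashes': ['7b7ba862...', '0c9e4f25...', 'e3a1d2f4...']}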

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()
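
    # Account metadata and groups, sketched (names are hypothetical):
    #
    #     client.set_account_meta({'color-scheme': 'dark'})
    #     client.get_account_meta()  # the X-Account-Meta-* headers, as a dict
    #     client.set_account_group('developers', ['user1', 'user2'])
    #     client.del_account_group('developers')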

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', empty the container by deleting its
            contents

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_quota(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        r = self.object_post(obj, update=True, public=True)
        r.release()
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        r.release()
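
    # Publishing sketch: the returned URL joins the service host with the
    # x-object-public path reported by the server (output is hypothetical):
    #
    #     url = client.publish_object('docs/notes.txt')
    #     # e.g. 'https://pithos.example.com/public/Abc123xyz'
    #     client.unpublish_object('docs/notes.txt')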

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        r = self.object_post(obj, update=True, permissions=perms)
        r.release()

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)
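
    # Sharing sketch (hypothetical users and groups): pass lists to grant,
    # leave False to clear that kind of permission:
    #
    #     client.set_object_sharing(
    #         'docs/notes.txt',
    #         read_permition=['user1', 'group1'],
    #         write_permition=['user2'])
    #     client.get_object_sharing('docs/notes.txt')
    #     # e.g. {'read': 'user1,group1', 'write': 'user2'}
    #     client.del_object_sharing('docs/notes.txt')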

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        r.release()

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)
            r.release()

            if upload_cb:
                upload_gen.next()

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()
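
    # Copy/move sketch (hypothetical names). Note that both methods set
    # self.container to dst_container as a side effect:
    #
    #     client.copy_object(
    #         'pithos', 'docs/notes.txt', 'backups',
    #         dst_object='notes-copy.txt')
    #     client.move_object('backups', 'notes-copy.txt', 'trash')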

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (list)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']
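
    # Version listing sketch: each entry pairs a version id with a
    # timestamp (the values below are purely illustrative):
    #
    #     client.get_object_versionlist('docs/notes.txt')
    #     # e.g. [[85, 1377004550.0], [86, 1377004601.5]]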