kamaki/clients/pithos/__init__.py @ 28cbc3c2

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


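# Pithos block hashes are computed over block data with the trailing NUL
# bytes stripped, so a zero-padded tail hashes the same as its unpadded
# content.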
def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


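# Clip (start, end) to its intersection with a_range ('rstart-rend');
# returns (0, 0) when the two ranges do not overlap.
# For instance, _range_up(0, 99, '50-149') evaluates to (50, 99).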
def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)


class PithosClient(PithosRestClient):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

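    # Typical construction of a client (a sketch -- the endpoint, token,
    # account and container values below are illustrative):
    #   client = PithosClient(
    #       'https://pithos.example.org/v1', TOKEN,
    #       account='user-uuid', container='pithos')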
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, read f as a json hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()
        self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

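    # Hashmap negotiation: PUT the full hashmap with hashmap=True; a 201
    # reply means the server already holds every block, while a 409 reply
    # carries the list of block hashes that still have to be uploaded.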
    def _get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return None if r.status_code == 201 else r.json

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

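    # Upload every missing block in its own SilentEvent thread, respecting
    # the client-wide thread limit; the hashes of blocks that failed are
    # returned so the caller can retry them.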
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or group names],
            'write':[user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        # init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    status=800)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)

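    # Example upload (a sketch; assumes an authenticated client with a
    # selected container -- local and remote paths are illustrative):
    #   with open('backup.tar', 'rb') as f:
    #       client.upload_object('backups/backup.tar', f)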
    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  the same hash may appear in multiple blocks, so map each
            #  hash to the list of its block indices
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of the asynchronous (threaded) REST calls to a
        local file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        for i, (key, g) in enumerate(flying.items()):
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

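    # When resuming, hash the bytes already present at each block's offset
    # in the local file and skip blocks that already match; fetch the rest,
    # each in its own thread.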
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if key + blocksize > total_size\
                    else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are byte
            positions (int)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()

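    # Example download (a sketch; the paths are illustrative):
    #   with open('local.copy', 'wb+') as dst:
    #       client.download_object('remote.bin', dst)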
    # command progress bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for _ in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412 responses
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if set to '/', the contents of the container
            are destroyed along with it

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_quota(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        self.container_post(update=True, quota=quota)

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})

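    # Example (sketch): attach, then remove, a metadatum on an object:
    #   client.set_object_meta('remote.bin', {'color': 'blue'})
    #   client.del_object_meta('remote.bin', 'color')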
    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)

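    # Example (sketch; the user identifiers are illustrative):
    #   client.set_object_sharing(
    #       'remote.bin', read_permition=['user-uuid-1', 'user-uuid-2'])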
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(
                obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)

            if upload_cb:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) the number of bytes to keep; the rest of
            the object is truncated away
        """
        self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))

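    # Example (sketch): keep only the first KiB of an object:
    #   client.truncate_object('remote.bin', 1024)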
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            upload_gen = upload_cb(nblocks)
            upload_gen.next()
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            offset += len(block)

            if upload_cb:
                upload_gen.next()

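    # Example (sketch): server-side copy between two containers of the same
    # account (container and object names are illustrative):
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.txt')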
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']