kamaki / clients / pithos.py @ 6069b53b

# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos_rest_api import PithosRestAPI
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
from StringIO import StringIO


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
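
# A quick illustration of _range_up's intersection semantics
# (hypothetical values):
#   _range_up(0, 99, '50-150') == (50, 99)   # block partially inside range
#   _range_up(0, 99, None) == (0, 99)        # no range restriction
#   _range_up(0, 99, '200-300') == (0, 0)    # block entirely outside range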


class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
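
    # A minimal construction sketch (URL, token and account below are
    # placeholders, not real credentials):
    #
    #   client = PithosClient('https://pithos.example.com/v1', 'my-token',
    #       account='user@example.com', container='pithos')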

    def purge_container(self):
        """Delete an empty container and destroy associated blocks"""
        r = self.container_delete(until=unicode(time()))
        r.release()

    def upload_object_unchunked(self, obj, f,
        withHashFile=False,
        size=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, read f as a json hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not json-formatted' % f.name,
                    status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file'
                    % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        r.release()

    
139
    def create_object_by_manifestation(self, obj,
140
        etag=None,
141
        content_encoding=None,
142
        content_disposition=None,
143
        content_type=None,
144
        sharing=None,
145
        public=None):
146
        """
147
        :param obj: (str) remote object path
148

149
        :param etag: (str)
150

151
        :param content_encoding: (str)
152

153
        :param content_disposition: (str)
154

155
        :param content_type: (str)
156

157
        :param sharing: {'read':[user and/or grp names],
158
            'write':[usr and/or grp names]}
159

160
        :param public: (bool)
161
        """
162
        self._assert_container()
163
        r = self.object_put(obj,
164
            content_length=0,
165
            etag=etag,
166
            content_encoding=content_encoding,
167
            content_disposition=content_disposition,
168
            content_type=content_type,
169
            permissions=sharing,
170
            public=public,
171
            manifest='%s/%s' % (self.container, obj))
172
        r.release()

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash, upload_gen=None):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        if upload_gen:
            upload_gen.next()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _get_missing_hashes(self, obj, json,
        size=None,
        format='json',
        hashmap=True,
        content_type=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        permissions=None,
        public=None,
        success=(201, 409)):
        r = self.object_put(obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        if r.status_code == 201:
            r.release()
            return None
        return r.json

    def _calculate_uploaded_blocks(self,
        blocksize,
        blockhash,
        size,
        nblocks,
        hashes,
        hmap,
        fileobj,
        hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """Upload missing blocks asynchronously"""
        if upload_cb:
            upload_gen = upload_cb(len(missing))
            upload_gen.next()
        else:
            upload_gen = None

        self._init_thread_limit()

        flying = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash, upload_gen)
            flying.append(r)
            unfinished = []
            for i, thread in enumerate(flying):

                unfinished = self._watch_thread_limit(unfinished)

                if thread.isAlive() or thread.exception:
                    unfinished.append(thread)
                #else:
                    #if upload_cb:
                    #    upload_gen.next()
            flying = unfinished

        for thread in flying:
            thread.join()
            #upload_gen.next()

        failures = [r for r in flying if r.exception]
        if failures:
            details = ', '.join([' (%s).%s' % (i, r.exception)
                for i, r in enumerate(failures)])
            raise ClientError(message="Block uploading failed",
                status=505,
                details=details)

    def upload_object(self, obj, f,
        size=None,
        hash_cb=None,
        upload_cb=None,
        etag=None,
        content_encoding=None,
        content_disposition=None,
        content_type=None,
        sharing=None,
        public=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)
        """
        self._assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            content_type = 'application/octet-stream'

        self._calculate_uploaded_blocks(*block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            size=size,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return

        try:
            self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=hashmap,
            success=201)
        r.release()
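
    # Usage sketch (hypothetical client and paths; blocks are hashed locally
    # first, then only the blocks the server reports missing are uploaded):
    #
    #   with open('disk.img', 'rb') as f:
    #       client.upload_object('images/disk.img', f,
    #           content_type='application/octet-stream')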

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        dst,
        range,
        **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        event = SilentEvent(self.object_get,
            obj,
            success=(200, 206),
            **restargs)
        event.start()
        return event

    
414
    def _hash_from_file(self, fp, start, size, blockhash):
415
        fp.seek(start)
416
        block = fp.read(size)
417
        h = newhashlib(blockhash)
418
        h.update(block.strip('\x00'))
419
        return hexlify(h.digest())

    def _thread2file(self,
        flying,
        local_file,
        offset=0,
        **restargs):
        """Write the results of completed threaded rest calls to a file.

        :param offset: the offset of the file up to blocksize
            e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        finished = []
        for i, (start, g) in enumerate(flying.items()):
            if not g.isAlive():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self,
        obj,
        remote_hashes,
        blocksize,
        total_size,
        local_file,
        blockhash=None,
        resume=False,
        filerange=None,
        **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = {}
        finished = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size\
            and block_hash == self._hash_from_file(
                    local_file,
                    start,
                    blocksize,
                    blockhash):
                self._cb_next()
                continue
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
                flying,
                local_file,
                offset,
                **restargs)
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)

        for thread in flying.values():
            thread.join()
        finished += self._thread2file(flying, local_file, offset, **restargs)

    def download_object(self,
        obj,
        dst,
        download_cb=None,
        version=None,
        resume=False,
        range=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None):
        """Download an object using multiple connections (threads),
            writing to random parts of the file as blocks arrive

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range: (str) from-to where from and to are integers denoting
            file positions in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """

        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range,
                **restargs)
        else:
            self._dump_blocks_async(obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range,
                **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()
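
    # Usage sketch (hypothetical client and paths; a seekable file gets the
    # threaded path, while a tty falls back to sequential writes):
    #
    #   with open('disk.img', 'wb+') as dst:
    #       client.download_object('images/disk.img', dst, resume=True)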

    #Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except Exception:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                break

    def get_object_hashmap(self, obj,
        version=None,
        if_match=None,
        if_none_match=None,
        if_modified_since=None,
        if_unmodified_since=None,
        data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
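
    # The returned hashmap is a dict along these lines (illustrative values;
    # the keys match what _get_remote_blocks_info reads from it):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256', 'bytes': 10485760,
    #    'hashes': ['7c4ed...', 'a91f3...', '02b5c...']}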

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        r.release()

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        r = self.account_post(update=True, groups={group: []})
        r.release()

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        r.release()

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        r.release()

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.account_post(update=True, quota=quota)
        r.release()

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        r.release()

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given, delete the matching objects
            (e.g. '/' empties the container)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        r.release()
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
                r.status_code)

    def get_container_versioning(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(),
            'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        """
        :param container: (str)

        :returns: (dict)
        """
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.container_head(until=until)
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        r.release()

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        r.release()

    def set_container_quota(self, quota):
        """
        :param quota: (int)
        """
        r = self.container_post(update=True, quota=quota)
        r.release()

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        r.release()

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        r.release()

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        r.release()

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        r.release()

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=True)
        r.release()

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        r.release()

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(self, obj,
        read_permition=False,
        write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        """

        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(obj, update=True, permissions=perms)
        r.release()
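
    # Usage sketch (hypothetical user/group names; passing False clears the
    # corresponding permissions):
    #
    #   client.set_object_sharing('docs/report.pdf',
    #       read_permition=['user1@example.com', 'group1'],
    #       write_permition=['user2@example.com'])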

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            r = self.object_post(obj,
                update=True,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the object
        """
        r = self.object_post(obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        r.release()

    def overwrite_object(self,
        obj,
        start,
        end,
        source_file,
        upload_cb=None):
        """Overwrite a part of an object with data from a local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
                filesize - offset,
                datasize - offset))
            offset += len(block)
            r = self.object_post(obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (start, end),
                data=block)
            r.release()

            if upload_cb is not None:
                upload_gen.next()

    def copy_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()
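
    # Usage sketch (hypothetical containers and paths; dst_object defaults
    # to the source object name when omitted):
    #
    #   client.copy_object('pithos', 'a/src.txt', 'backups',
    #       dst_object='a/src-copy.txt')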

    def move_object(self, src_container, src_object, dst_container,
        dst_object=False,
        source_version=None,
        public=False,
        content_type=None,
        delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        r.release()

    
1090
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1091
        """Get accounts that share with self.account
1092

1093
        :param limit: (str)
1094

1095
        :param marker: (str)
1096

1097
        :returns: (dict)
1098
        """
1099
        self._assert_account()
1100

    
1101
        self.set_param('format', 'json')
1102
        self.set_param('limit', limit, iff=limit is not None)
1103
        self.set_param('marker', marker, iff=marker is not None)
1104

    
1105
        path = ''
1106
        success = kwargs.pop('success', (200, 204))
1107
        r = self.get(path, *args, success=success, **kwargs)
1108
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']