ef93e5b461148f8089e9695a5c8a0980b2f186f0
[kamaki] / kamaki / clients / pithos.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestAPI):
69     """GRNet Pithos API client"""
70
71     _thread_exceptions = []
72
73     def __init__(self, base_url, token, account=None, container=None):
74         super(PithosClient, self).__init__(base_url, token, account, container)
75
76     def purge_container(self):
77         """Delete an empty container and destroy associated blocks
78         """
79         r = self.container_delete(until=unicode(time()))
80         r.release()
81
82     def upload_object_unchunked(self, obj, f,
83         withHashFile=False,
84         size=None,
85         etag=None,
86         content_encoding=None,
87         content_disposition=None,
88         content_type=None,
89         sharing=None,
90         public=None):
91         """
92         :param obj: (str) remote object path
93
94         :param f: open file descriptor
95
96         :param withHashFile: (bool)
97
98         :param size: (int) size of data to upload
99
100         :param etag: (str)
101
102         :param content_encoding: (str)
103
104         :param content_disposition: (str)
105
106         :param content_type: (str)
107
108         :param sharing: {'read':[user and/or grp names],
109             'write':[usr and/or grp names]}
110
111         :param public: (bool)
112         """
113         self._assert_container()
114
115         if withHashFile:
116             data = f.read()
117             try:
118                 import json
119                 data = json.dumps(json.loads(data))
120             except ValueError:
121                 raise ClientError(message='"%s" is not json-formated' % f.name,
122                     status=1)
123             except SyntaxError:
124                 raise ClientError(message='"%s" is not a valid hashmap file'\
125                 % f.name, status=1)
126             f = StringIO(data)
127         data = f.read(size) if size is not None else f.read()
128         r = self.object_put(obj,
129             data=data,
130             etag=etag,
131             content_encoding=content_encoding,
132             content_disposition=content_disposition,
133             content_type=content_type,
134             permissions=sharing,
135             public=public,
136             success=201)
137         r.release()
138
139     def create_object_by_manifestation(self, obj,
140         etag=None,
141         content_encoding=None,
142         content_disposition=None,
143         content_type=None,
144         sharing=None,
145         public=None):
146         """
147         :param obj: (str) remote object path
148
149         :param etag: (str)
150
151         :param content_encoding: (str)
152
153         :param content_disposition: (str)
154
155         :param content_type: (str)
156
157         :param sharing: {'read':[user and/or grp names],
158             'write':[usr and/or grp names]}
159
160         :param public: (bool)
161         """
162         self._assert_container()
163         r = self.object_put(obj,
164             content_length=0,
165             etag=etag,
166             content_encoding=content_encoding,
167             content_disposition=content_disposition,
168             content_type=content_type,
169             permissions=sharing,
170             public=public,
171             manifest='%s/%s' % (self.container, obj))
172         r.release()
173
174     # upload_* auxiliary methods
175     def _put_block_async(self, data, hash):
176         event = SilentEvent(method=self._put_block, data=data, hash=hash)
177         event.start()
178         return event
179
180     def _put_block(self, data, hash):
181         r = self.container_post(update=True,
182             content_type='application/octet-stream',
183             content_length=len(data),
184             data=data,
185             format='json')
186         assert r.json[0] == hash, 'Local hash does not match server'
187
188     def _get_file_block_info(self, fileobj, size=None):
189         meta = self.get_container_info()
190         blocksize = int(meta['x-container-block-size'])
191         blockhash = meta['x-container-block-hash']
192         size = size if size is not None else fstat(fileobj.fileno()).st_size
193         nblocks = 1 + (size - 1) // blocksize
194         return (blocksize, blockhash, size, nblocks)
195
196     def _get_missing_hashes(self, obj, json,
197         size=None,
198         format='json',
199         hashmap=True,
200         content_type=None,
201         etag=None,
202         content_encoding=None,
203         content_disposition=None,
204         permissions=None,
205         public=None,
206         success=(201, 409)):
207         r = self.object_put(obj,
208             format='json',
209             hashmap=True,
210             content_type=content_type,
211             json=json,
212             etag=etag,
213             content_encoding=content_encoding,
214             content_disposition=content_disposition,
215             permissions=permissions,
216             public=public,
217             success=success)
218         if r.status_code == 201:
219             r.release()
220             return None
221         return r.json
222
223     def _caclulate_uploaded_blocks(self,
224         blocksize,
225         blockhash,
226         size,
227         nblocks,
228         hashes,
229         hmap,
230         fileobj,
231         hash_cb=None):
232         offset = 0
233         if hash_cb:
234             hash_gen = hash_cb(nblocks)
235             hash_gen.next()
236
237         for i in range(nblocks):
238             block = fileobj.read(min(blocksize, size - offset))
239             bytes = len(block)
240             hash = _pithos_hash(block, blockhash)
241             hashes.append(hash)
242             hmap[hash] = (offset, bytes)
243             offset += bytes
244             if hash_cb:
245                 hash_gen.next()
246         assert offset == size
247
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
249         """upload missing blocks asynchronously.
250         """
251         if upload_cb:
252             upload_gen = upload_cb(len(missing))
253             upload_gen.next()
254
255         self._init_thread_limit()
256
257         flying = []
258         for hash in missing:
259             offset, bytes = hmap[hash]
260             fileobj.seek(offset)
261             data = fileobj.read(bytes)
262             r = self._put_block_async(data, hash)
263             flying.append(r)
264             unfinished = []
265             for i, thread in enumerate(flying):
266
267                 unfinished = self._watch_thread_limit(unfinished)
268
269                 if thread.isAlive() or thread.exception:
270                     unfinished.append(thread)
271                 else:
272                     if upload_cb:
273                         upload_gen.next()
274             flying = unfinished
275
276         for thread in flying:
277             thread.join()
278
279         failures = [r for r in flying if r.exception]
280         if len(failures):
281             details = ', '.join([' (%s).%s' % (i, r.exception)\
282                 for i, r in enumerate(failures)])
283             raise ClientError(message="Block uploading failed",
284                 status=505,
285                 details=details)
286
287         while upload_cb:
288             try:
289                 upload_gen.next()
290             except StopIteration:
291                 break
292
293     def upload_object(self, obj, f,
294         size=None,
295         hash_cb=None,
296         upload_cb=None,
297         etag=None,
298         content_encoding=None,
299         content_disposition=None,
300         content_type=None,
301         sharing=None,
302         public=None):
303         """Upload an object using multiple connections (threads)
304
305         :param obj: (str) remote object path
306
307         :param f: open file descriptor (rb)
308
309         :param hash_cb: optional progress.bar object for calculating hashes
310
311         :param upload_cb: optional progress.bar object for uploading
312
313         :param etag: (str)
314
315         :param content_encoding: (str)
316
317         :param content_disposition: (str)
318
319         :param content_type: (str)
320
321         :param sharing: {'read':[user and/or grp names],
322             'write':[usr and/or grp names]}
323
324         :param public: (bool)
325         """
326         self._assert_container()
327
328         #init
329         block_info = (blocksize, blockhash, size, nblocks) =\
330             self._get_file_block_info(f, size)
331         (hashes, hmap, offset) = ([], {}, 0)
332         if content_type is None:
333             content_type = 'application/octet-stream'
334
335         self._caclulate_uploaded_blocks(*block_info,
336             hashes=hashes,
337             hmap=hmap,
338             fileobj=f,
339             hash_cb=hash_cb)
340
341         hashmap = dict(bytes=size, hashes=hashes)
342         missing = self._get_missing_hashes(obj, hashmap,
343             content_type=content_type,
344             size=size,
345             etag=etag,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
348             permissions=sharing,
349             public=public)
350
351         if missing is None:
352             return
353         try:
354             self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
355         except KeyboardInterrupt:
356             print('- - - wait for threads to finish')
357             for thread in activethreads():
358                 thread.join()
359             raise
360
361         r = self.object_put(
362             obj,
363             format='json',
364             hashmap=True,
365             content_type=content_type,
366             json=hashmap,
367             success=201)
368         r.release()
369
370     # download_* auxiliary methods
371     def _get_remote_blocks_info(self, obj, **restargs):
372         #retrieve object hashmap
373         myrange = restargs.pop('data_range', None)
374         hashmap = self.get_object_hashmapp(obj, **restargs)
375         restargs['data_range'] = myrange
376         blocksize = int(hashmap['block_size'])
377         blockhash = hashmap['block_hash']
378         total_size = hashmap['bytes']
379         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
380         map_dict = {}
381         for i, h in enumerate(hashmap['hashes']):
382             map_dict[h] = i
383         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
384
385     def _dump_blocks_sync(self,
386         obj,
387         remote_hashes,
388         blocksize,
389         total_size,
390         dst,
391         range,
392         **restargs):
393         for blockid, blockhash in enumerate(remote_hashes):
394             if blockhash == None:
395                 continue
396             start = blocksize * blockid
397             end = total_size - 1 if start + blocksize > total_size\
398                 else start + blocksize - 1
399             (start, end) = _range_up(start, end, range)
400             restargs['data_range'] = 'bytes=%s-%s' % (start, end)
401             r = self.object_get(obj, success=(200, 206), **restargs)
402             self._cb_next()
403             dst.write(r.content)
404             dst.flush()
405
406     def _get_block_async(self, obj, **restargs):
407         event = SilentEvent(self.object_get,
408             obj,
409             success=(200, 206),
410             **restargs)
411         event.start()
412         return event
413
414     def _hash_from_file(self, fp, start, size, blockhash):
415         fp.seek(start)
416         block = fp.read(size)
417         h = newhashlib(blockhash)
418         h.update(block.strip('\x00'))
419         return hexlify(h.digest())
420
421     def _thread2file(self,
422         flying,
423         local_file,
424         offset=0,
425         **restargs):
426         """write the results of a greenleted rest call to a file
427         @offset: the offset of the file up to blocksize
428             - e.g. if the range is 10-100, all
429         blocks will be written to normal_position - 10"""
430         finished = []
431         for i, (start, g) in enumerate(flying.items()):
432             if not g.isAlive():
433                 if g.exception:
434                     raise g.exception
435                 block = g.value.content
436                 local_file.seek(start - offset)
437                 local_file.write(block)
438                 self._cb_next()
439                 finished.append(flying.pop(start))
440         local_file.flush()
441         return finished
442
443     def _dump_blocks_async(self,
444         obj,
445         remote_hashes,
446         blocksize,
447         total_size,
448         local_file,
449         blockhash=None,
450         resume=False,
451         filerange=None,
452         **restargs):
453
454         file_size = fstat(local_file.fileno()).st_size if resume else 0
455         flying = {}
456         finished = []
457         offset = 0
458         if filerange is not None:
459             rstart = int(filerange.split('-')[0])
460             offset = rstart if blocksize > rstart else rstart % blocksize
461
462         self._init_thread_limit()
463         for block_hash, blockid in remote_hashes.items():
464             start = blocksize * blockid
465             if start < file_size\
466             and block_hash == self._hash_from_file(
467                     local_file,
468                     start,
469                     blocksize,
470                     blockhash):
471                 self._cb_next()
472                 continue
473             self._watch_thread_limit(flying.values())
474             finished += self._thread2file(
475                 flying,
476                 local_file,
477                 offset,
478                 **restargs)
479             end = total_size - 1 if start + blocksize > total_size\
480                 else start + blocksize - 1
481             (start, end) = _range_up(start, end, filerange)
482             if start == end:
483                 self._cb_next()
484                 continue
485             restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
486             flying[start] = self._get_block_async(obj, **restargs)
487
488         for thread in flying.values():
489             thread.join()
490         finished += self._thread2file(flying, local_file, offset, **restargs)
491
492     def download_object(self,
493         obj,
494         dst,
495         download_cb=None,
496         version=None,
497         resume=False,
498         range=None,
499         if_match=None,
500         if_none_match=None,
501         if_modified_since=None,
502         if_unmodified_since=None):
503         """Download an object using multiple connections (threads) and
504             writing to random parts of the file
505
506         :param obj: (str) remote object path
507
508         :param dst: open file descriptor (wb+)
509
510         :param download_cb: optional progress.bar object for downloading
511
512         :param version: (str) file version
513
514         :param resume: (bool) if set, preserve already downloaded file parts
515
516         :param range: (str) from-to where from and to are integers denoting
517             file positions in bytes
518
519         :param if_match: (str)
520
521         :param if_none_match: (str)
522
523         :param if_modified_since: (str) formated date
524
525         :param if_unmodified_since: (str) formated date
526         """
527
528         restargs = dict(version=version,
529             data_range=None if range is None else 'bytes=%s' % range,
530             if_match=if_match,
531             if_none_match=if_none_match,
532             if_modified_since=if_modified_since,
533             if_unmodified_since=if_unmodified_since)
534
535         (blocksize,
536             blockhash,
537             total_size,
538             hash_list,
539             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
540         assert total_size >= 0
541
542         if download_cb:
543             self.progress_bar_gen = download_cb(len(remote_hashes))
544             self._cb_next()
545
546         if dst.isatty():
547             self._dump_blocks_sync(obj,
548                 hash_list,
549                 blocksize,
550                 total_size,
551                 dst,
552                 range,
553                 **restargs)
554         else:
555             self._dump_blocks_async(obj,
556                 remote_hashes,
557                 blocksize,
558                 total_size,
559                 dst,
560                 blockhash,
561                 resume,
562                 range,
563                 **restargs)
564             if range is None:
565                 dst.truncate(total_size)
566
567         self._complete_cb()
568
569     #Command Progress Bar method
570     def _cb_next(self):
571         if hasattr(self, 'progress_bar_gen'):
572             try:
573                 self.progress_bar_gen.next()
574             except:
575                 pass
576
577     def _complete_cb(self):
578         while True:
579             try:
580                 self.progress_bar_gen.next()
581             except:
582                 break
583
584     def get_object_hashmapp(self, obj,
585         version=None,
586         if_match=None,
587         if_none_match=None,
588         if_modified_since=None,
589         if_unmodified_since=None,
590         data_range=None):
591         """
592         :param obj: (str) remote object path
593
594         :param if_match: (str)
595
596         :param if_none_match: (str)
597
598         :param if_modified_since: (str) formated date
599
600         :param if_unmodified_since: (str) formated date
601
602         :param data_range: (str) from-to where from and to are integers
603             denoting file positions in bytes
604
605         :returns: (list)
606         """
607         try:
608             r = self.object_get(obj,
609                 hashmap=True,
610                 version=version,
611                 if_etag_match=if_match,
612                 if_etag_not_match=if_none_match,
613                 if_modified_since=if_modified_since,
614                 if_unmodified_since=if_unmodified_since,
615                 data_range=data_range)
616         except ClientError as err:
617             if err.status == 304 or err.status == 412:
618                 return {}
619             raise
620         return r.json
621
622     def set_account_group(self, group, usernames):
623         """
624         :param group: (str)
625
626         :param usernames: (list)
627         """
628         r = self.account_post(update=True, groups={group: usernames})
629         r.release()
630
631     def del_account_group(self, group):
632         """
633         :param group: (str)
634         """
635         r = self.account_post(update=True, groups={group: []})
636         r.release()
637
638     def get_account_info(self, until=None):
639         """
640         :param until: (str) formated date
641
642         :returns: (dict)
643         """
644         r = self.account_head(until=until)
645         if r.status_code == 401:
646             raise ClientError("No authorization")
647         return r.headers
648
649     def get_account_quota(self):
650         """
651         :returns: (dict)
652         """
653         return filter_in(self.get_account_info(),
654             'X-Account-Policy-Quota',
655             exactMatch=True)
656
657     def get_account_versioning(self):
658         """
659         :returns: (dict)
660         """
661         return filter_in(self.get_account_info(),
662             'X-Account-Policy-Versioning',
663             exactMatch=True)
664
665     def get_account_meta(self, until=None):
666         """
667         :meta until: (str) formated date
668
669         :returns: (dict)
670         """
671         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
672
673     def get_account_group(self):
674         """
675         :returns: (dict)
676         """
677         return filter_in(self.get_account_info(), 'X-Account-Group-')
678
679     def set_account_meta(self, metapairs):
680         """
681         :param metapairs: (dict) {key1:val1, key2:val2, ...}
682         """
683         assert(type(metapairs) is dict)
684         r = self.account_post(update=True, metadata=metapairs)
685         r.release()
686
687     def del_account_meta(self, metakey):
688         """
689         :param metakey: (str) metadatum key
690         """
691         r = self.account_post(update=True, metadata={metakey: ''})
692         r.release()
693
694     def set_account_quota(self, quota):
695         """
696         :param quota: (int)
697         """
698         r = self.account_post(update=True, quota=quota)
699         r.release()
700
701     def set_account_versioning(self, versioning):
702         """
703         "param versioning: (str)
704         """
705         r = self.account_post(update=True, versioning=versioning)
706         r.release()
707
708     def list_containers(self):
709         """
710         :returns: (dict)
711         """
712         r = self.account_get()
713         return r.json
714
715     def del_container(self, until=None, delimiter=None):
716         """
717         :param until: (str) formated date
718
719         :param delimiter: (str) with / empty container
720
721         :raises ClientError: 404 Container does not exist
722
723         :raises ClientError: 409 Container is not empty
724         """
725         self._assert_container()
726         r = self.container_delete(until=until,
727             delimiter=delimiter,
728             success=(204, 404, 409))
729         r.release()
730         if r.status_code == 404:
731             raise ClientError('Container "%s" does not exist' % self.container,
732                 r.status_code)
733         elif r.status_code == 409:
734             raise ClientError('Container "%s" is not empty' % self.container,
735                 r.status_code)
736
737     def get_container_versioning(self, container):
738         """
739         :param container: (str)
740
741         :returns: (dict)
742         """
743         self.container = container
744         return filter_in(self.get_container_info(),
745             'X-Container-Policy-Versioning')
746
747     def get_container_quota(self, container):
748         """
749         :param container: (str)
750
751         :returns: (dict)
752         """
753         self.container = container
754         return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
755
756     def get_container_info(self, until=None):
757         """
758         :param until: (str) formated date
759
760         :returns: (dict)
761         """
762         r = self.container_head(until=until)
763         return r.headers
764
765     def get_container_meta(self, until=None):
766         """
767         :param until: (str) formated date
768
769         :returns: (dict)
770         """
771         return filter_in(self.get_container_info(until=until),
772             'X-Container-Meta')
773
774     def get_container_object_meta(self, until=None):
775         """
776         :param until: (str) formated date
777
778         :returns: (dict)
779         """
780         return filter_in(self.get_container_info(until=until),
781             'X-Container-Object-Meta')
782
783     def set_container_meta(self, metapairs):
784         """
785         :param metapairs: (dict) {key1:val1, key2:val2, ...}
786         """
787         assert(type(metapairs) is dict)
788         r = self.container_post(update=True, metadata=metapairs)
789         r.release()
790
791     def del_container_meta(self, metakey):
792         """
793         :param metakey: (str) metadatum key
794         """
795         r = self.container_post(update=True, metadata={metakey: ''})
796         r.release()
797
798     def set_container_quota(self, quota):
799         """
800         :param quota: (int)
801         """
802         r = self.container_post(update=True, quota=quota)
803         r.release()
804
805     def set_container_versioning(self, versioning):
806         """
807         :param versioning: (str)
808         """
809         r = self.container_post(update=True, versioning=versioning)
810         r.release()
811
812     def del_object(self, obj, until=None, delimiter=None):
813         """
814         :param obj: (str) remote object path
815
816         :param until: (str) formated date
817
818         :param delimiter: (str)
819         """
820         self._assert_container()
821         r = self.object_delete(obj, until=until, delimiter=delimiter)
822         r.release()
823
824     def set_object_meta(self, obj, metapairs):
825         """
826         :param obj: (str) remote object path
827
828         :param metapairs: (dict) {key1:val1, key2:val2, ...}
829         """
830         assert(type(metapairs) is dict)
831         r = self.object_post(obj, update=True, metadata=metapairs)
832         r.release()
833
834     def del_object_meta(self, obj, metakey):
835         """
836         :param obj: (str) remote object path
837
838         :param metakey: (str) metadatum key
839         """
840         r = self.object_post(obj, update=True, metadata={metakey: ''})
841         r.release()
842
843     def publish_object(self, obj):
844         """
845         :param obj: (str) remote object path
846         """
847         r = self.object_post(obj, update=True, public=True)
848         r.release()
849
850     def unpublish_object(self, obj):
851         """
852         :param obj: (str) remote object path
853         """
854         r = self.object_post(obj, update=True, public=False)
855         r.release()
856
857     def get_object_info(self, obj, version=None):
858         """
859         :param obj: (str) remote object path
860
861         :param version: (str)
862
863         :returns: (dict)
864         """
865         r = self.object_head(obj, version=version)
866         return r.headers
867
868     def get_object_meta(self, obj, version=None):
869         """
870         :param obj: (str) remote object path
871
872         :param version: (str)
873
874         :returns: (dict)
875         """
876         return filter_in(self.get_object_info(obj, version=version),
877             'X-Object-Meta')
878
879     def get_object_sharing(self, obj):
880         """
881         :param obj: (str) remote object path
882
883         :returns: (dict)
884         """
885         r = filter_in(self.get_object_info(obj),
886             'X-Object-Sharing',
887             exactMatch=True)
888         reply = {}
889         if len(r) > 0:
890             perms = r['x-object-sharing'].split(';')
891             for perm in perms:
892                 try:
893                     perm.index('=')
894                 except ValueError:
895                     raise ClientError('Incorrect reply format')
896                 (key, val) = perm.strip().split('=')
897                 reply[key] = val
898         return reply
899
900     def set_object_sharing(self, obj,
901         read_permition=False,
902         write_permition=False):
903         """Give read/write permisions to an object.
904
905         :param obj: (str) remote object path
906
907         :param read_permition: (list - bool) users and user groups that get
908             read permition for this object - False means all previous read
909             permissions will be removed
910
911         :param write_perimition: (list - bool) of users and user groups to get
912            write permition for this object - False means all previous write
913            permissions will be removed
914         """
915
916         perms = dict(read='' if not read_permition else read_permition,
917             write='' if not write_permition else write_permition)
918         r = self.object_post(obj, update=True, permissions=perms)
919         r.release()
920
921     def del_object_sharing(self, obj):
922         """
923         :param obj: (str) remote object path
924         """
925         self.set_object_sharing(obj)
926
927     def append_object(self, obj, source_file, upload_cb=None):
928         """
929         :param obj: (str) remote object path
930
931         :param source_file: open file descriptor
932
933         :param upload_db: progress.bar for uploading
934         """
935
936         self._assert_container()
937         meta = self.get_container_info()
938         blocksize = int(meta['x-container-block-size'])
939         filesize = fstat(source_file.fileno()).st_size
940         nblocks = 1 + (filesize - 1) // blocksize
941         offset = 0
942         if upload_cb is not None:
943             upload_gen = upload_cb(nblocks)
944         for i in range(nblocks):
945             block = source_file.read(min(blocksize, filesize - offset))
946             offset += len(block)
947             r = self.object_post(obj,
948                 update=True,
949                 content_range='bytes */*',
950                 content_type='application/octet-stream',
951                 content_length=len(block),
952                 data=block)
953             r.release()
954
955             if upload_cb is not None:
956                 upload_gen.next()
957
958     def truncate_object(self, obj, upto_bytes):
959         """
960         :param obj: (str) remote object path
961
962         :param upto_bytes: max number of bytes to leave on file
963         """
964         r = self.object_post(obj,
965             update=True,
966             content_range='bytes 0-%s/*' % upto_bytes,
967             content_type='application/octet-stream',
968             object_bytes=upto_bytes,
969             source_object=path4url(self.container, obj))
970         r.release()
971
972     def overwrite_object(self,
973         obj,
974         start,
975         end,
976         source_file,
977         upload_cb=None):
978         """Overwrite a part of an object from local source file
979
980         :param obj: (str) remote object path
981
982         :param start: (int) position in bytes to start overwriting from
983
984         :param end: (int) position in bytes to stop overwriting at
985
986         :param source_file: open file descriptor
987
988         :param upload_db: progress.bar for uploading
989         """
990
991         self._assert_container()
992         meta = self.get_container_info()
993         blocksize = int(meta['x-container-block-size'])
994         filesize = fstat(source_file.fileno()).st_size
995         datasize = int(end) - int(start) + 1
996         nblocks = 1 + (datasize - 1) // blocksize
997         offset = 0
998         if upload_cb is not None:
999             upload_gen = upload_cb(nblocks)
1000         for i in range(nblocks):
1001             block = source_file.read(min(blocksize,
1002                 filesize - offset,
1003                 datasize - offset))
1004             offset += len(block)
1005             r = self.object_post(obj,
1006                 update=True,
1007                 content_type='application/octet-stream',
1008                 content_length=len(block),
1009                 content_range='bytes %s-%s/*' % (start, end),
1010                 data=block)
1011             r.release()
1012
1013             if upload_cb is not None:
1014                 upload_gen.next()
1015
1016     def copy_object(self, src_container, src_object, dst_container,
1017         dst_object=False,
1018         source_version=None,
1019         public=False,
1020         content_type=None,
1021         delimiter=None):
1022         """
1023         :param src_container: (str) source container
1024
1025         :param src_object: (str) source object path
1026
1027         :param dst_container: (str) destination container
1028
1029         :param dst_object: (str) destination object path
1030
1031         :param source_version: (str) source object version
1032
1033         :param public: (bool)
1034
1035         :param content_type: (str)
1036
1037         :param delimiter: (str)
1038         """
1039         self._assert_account()
1040         self.container = dst_container
1041         dst_object = dst_object or src_object
1042         src_path = path4url(src_container, src_object)
1043         r = self.object_put(dst_object,
1044             success=201,
1045             copy_from=src_path,
1046             content_length=0,
1047             source_version=source_version,
1048             public=public,
1049             content_type=content_type,
1050             delimiter=delimiter)
1051         r.release()
1052
1053     def move_object(self, src_container, src_object, dst_container,
1054         dst_object=False,
1055         source_version=None,
1056         public=False,
1057         content_type=None,
1058         delimiter=None):
1059         """
1060         :param src_container: (str) source container
1061
1062         :param src_object: (str) source object path
1063
1064         :param dst_container: (str) destination container
1065
1066         :param dst_object: (str) destination object path
1067
1068         :param source_version: (str) source object version
1069
1070         :param public: (bool)
1071
1072         :param content_type: (str)
1073
1074         :param delimiter: (str)
1075         """
1076         self._assert_account()
1077         self.container = dst_container
1078         dst_object = dst_object or src_object
1079         src_path = path4url(src_container, src_object)
1080         r = self.object_put(dst_object,
1081             success=201,
1082             move_from=src_path,
1083             content_length=0,
1084             source_version=source_version,
1085             public=public,
1086             content_type=content_type,
1087             delimiter=delimiter)
1088         r.release()
1089
1090     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1091         """Get accounts that share with self.account
1092
1093         :param limit: (str)
1094
1095         :param marker: (str)
1096
1097         :returns: (dict)
1098         """
1099         self._assert_account()
1100
1101         self.set_param('format', 'json')
1102         self.set_param('limit', limit, iff=limit is not None)
1103         self.set_param('marker', marker, iff=marker is not None)
1104
1105         path = ''
1106         success = kwargs.pop('success', (200, 204))
1107         r = self.get(path, *args, success=success, **kwargs)
1108         return r.json
1109
1110     def get_object_versionlist(self, obj):
1111         """
1112         :param obj: (str) remote object path
1113
1114         :returns: (list)
1115         """
1116         self._assert_container()
1117         r = self.object_get(obj, format='json', version='list')
1118         return r.json['versions']