Prepare to merge into develop
[kamaki] / kamaki / clients / pithos.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestAPI):
69     """GRNet Pithos API client"""
70
71     _thread_exceptions = []
72
73     def __init__(self, base_url, token, account=None, container=None):
74         super(PithosClient, self).__init__(base_url, token, account, container)
75
76     def purge_container(self):
77         """Delete an empty container and destroy associated blocks
78         """
79         r = self.container_delete(until=unicode(time()))
80         r.release()
81
82     def upload_object_unchunked(self, obj, f,
83         withHashFile=False,
84         size=None,
85         etag=None,
86         content_encoding=None,
87         content_disposition=None,
88         content_type=None,
89         sharing=None,
90         public=None):
91         """
92         :param obj: (str) remote object path
93
94         :param f: open file descriptor
95
96         :param withHashFile: (bool)
97
98         :param size: (int) size of data to upload
99
100         :param etag: (str)
101
102         :param content_encoding: (str)
103
104         :param content_disposition: (str)
105
106         :param content_type: (str)
107
108         :param sharing: {'read':[user and/or grp names],
109             'write':[usr and/or grp names]}
110
111         :param public: (bool)
112         """
113         self._assert_container()
114
115         if withHashFile:
116             data = f.read()
117             try:
118                 import json
119                 data = json.dumps(json.loads(data))
120             except ValueError:
121                 raise ClientError(message='"%s" is not json-formated' % f.name,
122                     status=1)
123             except SyntaxError:
124                 raise ClientError(message='"%s" is not a valid hashmap file'\
125                 % f.name, status=1)
126             f = StringIO(data)
127         data = f.read(size) if size is not None else f.read()
128         r = self.object_put(obj,
129             data=data,
130             etag=etag,
131             content_encoding=content_encoding,
132             content_disposition=content_disposition,
133             content_type=content_type,
134             permissions=sharing,
135             public=public,
136             success=201)
137         r.release()
138
139     def create_object_by_manifestation(self, obj,
140         etag=None,
141         content_encoding=None,
142         content_disposition=None,
143         content_type=None,
144         sharing=None,
145         public=None):
146         """
147         :param obj: (str) remote object path
148
149         :param etag: (str)
150
151         :param content_encoding: (str)
152
153         :param content_disposition: (str)
154
155         :param content_type: (str)
156
157         :param sharing: {'read':[user and/or grp names],
158             'write':[usr and/or grp names]}
159
160         :param public: (bool)
161         """
162         self._assert_container()
163         r = self.object_put(obj,
164             content_length=0,
165             etag=etag,
166             content_encoding=content_encoding,
167             content_disposition=content_disposition,
168             content_type=content_type,
169             permissions=sharing,
170             public=public,
171             manifest='%s/%s' % (self.container, obj))
172         r.release()
173
174     # upload_* auxiliary methods
175     def _put_block_async(self, data, hash):
176         event = SilentEvent(method=self._put_block, data=data, hash=hash)
177         event.start()
178         return event
179
180     def _put_block(self, data, hash):
181         r = self.container_post(update=True,
182             content_type='application/octet-stream',
183             content_length=len(data),
184             data=data,
185             format='json')
186         assert r.json[0] == hash, 'Local hash does not match server'
187
188     def _get_file_block_info(self, fileobj, size=None):
189         meta = self.get_container_info()
190         blocksize = int(meta['x-container-block-size'])
191         blockhash = meta['x-container-block-hash']
192         size = size if size is not None else fstat(fileobj.fileno()).st_size
193         nblocks = 1 + (size - 1) // blocksize
194         return (blocksize, blockhash, size, nblocks)
195
196     def _get_missing_hashes(self, obj, json,
197         size=None,
198         format='json',
199         hashmap=True,
200         content_type=None,
201         etag=None,
202         content_encoding=None,
203         content_disposition=None,
204         permissions=None,
205         public=None,
206         success=(201, 409)):
207         r = self.object_put(obj,
208             format='json',
209             hashmap=True,
210             content_type=content_type,
211             json=json,
212             etag=etag,
213             content_encoding=content_encoding,
214             content_disposition=content_disposition,
215             permissions=permissions,
216             public=public,
217             success=success)
218         if r.status_code == 201:
219             r.release()
220             return None
221         return r.json
222
223     def _caclulate_uploaded_blocks(self,
224         blocksize,
225         blockhash,
226         size,
227         nblocks,
228         hashes,
229         hmap,
230         fileobj,
231         hash_cb=None):
232         offset = 0
233         if hash_cb:
234             hash_gen = hash_cb(nblocks)
235             hash_gen.next()
236
237         for i in range(nblocks):
238             block = fileobj.read(min(blocksize, size - offset))
239             bytes = len(block)
240             hash = _pithos_hash(block, blockhash)
241             hashes.append(hash)
242             hmap[hash] = (offset, bytes)
243             offset += bytes
244             if hash_cb:
245                 hash_gen.next()
246         if offset != size:
247             print("Size is %i" % size)
248             print("Offset is %i" % offset)
249             assert offset == size, \
250                    "Failed to calculate uploaded blocks: " \
251                     "Offset and object size do not match"
252
253     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
254         """upload missing blocks asynchronously.
255         """
256         if upload_cb:
257             upload_gen = upload_cb(len(missing))
258             upload_gen.next()
259
260         self._init_thread_limit()
261
262         flying = []
263         for hash in missing:
264             offset, bytes = hmap[hash]
265             fileobj.seek(offset)
266             data = fileobj.read(bytes)
267             r = self._put_block_async(data, hash)
268             flying.append(r)
269             unfinished = []
270             for i, thread in enumerate(flying):
271
272                 unfinished = self._watch_thread_limit(unfinished)
273
274                 if thread.isAlive() or thread.exception:
275                     unfinished.append(thread)
276                 else:
277                     if upload_cb:
278                         upload_gen.next()
279             flying = unfinished
280
281         for thread in flying:
282             thread.join()
283
284         failures = [r for r in flying if r.exception]
285         if len(failures):
286             details = ', '.join([' (%s).%s' % (i, r.exception)\
287                 for i, r in enumerate(failures)])
288             raise ClientError(message="Block uploading failed",
289                 status=505,
290                 details=details)
291
292         while upload_cb:
293             try:
294                 upload_gen.next()
295             except StopIteration:
296                 break
297
298     def upload_object(self, obj, f,
299         size=None,
300         hash_cb=None,
301         upload_cb=None,
302         etag=None,
303         content_encoding=None,
304         content_disposition=None,
305         content_type=None,
306         sharing=None,
307         public=None):
308         """Upload an object using multiple connections (threads)
309
310         :param obj: (str) remote object path
311
312         :param f: open file descriptor (rb)
313
314         :param hash_cb: optional progress.bar object for calculating hashes
315
316         :param upload_cb: optional progress.bar object for uploading
317
318         :param etag: (str)
319
320         :param content_encoding: (str)
321
322         :param content_disposition: (str)
323
324         :param content_type: (str)
325
326         :param sharing: {'read':[user and/or grp names],
327             'write':[usr and/or grp names]}
328
329         :param public: (bool)
330         """
331         self._assert_container()
332
333         #init
334         block_info = (blocksize, blockhash, size, nblocks) =\
335             self._get_file_block_info(f, size)
336         (hashes, hmap, offset) = ([], {}, 0)
337         if content_type is None:
338             content_type = 'application/octet-stream'
339
340         self._caclulate_uploaded_blocks(*block_info,
341             hashes=hashes,
342             hmap=hmap,
343             fileobj=f,
344             hash_cb=hash_cb)
345
346         hashmap = dict(bytes=size, hashes=hashes)
347         missing = self._get_missing_hashes(obj, hashmap,
348             content_type=content_type,
349             size=size,
350             etag=etag,
351             content_encoding=content_encoding,
352             content_disposition=content_disposition,
353             permissions=sharing,
354             public=public)
355
356         if missing is None:
357             return
358         try:
359             self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
360         except KeyboardInterrupt:
361             print('- - - wait for threads to finish')
362             for thread in activethreads():
363                 thread.join()
364             raise
365
366         r = self.object_put(
367             obj,
368             format='json',
369             hashmap=True,
370             content_type=content_type,
371             json=hashmap,
372             success=201)
373         r.release()
374
375     # download_* auxiliary methods
376     def _get_remote_blocks_info(self, obj, **restargs):
377         #retrieve object hashmap
378         myrange = restargs.pop('data_range', None)
379         hashmap = self.get_object_hashmapp(obj, **restargs)
380         restargs['data_range'] = myrange
381         blocksize = int(hashmap['block_size'])
382         blockhash = hashmap['block_hash']
383         total_size = hashmap['bytes']
384         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
385         map_dict = {}
386         for i, h in enumerate(hashmap['hashes']):
387             map_dict[h] = i
388         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
389
390     def _dump_blocks_sync(self,
391         obj,
392         remote_hashes,
393         blocksize,
394         total_size,
395         dst,
396         range,
397         **restargs):
398         for blockid, blockhash in enumerate(remote_hashes):
399             if blockhash == None:
400                 continue
401             start = blocksize * blockid
402             end = total_size - 1 if start + blocksize > total_size\
403                 else start + blocksize - 1
404             (start, end) = _range_up(start, end, range)
405             restargs['data_range'] = 'bytes=%s-%s' % (start, end)
406             r = self.object_get(obj, success=(200, 206), **restargs)
407             self._cb_next()
408             dst.write(r.content)
409             dst.flush()
410
411     def _get_block_async(self, obj, **restargs):
412         event = SilentEvent(self.object_get,
413             obj,
414             success=(200, 206),
415             **restargs)
416         event.start()
417         return event
418
419     def _hash_from_file(self, fp, start, size, blockhash):
420         fp.seek(start)
421         block = fp.read(size)
422         h = newhashlib(blockhash)
423         h.update(block.strip('\x00'))
424         return hexlify(h.digest())
425
426     def _thread2file(self,
427         flying,
428         local_file,
429         offset=0,
430         **restargs):
431         """write the results of a greenleted rest call to a file
432         @offset: the offset of the file up to blocksize
433             - e.g. if the range is 10-100, all
434         blocks will be written to normal_position - 10"""
435         finished = []
436         for i, (start, g) in enumerate(flying.items()):
437             if not g.isAlive():
438                 if g.exception:
439                     raise g.exception
440                 block = g.value.content
441                 local_file.seek(start - offset)
442                 local_file.write(block)
443                 self._cb_next()
444                 finished.append(flying.pop(start))
445         local_file.flush()
446         return finished
447
448     def _dump_blocks_async(self,
449         obj,
450         remote_hashes,
451         blocksize,
452         total_size,
453         local_file,
454         blockhash=None,
455         resume=False,
456         filerange=None,
457         **restargs):
458
459         file_size = fstat(local_file.fileno()).st_size if resume else 0
460         flying = {}
461         finished = []
462         offset = 0
463         if filerange is not None:
464             rstart = int(filerange.split('-')[0])
465             offset = rstart if blocksize > rstart else rstart % blocksize
466
467         self._init_thread_limit()
468         for block_hash, blockid in remote_hashes.items():
469             start = blocksize * blockid
470             if start < file_size\
471             and block_hash == self._hash_from_file(
472                     local_file,
473                     start,
474                     blocksize,
475                     blockhash):
476                 self._cb_next()
477                 continue
478             self._watch_thread_limit(flying.values())
479             finished += self._thread2file(
480                 flying,
481                 local_file,
482                 offset,
483                 **restargs)
484             end = total_size - 1 if start + blocksize > total_size\
485                 else start + blocksize - 1
486             (start, end) = _range_up(start, end, filerange)
487             if start == end:
488                 self._cb_next()
489                 continue
490             restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
491             flying[start] = self._get_block_async(obj, **restargs)
492
493         for thread in flying.values():
494             thread.join()
495         finished += self._thread2file(flying, local_file, offset, **restargs)
496
497     def download_object(self,
498         obj,
499         dst,
500         download_cb=None,
501         version=None,
502         resume=False,
503         range=None,
504         if_match=None,
505         if_none_match=None,
506         if_modified_since=None,
507         if_unmodified_since=None):
508         """Download an object using multiple connections (threads) and
509             writing to random parts of the file
510
511         :param obj: (str) remote object path
512
513         :param dst: open file descriptor (wb+)
514
515         :param download_cb: optional progress.bar object for downloading
516
517         :param version: (str) file version
518
519         :param resume: (bool) if set, preserve already downloaded file parts
520
521         :param range: (str) from-to where from and to are integers denoting
522             file positions in bytes
523
524         :param if_match: (str)
525
526         :param if_none_match: (str)
527
528         :param if_modified_since: (str) formated date
529
530         :param if_unmodified_since: (str) formated date
531         """
532
533         restargs = dict(version=version,
534             data_range=None if range is None else 'bytes=%s' % range,
535             if_match=if_match,
536             if_none_match=if_none_match,
537             if_modified_since=if_modified_since,
538             if_unmodified_since=if_unmodified_since)
539
540         (blocksize,
541             blockhash,
542             total_size,
543             hash_list,
544             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
545         assert total_size >= 0
546
547         if download_cb:
548             self.progress_bar_gen = download_cb(len(remote_hashes))
549             self._cb_next()
550
551         if dst.isatty():
552             self._dump_blocks_sync(obj,
553                 hash_list,
554                 blocksize,
555                 total_size,
556                 dst,
557                 range,
558                 **restargs)
559         else:
560             self._dump_blocks_async(obj,
561                 remote_hashes,
562                 blocksize,
563                 total_size,
564                 dst,
565                 blockhash,
566                 resume,
567                 range,
568                 **restargs)
569             if range is None:
570                 dst.truncate(total_size)
571
572         self._complete_cb()
573
574     #Command Progress Bar method
575     def _cb_next(self):
576         if hasattr(self, 'progress_bar_gen'):
577             try:
578                 self.progress_bar_gen.next()
579             except:
580                 pass
581
582     def _complete_cb(self):
583         while True:
584             try:
585                 self.progress_bar_gen.next()
586             except:
587                 break
588
589     def get_object_hashmapp(self, obj,
590         version=None,
591         if_match=None,
592         if_none_match=None,
593         if_modified_since=None,
594         if_unmodified_since=None,
595         data_range=None):
596         """
597         :param obj: (str) remote object path
598
599         :param if_match: (str)
600
601         :param if_none_match: (str)
602
603         :param if_modified_since: (str) formated date
604
605         :param if_unmodified_since: (str) formated date
606
607         :param data_range: (str) from-to where from and to are integers
608             denoting file positions in bytes
609
610         :returns: (list)
611         """
612         try:
613             r = self.object_get(obj,
614                 hashmap=True,
615                 version=version,
616                 if_etag_match=if_match,
617                 if_etag_not_match=if_none_match,
618                 if_modified_since=if_modified_since,
619                 if_unmodified_since=if_unmodified_since,
620                 data_range=data_range)
621         except ClientError as err:
622             if err.status == 304 or err.status == 412:
623                 return {}
624             raise
625         return r.json
626
627     def set_account_group(self, group, usernames):
628         """
629         :param group: (str)
630
631         :param usernames: (list)
632         """
633         r = self.account_post(update=True, groups={group: usernames})
634         r.release()
635
636     def del_account_group(self, group):
637         """
638         :param group: (str)
639         """
640         r = self.account_post(update=True, groups={group: []})
641         r.release()
642
643     def get_account_info(self, until=None):
644         """
645         :param until: (str) formated date
646
647         :returns: (dict)
648         """
649         r = self.account_head(until=until)
650         if r.status_code == 401:
651             raise ClientError("No authorization")
652         return r.headers
653
654     def get_account_quota(self):
655         """
656         :returns: (dict)
657         """
658         return filter_in(self.get_account_info(),
659             'X-Account-Policy-Quota',
660             exactMatch=True)
661
662     def get_account_versioning(self):
663         """
664         :returns: (dict)
665         """
666         return filter_in(self.get_account_info(),
667             'X-Account-Policy-Versioning',
668             exactMatch=True)
669
670     def get_account_meta(self, until=None):
671         """
672         :meta until: (str) formated date
673
674         :returns: (dict)
675         """
676         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
677
678     def get_account_group(self):
679         """
680         :returns: (dict)
681         """
682         return filter_in(self.get_account_info(), 'X-Account-Group-')
683
684     def set_account_meta(self, metapairs):
685         """
686         :param metapairs: (dict) {key1:val1, key2:val2, ...}
687         """
688         assert(type(metapairs) is dict)
689         r = self.account_post(update=True, metadata=metapairs)
690         r.release()
691
692     def del_account_meta(self, metakey):
693         """
694         :param metakey: (str) metadatum key
695         """
696         r = self.account_post(update=True, metadata={metakey: ''})
697         r.release()
698
699     def set_account_quota(self, quota):
700         """
701         :param quota: (int)
702         """
703         r = self.account_post(update=True, quota=quota)
704         r.release()
705
706     def set_account_versioning(self, versioning):
707         """
708         "param versioning: (str)
709         """
710         r = self.account_post(update=True, versioning=versioning)
711         r.release()
712
713     def list_containers(self):
714         """
715         :returns: (dict)
716         """
717         r = self.account_get()
718         return r.json
719
720     def del_container(self, until=None, delimiter=None):
721         """
722         :param until: (str) formated date
723
724         :param delimiter: (str)
725
726         :raises ClientError: 404 Container does not exist
727
728         :raises ClientError: 409 Container is not empty
729         """
730         self._assert_container()
731         r = self.container_delete(until=until,
732             delimiter=delimiter,
733             success=(204, 404, 409))
734         r.release()
735         if r.status_code == 404:
736             raise ClientError('Container "%s" does not exist' % self.container,
737                 r.status_code)
738         elif r.status_code == 409:
739             raise ClientError('Container "%s" is not empty' % self.container,
740                 r.status_code)
741
742     def get_container_versioning(self, container):
743         """
744         :param container: (str)
745
746         :returns: (dict)
747         """
748         self.container = container
749         return filter_in(self.get_container_info(),
750             'X-Container-Policy-Versioning')
751
752     def get_container_quota(self, container):
753         """
754         :param container: (str)
755
756         :returns: (dict)
757         """
758         self.container = container
759         return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
760
761     def get_container_info(self, until=None):
762         """
763         :param until: (str) formated date
764
765         :returns: (dict)
766         """
767         r = self.container_head(until=until)
768         return r.headers
769
770     def get_container_meta(self, until=None):
771         """
772         :param until: (str) formated date
773
774         :returns: (dict)
775         """
776         return filter_in(self.get_container_info(until=until),
777             'X-Container-Meta')
778
779     def get_container_object_meta(self, until=None):
780         """
781         :param until: (str) formated date
782
783         :returns: (dict)
784         """
785         return filter_in(self.get_container_info(until=until),
786             'X-Container-Object-Meta')
787
788     def set_container_meta(self, metapairs):
789         """
790         :param metapairs: (dict) {key1:val1, key2:val2, ...}
791         """
792         assert(type(metapairs) is dict)
793         r = self.container_post(update=True, metadata=metapairs)
794         r.release()
795
796     def del_container_meta(self, metakey):
797         """
798         :param metakey: (str) metadatum key
799         """
800         r = self.container_post(update=True, metadata={metakey: ''})
801         r.release()
802
803     def set_container_quota(self, quota):
804         """
805         :param quota: (int)
806         """
807         r = self.container_post(update=True, quota=quota)
808         r.release()
809
810     def set_container_versioning(self, versioning):
811         """
812         :param versioning: (str)
813         """
814         r = self.container_post(update=True, versioning=versioning)
815         r.release()
816
817     def del_object(self, obj, until=None, delimiter=None):
818         """
819         :param obj: (str) remote object path
820
821         :param until: (str) formated date
822
823         :param delimiter: (str)
824         """
825         self._assert_container()
826         r = self.object_delete(obj, until=until, delimiter=delimiter)
827         r.release()
828
829     def set_object_meta(self, obj, metapairs):
830         """
831         :param obj: (str) remote object path
832
833         :param metapairs: (dict) {key1:val1, key2:val2, ...}
834         """
835         assert(type(metapairs) is dict)
836         r = self.object_post(obj, update=True, metadata=metapairs)
837         r.release()
838
839     def del_object_meta(self, obj, metakey):
840         """
841         :param obj: (str) remote object path
842
843         :param metakey: (str) metadatum key
844         """
845         r = self.object_post(obj, update=True, metadata={metakey: ''})
846         r.release()
847
848     def publish_object(self, obj):
849         """
850         :param obj: (str) remote object path
851         """
852         r = self.object_post(obj, update=True, public=True)
853         r.release()
854
855     def unpublish_object(self, obj):
856         """
857         :param obj: (str) remote object path
858         """
859         r = self.object_post(obj, update=True, public=False)
860         r.release()
861
862     def get_object_info(self, obj, version=None):
863         """
864         :param obj: (str) remote object path
865
866         :param version: (str)
867
868         :returns: (dict)
869         """
870         r = self.object_head(obj, version=version)
871         return r.headers
872
873     def get_object_meta(self, obj, version=None):
874         """
875         :param obj: (str) remote object path
876
877         :param version: (str)
878
879         :returns: (dict)
880         """
881         return filter_in(self.get_object_info(obj, version=version),
882             'X-Object-Meta')
883
884     def get_object_sharing(self, obj):
885         """
886         :param obj: (str) remote object path
887
888         :returns: (dict)
889         """
890         r = filter_in(self.get_object_info(obj),
891             'X-Object-Sharing',
892             exactMatch=True)
893         reply = {}
894         if len(r) > 0:
895             perms = r['x-object-sharing'].split(';')
896             for perm in perms:
897                 try:
898                     perm.index('=')
899                 except ValueError:
900                     raise ClientError('Incorrect reply format')
901                 (key, val) = perm.strip().split('=')
902                 reply[key] = val
903         return reply
904
905     def set_object_sharing(self, obj,
906         read_permition=False,
907         write_permition=False):
908         """Give read/write permisions to an object.
909
910         :param obj: (str) remote object path
911
912         :param read_permition: (list - bool) users and user groups that get
913             read permition for this object - False means all previous read
914             permissions will be removed
915
916         :param write_perimition: (list - bool) of users and user groups to get
917            write permition for this object - False means all previous write
918            permissions will be removed
919         """
920
921         perms = dict(read='' if not read_permition else read_permition,
922             write='' if not write_permition else write_permition)
923         r = self.object_post(obj, update=True, permissions=perms)
924         r.release()
925
926     def del_object_sharing(self, obj):
927         """
928         :param obj: (str) remote object path
929         """
930         self.set_object_sharing(obj)
931
932     def append_object(self, obj, source_file, upload_cb=None):
933         """
934         :param obj: (str) remote object path
935
936         :param source_file: open file descriptor
937
938         :param upload_db: progress.bar for uploading
939         """
940
941         self._assert_container()
942         meta = self.get_container_info()
943         blocksize = int(meta['x-container-block-size'])
944         filesize = fstat(source_file.fileno()).st_size
945         nblocks = 1 + (filesize - 1) // blocksize
946         offset = 0
947         if upload_cb is not None:
948             upload_gen = upload_cb(nblocks)
949         for i in range(nblocks):
950             block = source_file.read(min(blocksize, filesize - offset))
951             offset += len(block)
952             r = self.object_post(obj,
953                 update=True,
954                 content_range='bytes */*',
955                 content_type='application/octet-stream',
956                 content_length=len(block),
957                 data=block)
958             r.release()
959
960             if upload_cb is not None:
961                 upload_gen.next()
962
963     def truncate_object(self, obj, upto_bytes):
964         """
965         :param obj: (str) remote object path
966
967         :param upto_bytes: max number of bytes to leave on file
968         """
969         r = self.object_post(obj,
970             update=True,
971             content_range='bytes 0-%s/*' % upto_bytes,
972             content_type='application/octet-stream',
973             object_bytes=upto_bytes,
974             source_object=path4url(self.container, obj))
975         r.release()
976
977     def overwrite_object(self,
978         obj,
979         start,
980         end,
981         source_file,
982         upload_cb=None):
983         """Overwrite a part of an object from local source file
984
985         :param obj: (str) remote object path
986
987         :param start: (int) position in bytes to start overwriting from
988
989         :param end: (int) position in bytes to stop overwriting at
990
991         :param source_file: open file descriptor
992
993         :param upload_db: progress.bar for uploading
994         """
995
996         self._assert_container()
997         meta = self.get_container_info()
998         blocksize = int(meta['x-container-block-size'])
999         filesize = fstat(source_file.fileno()).st_size
1000         datasize = int(end) - int(start) + 1
1001         nblocks = 1 + (datasize - 1) // blocksize
1002         offset = 0
1003         if upload_cb is not None:
1004             upload_gen = upload_cb(nblocks)
1005         for i in range(nblocks):
1006             block = source_file.read(min(blocksize,
1007                 filesize - offset,
1008                 datasize - offset))
1009             offset += len(block)
1010             r = self.object_post(obj,
1011                 update=True,
1012                 content_type='application/octet-stream',
1013                 content_length=len(block),
1014                 content_range='bytes %s-%s/*' % (start, end),
1015                 data=block)
1016             r.release()
1017
1018             if upload_cb is not None:
1019                 upload_gen.next()
1020
1021     def copy_object(self, src_container, src_object, dst_container,
1022         dst_object=False,
1023         source_version=None,
1024         public=False,
1025         content_type=None,
1026         delimiter=None):
1027         """
1028         :param src_container: (str) source container
1029
1030         :param src_object: (str) source object path
1031
1032         :param dst_container: (str) destination container
1033
1034         :param dst_object: (str) destination object path
1035
1036         :param source_version: (str) source object version
1037
1038         :param public: (bool)
1039
1040         :param content_type: (str)
1041
1042         :param delimiter: (str)
1043         """
1044         self._assert_account()
1045         self.container = dst_container
1046         dst_object = dst_object or src_object
1047         src_path = path4url(src_container, src_object)
1048         r = self.object_put(dst_object,
1049             success=201,
1050             copy_from=src_path,
1051             content_length=0,
1052             source_version=source_version,
1053             public=public,
1054             content_type=content_type,
1055             delimiter=delimiter)
1056         r.release()
1057
1058     def move_object(self, src_container, src_object, dst_container,
1059         dst_object=False,
1060         source_version=None,
1061         public=False,
1062         content_type=None,
1063         delimiter=None):
1064         """
1065         :param src_container: (str) source container
1066
1067         :param src_object: (str) source object path
1068
1069         :param dst_container: (str) destination container
1070
1071         :param dst_object: (str) destination object path
1072
1073         :param source_version: (str) source object version
1074
1075         :param public: (bool)
1076
1077         :param content_type: (str)
1078
1079         :param delimiter: (str)
1080         """
1081         self._assert_account()
1082         self.container = dst_container
1083         dst_object = dst_object or src_object
1084         src_path = path4url(src_container, src_object)
1085         r = self.object_put(dst_object,
1086             success=201,
1087             move_from=src_path,
1088             content_length=0,
1089             source_version=source_version,
1090             public=public,
1091             content_type=content_type,
1092             delimiter=delimiter)
1093         r.release()
1094
1095     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1096         """Get accounts that share with self.account
1097
1098         :param limit: (str)
1099
1100         :param marker: (str)
1101
1102         :returns: (dict)
1103         """
1104         self._assert_account()
1105
1106         self.set_param('format', 'json')
1107         self.set_param('limit', limit, iff=limit is not None)
1108         self.set_param('marker', marker, iff=marker is not None)
1109
1110         path = ''
1111         success = kwargs.pop('success', (200, 204))
1112         r = self.get(path, *args, success=success, **kwargs)
1113         return r.json
1114
1115     def get_object_versionlist(self, obj):
1116         """
1117         :param obj: (str) remote object path
1118
1119         :returns: (list)
1120         """
1121         self._assert_container()
1122         r = self.object_get(obj, format='json', version='list')
1123         return r.json['versions']