fixbug 3555: blcks missing if same to other blcks
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             permissions=None,
211             public=None,
212             success=(201, 409)):
213         r = self.object_put(
214             obj,
215             format='json',
216             hashmap=True,
217             content_type=content_type,
218             json=json,
219             etag=etag,
220             content_encoding=content_encoding,
221             content_disposition=content_disposition,
222             permissions=permissions,
223             public=public,
224             success=success)
225         return None if r.status_code == 201 else r.json
226
227     def _culculate_blocks_for_upload(
228             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
229             hash_cb=None):
230         offset = 0
231         if hash_cb:
232             hash_gen = hash_cb(nblocks)
233             hash_gen.next()
234
235         for i in range(nblocks):
236             block = fileobj.read(min(blocksize, size - offset))
237             bytes = len(block)
238             hash = _pithos_hash(block, blockhash)
239             hashes.append(hash)
240             hmap[hash] = (offset, bytes)
241             offset += bytes
242             if hash_cb:
243                 hash_gen.next()
244         msg = 'Failed to calculate uploaded blocks:'
245         ' Offset and object size do not match'
246         assert offset == size, msg
247
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
249         """upload missing blocks asynchronously"""
250
251         self._init_thread_limit()
252
253         flying = []
254         failures = []
255         for hash in missing:
256             offset, bytes = hmap[hash]
257             fileobj.seek(offset)
258             data = fileobj.read(bytes)
259             r = self._put_block_async(data, hash, upload_gen)
260             flying.append(r)
261             unfinished = self._watch_thread_limit(flying)
262             for thread in set(flying).difference(unfinished):
263                 if thread.exception:
264                     failures.append(thread)
265                     if isinstance(
266                             thread.exception,
267                             ClientError) and thread.exception.status == 502:
268                         self.POOLSIZE = self._thread_limit
269                 elif thread.isAlive():
270                     flying.append(thread)
271                 elif upload_gen:
272                     try:
273                         upload_gen.next()
274                     except:
275                         pass
276             flying = unfinished
277
278         for thread in flying:
279             thread.join()
280             if thread.exception:
281                 failures.append(thread)
282             elif upload_gen:
283                 try:
284                     upload_gen.next()
285                 except:
286                     pass
287
288         return [failure.kwargs['hash'] for failure in failures]
289
290     def upload_object(
291             self, obj, f,
292             size=None,
293             hash_cb=None,
294             upload_cb=None,
295             etag=None,
296             content_encoding=None,
297             content_disposition=None,
298             content_type=None,
299             sharing=None,
300             public=None):
301         """Upload an object using multiple connections (threads)
302
303         :param obj: (str) remote object path
304
305         :param f: open file descriptor (rb)
306
307         :param hash_cb: optional progress.bar object for calculating hashes
308
309         :param upload_cb: optional progress.bar object for uploading
310
311         :param etag: (str)
312
313         :param content_encoding: (str)
314
315         :param content_disposition: (str)
316
317         :param content_type: (str)
318
319         :param sharing: {'read':[user and/or grp names],
320             'write':[usr and/or grp names]}
321
322         :param public: (bool)
323         """
324         self._assert_container()
325
326         #init
327         block_info = (blocksize, blockhash, size, nblocks) =\
328             self._get_file_block_info(f, size)
329         (hashes, hmap, offset) = ([], {}, 0)
330         if not content_type:
331             content_type = 'application/octet-stream'
332
333         self._culculate_blocks_for_upload(
334             *block_info,
335             hashes=hashes,
336             hmap=hmap,
337             fileobj=f,
338             hash_cb=hash_cb)
339
340         hashmap = dict(bytes=size, hashes=hashes)
341         missing = self._get_missing_hashes(
342             obj, hashmap,
343             content_type=content_type,
344             size=size,
345             etag=etag,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
348             permissions=sharing,
349             public=public)
350
351         if missing is None:
352             return
353
354         if upload_cb:
355             upload_gen = upload_cb(len(missing))
356             for i in range(len(missing), len(hashmap['hashes']) + 1):
357                 try:
358                     upload_gen.next()
359                 except:
360                     upload_gen = None
361         else:
362             upload_gen = None
363
364         retries = 7
365         try:
366             while retries:
367                 sendlog.info('%s blocks missing' % len(missing))
368                 num_of_blocks = len(missing)
369                 missing = self._upload_missing_blocks(
370                     missing,
371                     hmap,
372                     f,
373                     upload_gen)
374                 if missing:
375                     if num_of_blocks == len(missing):
376                         retries -= 1
377                     else:
378                         num_of_blocks = len(missing)
379                 else:
380                     break
381             if missing:
382                 raise ClientError(
383                     '%s blocks failed to upload' % len(missing),
384                     status=800)
385         except KeyboardInterrupt:
386             sendlog.info('- - - wait for threads to finish')
387             for thread in activethreads():
388                 thread.join()
389             raise
390
391         self.object_put(
392             obj,
393             format='json',
394             hashmap=True,
395             content_type=content_type,
396             json=hashmap,
397             success=201)
398
399     # download_* auxiliary methods
400     def _get_remote_blocks_info(self, obj, **restargs):
401         #retrieve object hashmap
402         myrange = restargs.pop('data_range', None)
403         hashmap = self.get_object_hashmap(obj, **restargs)
404         restargs['data_range'] = myrange
405         blocksize = int(hashmap['block_size'])
406         blockhash = hashmap['block_hash']
407         total_size = hashmap['bytes']
408         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
409         map_dict = {}
410         for i, h in enumerate(hashmap['hashes']):
411             #  map_dict[h] = i   CHAGE
412             if h in map_dict:
413                 map_dict[h].append(i)
414             else:
415                 map_dict[h] = [i]
416         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
417
418     def _dump_blocks_sync(
419             self, obj, remote_hashes, blocksize, total_size, dst, range,
420             **args):
421         for blockid, blockhash in enumerate(remote_hashes):
422             if blockhash:
423                 start = blocksize * blockid
424                 is_last = start + blocksize > total_size
425                 end = (total_size - 1) if is_last else (start + blocksize - 1)
426                 (start, end) = _range_up(start, end, range)
427                 args['data_range'] = 'bytes=%s-%s' % (start, end)
428                 r = self.object_get(obj, success=(200, 206), **args)
429                 self._cb_next()
430                 dst.write(r.content)
431                 dst.flush()
432
433     def _get_block_async(self, obj, **args):
434         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
435         event.start()
436         return event
437
438     def _hash_from_file(self, fp, start, size, blockhash):
439         fp.seek(start)
440         block = fp.read(size)
441         h = newhashlib(blockhash)
442         h.update(block.strip('\x00'))
443         return hexlify(h.digest())
444
445     def _thread2file(self, flying, local_file, offset=0, **restargs):
446         """write the results of a greenleted rest call to a file
447
448         :param offset: the offset of the file up to blocksize
449         - e.g. if the range is 10-100, all blocks will be written to
450         normal_position - 10
451         """
452         finished = []
453         for i, (start, g) in enumerate(flying.items()):
454             if not g.isAlive():
455                 if g.exception:
456                     raise g.exception
457                 block = g.value.content
458                 local_file.seek(start - offset)
459                 local_file.write(block)
460                 self._cb_next()
461                 finished.append(flying.pop(start))
462         local_file.flush()
463         return finished
464
465     def _dump_blocks_async(
466             self, obj, remote_hashes, blocksize, total_size, local_file,
467             blockhash=None, resume=False, filerange=None, **restargs):
468         file_size = fstat(local_file.fileno()).st_size if resume else 0
469         flying = {}
470         finished = []
471         offset = 0
472         if filerange is not None:
473             rstart = int(filerange.split('-')[0])
474             offset = rstart if blocksize > rstart else rstart % blocksize
475
476         self._init_thread_limit()
477         for block_hash, blockids in remote_hashes.items():
478             for blockid in blockids:
479                 start = blocksize * blockid
480                 if start < file_size and block_hash == self._hash_from_file(
481                         local_file, start, blocksize, blockhash):
482                     self._cb_next()
483                     continue
484                 self._watch_thread_limit(flying.values())
485                 finished += self._thread2file(
486                     flying,
487                     local_file,
488                     offset,
489                     **restargs)
490                 end = total_size - 1 if start + blocksize > total_size\
491                     else start + blocksize - 1
492                 (start, end) = _range_up(start, end, filerange)
493                 if start == end:
494                     self._cb_next()
495                     continue
496                 restargs['async_headers'] = {
497                     'Range': 'bytes=%s-%s' % (start, end)}
498                 flying[start] = self._get_block_async(obj, **restargs)
499
500         for thread in flying.values():
501             thread.join()
502         finished += self._thread2file(flying, local_file, offset, **restargs)
503
504     def download_object(
505             self, obj, dst,
506             download_cb=None,
507             version=None,
508             resume=False,
509             range_str=None,
510             if_match=None,
511             if_none_match=None,
512             if_modified_since=None,
513             if_unmodified_since=None):
514         """Download an object (multiple connections, random blocks)
515
516         :param obj: (str) remote object path
517
518         :param dst: open file descriptor (wb+)
519
520         :param download_cb: optional progress.bar object for downloading
521
522         :param version: (str) file version
523
524         :param resume: (bool) if set, preserve already downloaded file parts
525
526         :param range_str: (str) from, to are file positions (int) in bytes
527
528         :param if_match: (str)
529
530         :param if_none_match: (str)
531
532         :param if_modified_since: (str) formated date
533
534         :param if_unmodified_since: (str) formated date"""
535         restargs = dict(
536             version=version,
537             data_range=None if range_str is None else 'bytes=%s' % range_str,
538             if_match=if_match,
539             if_none_match=if_none_match,
540             if_modified_since=if_modified_since,
541             if_unmodified_since=if_unmodified_since)
542
543         (
544             blocksize,
545             blockhash,
546             total_size,
547             hash_list,
548             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
549         assert total_size >= 0
550
551         if download_cb:
552             self.progress_bar_gen = download_cb(len(hash_list))
553             self._cb_next()
554
555         if dst.isatty():
556             self._dump_blocks_sync(
557                 obj,
558                 hash_list,
559                 blocksize,
560                 total_size,
561                 dst,
562                 range_str,
563                 **restargs)
564         else:
565             self._dump_blocks_async(
566                 obj,
567                 remote_hashes,
568                 blocksize,
569                 total_size,
570                 dst,
571                 blockhash,
572                 resume,
573                 range_str,
574                 **restargs)
575             if not range_str:
576                 dst.truncate(total_size)
577
578         self._complete_cb()
579
580     #Command Progress Bar method
581     def _cb_next(self):
582         if hasattr(self, 'progress_bar_gen'):
583             try:
584                 self.progress_bar_gen.next()
585             except:
586                 pass
587
588     def _complete_cb(self):
589         while True:
590             try:
591                 self.progress_bar_gen.next()
592             except:
593                 break
594
595     def get_object_hashmap(
596             self, obj,
597             version=None,
598             if_match=None,
599             if_none_match=None,
600             if_modified_since=None,
601             if_unmodified_since=None,
602             data_range=None):
603         """
604         :param obj: (str) remote object path
605
606         :param if_match: (str)
607
608         :param if_none_match: (str)
609
610         :param if_modified_since: (str) formated date
611
612         :param if_unmodified_since: (str) formated date
613
614         :param data_range: (str) from-to where from and to are integers
615             denoting file positions in bytes
616
617         :returns: (list)
618         """
619         try:
620             r = self.object_get(
621                 obj,
622                 hashmap=True,
623                 version=version,
624                 if_etag_match=if_match,
625                 if_etag_not_match=if_none_match,
626                 if_modified_since=if_modified_since,
627                 if_unmodified_since=if_unmodified_since,
628                 data_range=data_range)
629         except ClientError as err:
630             if err.status == 304 or err.status == 412:
631                 return {}
632             raise
633         return r.json
634
635     def set_account_group(self, group, usernames):
636         """
637         :param group: (str)
638
639         :param usernames: (list)
640         """
641         self.account_post(update=True, groups={group: usernames})
642
643     def del_account_group(self, group):
644         """
645         :param group: (str)
646         """
647         self.account_post(update=True, groups={group: []})
648
649     def get_account_info(self, until=None):
650         """
651         :param until: (str) formated date
652
653         :returns: (dict)
654         """
655         r = self.account_head(until=until)
656         if r.status_code == 401:
657             raise ClientError("No authorization", status=401)
658         return r.headers
659
660     def get_account_quota(self):
661         """
662         :returns: (dict)
663         """
664         return filter_in(
665             self.get_account_info(),
666             'X-Account-Policy-Quota',
667             exactMatch=True)
668
669     def get_account_versioning(self):
670         """
671         :returns: (dict)
672         """
673         return filter_in(
674             self.get_account_info(),
675             'X-Account-Policy-Versioning',
676             exactMatch=True)
677
678     def get_account_meta(self, until=None):
679         """
680         :meta until: (str) formated date
681
682         :returns: (dict)
683         """
684         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
685
686     def get_account_group(self):
687         """
688         :returns: (dict)
689         """
690         return filter_in(self.get_account_info(), 'X-Account-Group-')
691
692     def set_account_meta(self, metapairs):
693         """
694         :param metapairs: (dict) {key1:val1, key2:val2, ...}
695         """
696         assert(type(metapairs) is dict)
697         self.account_post(update=True, metadata=metapairs)
698
699     def del_account_meta(self, metakey):
700         """
701         :param metakey: (str) metadatum key
702         """
703         self.account_post(update=True, metadata={metakey: ''})
704
705     def set_account_quota(self, quota):
706         """
707         :param quota: (int)
708         """
709         self.account_post(update=True, quota=quota)
710
711     def set_account_versioning(self, versioning):
712         """
713         "param versioning: (str)
714         """
715         self.account_post(update=True, versioning=versioning)
716
717     def list_containers(self):
718         """
719         :returns: (dict)
720         """
721         r = self.account_get()
722         return r.json
723
724     def del_container(self, until=None, delimiter=None):
725         """
726         :param until: (str) formated date
727
728         :param delimiter: (str) with / empty container
729
730         :raises ClientError: 404 Container does not exist
731
732         :raises ClientError: 409 Container is not empty
733         """
734         self._assert_container()
735         r = self.container_delete(
736             until=until,
737             delimiter=delimiter,
738             success=(204, 404, 409))
739         if r.status_code == 404:
740             raise ClientError(
741                 'Container "%s" does not exist' % self.container,
742                 r.status_code)
743         elif r.status_code == 409:
744             raise ClientError(
745                 'Container "%s" is not empty' % self.container,
746                 r.status_code)
747
748     def get_container_versioning(self, container=None):
749         """
750         :param container: (str)
751
752         :returns: (dict)
753         """
754         cnt_back_up = self.container
755         try:
756             self.container = container or cnt_back_up
757             return filter_in(
758                 self.get_container_info(),
759                 'X-Container-Policy-Versioning')
760         finally:
761             self.container = cnt_back_up
762
763     def get_container_quota(self, container=None):
764         """
765         :param container: (str)
766
767         :returns: (dict)
768         """
769         cnt_back_up = self.container
770         try:
771             self.container = container or cnt_back_up
772             return filter_in(
773                 self.get_container_info(),
774                 'X-Container-Policy-Quota')
775         finally:
776             self.container = cnt_back_up
777
778     def get_container_info(self, until=None):
779         """
780         :param until: (str) formated date
781
782         :returns: (dict)
783
784         :raises ClientError: 404 Container not found
785         """
786         try:
787             r = self.container_head(until=until)
788         except ClientError as err:
789             err.details.append('for container %s' % self.container)
790             raise err
791         return r.headers
792
793     def get_container_meta(self, until=None):
794         """
795         :param until: (str) formated date
796
797         :returns: (dict)
798         """
799         return filter_in(
800             self.get_container_info(until=until),
801             'X-Container-Meta')
802
803     def get_container_object_meta(self, until=None):
804         """
805         :param until: (str) formated date
806
807         :returns: (dict)
808         """
809         return filter_in(
810             self.get_container_info(until=until),
811             'X-Container-Object-Meta')
812
813     def set_container_meta(self, metapairs):
814         """
815         :param metapairs: (dict) {key1:val1, key2:val2, ...}
816         """
817         assert(type(metapairs) is dict)
818         self.container_post(update=True, metadata=metapairs)
819
820     def del_container_meta(self, metakey):
821         """
822         :param metakey: (str) metadatum key
823         """
824         self.container_post(update=True, metadata={metakey: ''})
825
826     def set_container_quota(self, quota):
827         """
828         :param quota: (int)
829         """
830         self.container_post(update=True, quota=quota)
831
832     def set_container_versioning(self, versioning):
833         """
834         :param versioning: (str)
835         """
836         self.container_post(update=True, versioning=versioning)
837
838     def del_object(self, obj, until=None, delimiter=None):
839         """
840         :param obj: (str) remote object path
841
842         :param until: (str) formated date
843
844         :param delimiter: (str)
845         """
846         self._assert_container()
847         self.object_delete(obj, until=until, delimiter=delimiter)
848
849     def set_object_meta(self, obj, metapairs):
850         """
851         :param obj: (str) remote object path
852
853         :param metapairs: (dict) {key1:val1, key2:val2, ...}
854         """
855         assert(type(metapairs) is dict)
856         self.object_post(obj, update=True, metadata=metapairs)
857
858     def del_object_meta(self, obj, metakey):
859         """
860         :param obj: (str) remote object path
861
862         :param metakey: (str) metadatum key
863         """
864         self.object_post(obj, update=True, metadata={metakey: ''})
865
866     def publish_object(self, obj):
867         """
868         :param obj: (str) remote object path
869
870         :returns: (str) access url
871         """
872         self.object_post(obj, update=True, public=True)
873         info = self.get_object_info(obj)
874         pref, sep, rest = self.base_url.partition('//')
875         base = rest.split('/')[0]
876         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
877
878     def unpublish_object(self, obj):
879         """
880         :param obj: (str) remote object path
881         """
882         self.object_post(obj, update=True, public=False)
883
884     def get_object_info(self, obj, version=None):
885         """
886         :param obj: (str) remote object path
887
888         :param version: (str)
889
890         :returns: (dict)
891         """
892         try:
893             r = self.object_head(obj, version=version)
894             return r.headers
895         except ClientError as ce:
896             if ce.status == 404:
897                 raise ClientError('Object %s not found' % obj, status=404)
898             raise
899
900     def get_object_meta(self, obj, version=None):
901         """
902         :param obj: (str) remote object path
903
904         :param version: (str)
905
906         :returns: (dict)
907         """
908         return filter_in(
909             self.get_object_info(obj, version=version),
910             'X-Object-Meta')
911
912     def get_object_sharing(self, obj):
913         """
914         :param obj: (str) remote object path
915
916         :returns: (dict)
917         """
918         r = filter_in(
919             self.get_object_info(obj),
920             'X-Object-Sharing',
921             exactMatch=True)
922         reply = {}
923         if len(r) > 0:
924             perms = r['x-object-sharing'].split(';')
925             for perm in perms:
926                 try:
927                     perm.index('=')
928                 except ValueError:
929                     raise ClientError('Incorrect reply format')
930                 (key, val) = perm.strip().split('=')
931                 reply[key] = val
932         return reply
933
934     def set_object_sharing(
935             self, obj,
936             read_permition=False, write_permition=False):
937         """Give read/write permisions to an object.
938
939         :param obj: (str) remote object path
940
941         :param read_permition: (list - bool) users and user groups that get
942             read permition for this object - False means all previous read
943             permissions will be removed
944
945         :param write_perimition: (list - bool) of users and user groups to get
946            write permition for this object - False means all previous write
947            permissions will be removed
948         """
949
950         perms = dict(read=read_permition or '', write=write_permition or '')
951         self.object_post(obj, update=True, permissions=perms)
952
953     def del_object_sharing(self, obj):
954         """
955         :param obj: (str) remote object path
956         """
957         self.set_object_sharing(obj)
958
959     def append_object(self, obj, source_file, upload_cb=None):
960         """
961         :param obj: (str) remote object path
962
963         :param source_file: open file descriptor
964
965         :param upload_db: progress.bar for uploading
966         """
967
968         self._assert_container()
969         meta = self.get_container_info()
970         blocksize = int(meta['x-container-block-size'])
971         filesize = fstat(source_file.fileno()).st_size
972         nblocks = 1 + (filesize - 1) // blocksize
973         offset = 0
974         if upload_cb:
975             upload_gen = upload_cb(nblocks)
976             upload_gen.next()
977         for i in range(nblocks):
978             block = source_file.read(min(blocksize, filesize - offset))
979             offset += len(block)
980             self.object_post(
981                 obj,
982                 update=True,
983                 content_range='bytes */*',
984                 content_type='application/octet-stream',
985                 content_length=len(block),
986                 data=block)
987
988             if upload_cb:
989                 upload_gen.next()
990
991     def truncate_object(self, obj, upto_bytes):
992         """
993         :param obj: (str) remote object path
994
995         :param upto_bytes: max number of bytes to leave on file
996         """
997         self.object_post(
998             obj,
999             update=True,
1000             content_range='bytes 0-%s/*' % upto_bytes,
1001             content_type='application/octet-stream',
1002             object_bytes=upto_bytes,
1003             source_object=path4url(self.container, obj))
1004
1005     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1006         """Overwrite a part of an object from local source file
1007
1008         :param obj: (str) remote object path
1009
1010         :param start: (int) position in bytes to start overwriting from
1011
1012         :param end: (int) position in bytes to stop overwriting at
1013
1014         :param source_file: open file descriptor
1015
1016         :param upload_db: progress.bar for uploading
1017         """
1018
1019         r = self.get_object_info(obj)
1020         rf_size = int(r['content-length'])
1021         if rf_size < int(start):
1022             raise ClientError(
1023                 'Range start exceeds file size',
1024                 status=416)
1025         elif rf_size < int(end):
1026             raise ClientError(
1027                 'Range end exceeds file size',
1028                 status=416)
1029         self._assert_container()
1030         meta = self.get_container_info()
1031         blocksize = int(meta['x-container-block-size'])
1032         filesize = fstat(source_file.fileno()).st_size
1033         datasize = int(end) - int(start) + 1
1034         nblocks = 1 + (datasize - 1) // blocksize
1035         offset = 0
1036         if upload_cb:
1037             upload_gen = upload_cb(nblocks)
1038             upload_gen.next()
1039         for i in range(nblocks):
1040             read_size = min(blocksize, filesize - offset, datasize - offset)
1041             block = source_file.read(read_size)
1042             self.object_post(
1043                 obj,
1044                 update=True,
1045                 content_type='application/octet-stream',
1046                 content_length=len(block),
1047                 content_range='bytes %s-%s/*' % (
1048                     start + offset,
1049                     start + offset + len(block) - 1),
1050                 data=block)
1051             offset += len(block)
1052
1053             if upload_cb:
1054                 upload_gen.next()
1055
1056     def copy_object(
1057             self, src_container, src_object, dst_container,
1058             dst_object=None,
1059             source_version=None,
1060             source_account=None,
1061             public=False,
1062             content_type=None,
1063             delimiter=None):
1064         """
1065         :param src_container: (str) source container
1066
1067         :param src_object: (str) source object path
1068
1069         :param dst_container: (str) destination container
1070
1071         :param dst_object: (str) destination object path
1072
1073         :param source_version: (str) source object version
1074
1075         :param source_account: (str) account to copy from
1076
1077         :param public: (bool)
1078
1079         :param content_type: (str)
1080
1081         :param delimiter: (str)
1082         """
1083         self._assert_account()
1084         self.container = dst_container
1085         src_path = path4url(src_container, src_object)
1086         self.object_put(
1087             dst_object or src_object,
1088             success=201,
1089             copy_from=src_path,
1090             content_length=0,
1091             source_version=source_version,
1092             source_account=source_account,
1093             public=public,
1094             content_type=content_type,
1095             delimiter=delimiter)
1096
1097     def move_object(
1098             self, src_container, src_object, dst_container,
1099             dst_object=False,
1100             source_account=None,
1101             source_version=None,
1102             public=False,
1103             content_type=None,
1104             delimiter=None):
1105         """
1106         :param src_container: (str) source container
1107
1108         :param src_object: (str) source object path
1109
1110         :param dst_container: (str) destination container
1111
1112         :param dst_object: (str) destination object path
1113
1114         :param source_account: (str) account to move from
1115
1116         :param source_version: (str) source object version
1117
1118         :param public: (bool)
1119
1120         :param content_type: (str)
1121
1122         :param delimiter: (str)
1123         """
1124         self._assert_account()
1125         self.container = dst_container
1126         dst_object = dst_object or src_object
1127         src_path = path4url(src_container, src_object)
1128         self.object_put(
1129             dst_object,
1130             success=201,
1131             move_from=src_path,
1132             content_length=0,
1133             source_account=source_account,
1134             source_version=source_version,
1135             public=public,
1136             content_type=content_type,
1137             delimiter=delimiter)
1138
1139     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1140         """Get accounts that share with self.account
1141
1142         :param limit: (str)
1143
1144         :param marker: (str)
1145
1146         :returns: (dict)
1147         """
1148         self._assert_account()
1149
1150         self.set_param('format', 'json')
1151         self.set_param('limit', limit, iff=limit is not None)
1152         self.set_param('marker', marker, iff=marker is not None)
1153
1154         path = ''
1155         success = kwargs.pop('success', (200, 204))
1156         r = self.get(path, *args, success=success, **kwargs)
1157         return r.json
1158
1159     def get_object_versionlist(self, obj):
1160         """
1161         :param obj: (str) remote object path
1162
1163         :returns: (list)
1164         """
1165         self._assert_container()
1166         r = self.object_get(obj, format='json', version='list')
1167         return r.json['versions']