Rename tests to livetest in kamaki.clients
[kamaki] / kamaki / clients / pithos.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestAPI):
69     """GRNet Pithos API client"""
70
71     _thread_exceptions = []
72
73     def __init__(self, base_url, token, account=None, container=None):
74         super(PithosClient, self).__init__(base_url, token, account, container)
75
76     def purge_container(self):
77         """Delete an empty container and destroy associated blocks
78         """
79         r = self.container_delete(until=unicode(time()))
80         r.release()
81
82     def upload_object_unchunked(
83             self, obj, f,
84             withHashFile=False,
85             size=None,
86             etag=None,
87             content_encoding=None,
88             content_disposition=None,
89             content_type=None,
90             sharing=None,
91             public=None):
92         """
93         :param obj: (str) remote object path
94
95         :param f: open file descriptor
96
97         :param withHashFile: (bool)
98
99         :param size: (int) size of data to upload
100
101         :param etag: (str)
102
103         :param content_encoding: (str)
104
105         :param content_disposition: (str)
106
107         :param content_type: (str)
108
109         :param sharing: {'read':[user and/or grp names],
110             'write':[usr and/or grp names]}
111
112         :param public: (bool)
113         """
114         self._assert_container()
115
116         if withHashFile:
117             data = f.read()
118             try:
119                 import json
120                 data = json.dumps(json.loads(data))
121             except ValueError:
122                 raise ClientError('"%s" is not json-formated' % f.name, 1)
123             except SyntaxError:
124                 msg = '"%s" is not a valid hashmap file' % f.name
125                 raise ClientError(msg, 1)
126             f = StringIO(data)
127         data = f.read(size) if size is not None else f.read()
128         r = self.object_put(
129             obj,
130             data=data,
131             etag=etag,
132             content_encoding=content_encoding,
133             content_disposition=content_disposition,
134             content_type=content_type,
135             permissions=sharing,
136             public=public,
137             success=201)
138         r.release()
139
140     def create_object_by_manifestation(
141             self, obj,
142             etag=None,
143             content_encoding=None,
144             content_disposition=None,
145             content_type=None,
146             sharing=None,
147             public=None):
148         """
149         :param obj: (str) remote object path
150
151         :param etag: (str)
152
153         :param content_encoding: (str)
154
155         :param content_disposition: (str)
156
157         :param content_type: (str)
158
159         :param sharing: {'read':[user and/or grp names],
160             'write':[usr and/or grp names]}
161
162         :param public: (bool)
163         """
164         self._assert_container()
165         r = self.object_put(
166             obj,
167             content_length=0,
168             etag=etag,
169             content_encoding=content_encoding,
170             content_disposition=content_disposition,
171             content_type=content_type,
172             permissions=sharing,
173             public=public,
174             manifest='%s/%s' % (self.container, obj))
175         r.release()
176
177     # upload_* auxiliary methods
178     def _put_block_async(self, data, hash, upload_gen=None):
179         event = SilentEvent(method=self._put_block, data=data, hash=hash)
180         event.start()
181         return event
182
183     def _put_block(self, data, hash):
184         from random import randint
185         if not randint(0, 7):
186             raise ClientError('BAD GATEWAY STUFF', 503)
187         r = self.container_post(
188             update=True,
189             content_type='application/octet-stream',
190             content_length=len(data),
191             data=data,
192             format='json')
193         assert r.json[0] == hash, 'Local hash does not match server'
194
195     def _get_file_block_info(self, fileobj, size=None):
196         meta = self.get_container_info()
197         blocksize = int(meta['x-container-block-size'])
198         blockhash = meta['x-container-block-hash']
199         size = size if size is not None else fstat(fileobj.fileno()).st_size
200         nblocks = 1 + (size - 1) // blocksize
201         return (blocksize, blockhash, size, nblocks)
202
203     def _get_missing_hashes(
204             self, obj, json,
205             size=None,
206             format='json',
207             hashmap=True,
208             content_type=None,
209             etag=None,
210             content_encoding=None,
211             content_disposition=None,
212             permissions=None,
213             public=None,
214             success=(201, 409)):
215         r = self.object_put(
216             obj,
217             format='json',
218             hashmap=True,
219             content_type=content_type,
220             json=json,
221             etag=etag,
222             content_encoding=content_encoding,
223             content_disposition=content_disposition,
224             permissions=permissions,
225             public=public,
226             success=success)
227         if r.status_code == 201:
228             r.release()
229             return None
230         return r.json
231
232     def _caclulate_uploaded_blocks(
233             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
234             hash_cb=None):
235         offset = 0
236         if hash_cb:
237             hash_gen = hash_cb(nblocks)
238             hash_gen.next()
239
240         for i in range(nblocks):
241             block = fileobj.read(min(blocksize, size - offset))
242             bytes = len(block)
243             hash = _pithos_hash(block, blockhash)
244             hashes.append(hash)
245             hmap[hash] = (offset, bytes)
246             offset += bytes
247             if hash_cb:
248                 hash_gen.next()
249         msg = 'Failed to calculate uploaded blocks:'
250         msg += ' Offset and object size do not match'
251         assert offset == size, msg
252
253     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
254         """upload missing blocks asynchronously"""
255
256         self._init_thread_limit()
257
258         flying = []
259         failures = []
260         for hash in missing:
261             offset, bytes = hmap[hash]
262             fileobj.seek(offset)
263             data = fileobj.read(bytes)
264             r = self._put_block_async(data, hash, upload_gen)
265             flying.append(r)
266             unfinished = self._watch_thread_limit(flying)
267             for thread in set(flying).difference(unfinished):
268                 if thread.exception:
269                     failures.append(thread)
270                     if isinstance(
271                             thread.exception,
272                             ClientError) and thread.exception.status == 502:
273                         self.POOLSIZE = self._thread_limit
274                 elif thread.isAlive():
275                     flying.append(thread)
276                 elif upload_gen:
277                     try:
278                         upload_gen.next()
279                     except:
280                         pass
281             flying = unfinished
282
283         for thread in flying:
284             thread.join()
285             if thread.exception:
286                 failures.append(thread)
287             elif upload_gen:
288                 try:
289                     upload_gen.next()
290                 except:
291                     pass
292
293         return [failure.kwargs['hash'] for failure in failures]
294
295     def upload_object(
296             self, obj, f,
297             size=None,
298             hash_cb=None,
299             upload_cb=None,
300             etag=None,
301             content_encoding=None,
302             content_disposition=None,
303             content_type=None,
304             sharing=None,
305             public=None):
306         """Upload an object using multiple connections (threads)
307
308         :param obj: (str) remote object path
309
310         :param f: open file descriptor (rb)
311
312         :param hash_cb: optional progress.bar object for calculating hashes
313
314         :param upload_cb: optional progress.bar object for uploading
315
316         :param etag: (str)
317
318         :param content_encoding: (str)
319
320         :param content_disposition: (str)
321
322         :param content_type: (str)
323
324         :param sharing: {'read':[user and/or grp names],
325             'write':[usr and/or grp names]}
326
327         :param public: (bool)
328         """
329         self._assert_container()
330
331         #init
332         block_info = (blocksize, blockhash, size, nblocks) =\
333             self._get_file_block_info(f, size)
334         (hashes, hmap, offset) = ([], {}, 0)
335         if content_type is None:
336             content_type = 'application/octet-stream'
337
338         self._caclulate_uploaded_blocks(
339             *block_info,
340             hashes=hashes,
341             hmap=hmap,
342             fileobj=f,
343             hash_cb=hash_cb)
344
345         hashmap = dict(bytes=size, hashes=hashes)
346         missing = self._get_missing_hashes(
347             obj, hashmap,
348             content_type=content_type,
349             size=size,
350             etag=etag,
351             content_encoding=content_encoding,
352             content_disposition=content_disposition,
353             permissions=sharing,
354             public=public)
355
356         if missing is None:
357             return
358
359         if upload_cb:
360             upload_gen = upload_cb(len(missing))
361             for i in range(len(missing), len(hashmap['hashes']) + 1):
362                 try:
363                     upload_gen.next()
364                 except:
365                     upload_gen = None
366         else:
367             upload_gen = None
368
369         retries = 7
370         try:
371             while retries:
372                 sendlog.info('%s blocks missing' % len(missing))
373                 num_of_blocks = len(missing)
374                 missing = self._upload_missing_blocks(
375                     missing,
376                     hmap,
377                     f,
378                     upload_gen)
379                 if missing:
380                     if num_of_blocks == len(missing):
381                         retries -= 1
382                     else:
383                         num_of_blocks = len(missing)
384                 else:
385                     break
386             if missing:
387                 raise ClientError(
388                     '%s blocks failed to upload' % len(missing),
389                     status=800)
390         except KeyboardInterrupt:
391             sendlog.info('- - - wait for threads to finish')
392             for thread in activethreads():
393                 thread.join()
394             raise
395
396         r = self.object_put(
397             obj,
398             format='json',
399             hashmap=True,
400             content_type=content_type,
401             json=hashmap,
402             success=201)
403         r.release()
404
405     # download_* auxiliary methods
406     def _get_remote_blocks_info(self, obj, **restargs):
407         #retrieve object hashmap
408         myrange = restargs.pop('data_range', None)
409         hashmap = self.get_object_hashmap(obj, **restargs)
410         restargs['data_range'] = myrange
411         blocksize = int(hashmap['block_size'])
412         blockhash = hashmap['block_hash']
413         total_size = hashmap['bytes']
414         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
415         map_dict = {}
416         for i, h in enumerate(hashmap['hashes']):
417             map_dict[h] = i
418         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
419
420     def _dump_blocks_sync(
421             self, obj, remote_hashes, blocksize, total_size, dst, range,
422             **args):
423         for blockid, blockhash in enumerate(remote_hashes):
424             if blockhash:
425                 start = blocksize * blockid
426                 is_last = start + blocksize > total_size
427                 end = (total_size - 1) if is_last else (start + blocksize - 1)
428                 (start, end) = _range_up(start, end, range)
429                 args['data_range'] = 'bytes=%s-%s' % (start, end)
430                 r = self.object_get(obj, success=(200, 206), **args)
431                 self._cb_next()
432                 dst.write(r.content)
433                 dst.flush()
434
435     def _get_block_async(self, obj, **args):
436         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
437         event.start()
438         return event
439
440     def _hash_from_file(self, fp, start, size, blockhash):
441         fp.seek(start)
442         block = fp.read(size)
443         h = newhashlib(blockhash)
444         h.update(block.strip('\x00'))
445         return hexlify(h.digest())
446
447     def _thread2file(self, flying, local_file, offset=0, **restargs):
448         """write the results of a greenleted rest call to a file
449
450         :param offset: the offset of the file up to blocksize
451         - e.g. if the range is 10-100, all blocks will be written to
452         normal_position - 10
453         """
454         finished = []
455         for i, (start, g) in enumerate(flying.items()):
456             if not g.isAlive():
457                 if g.exception:
458                     raise g.exception
459                 block = g.value.content
460                 local_file.seek(start - offset)
461                 local_file.write(block)
462                 self._cb_next()
463                 finished.append(flying.pop(start))
464         local_file.flush()
465         return finished
466
467     def _dump_blocks_async(
468             self, obj, remote_hashes, blocksize, total_size, local_file,
469             blockhash=None, resume=False, filerange=None, **restargs):
470         file_size = fstat(local_file.fileno()).st_size if resume else 0
471         flying = {}
472         finished = []
473         offset = 0
474         if filerange is not None:
475             rstart = int(filerange.split('-')[0])
476             offset = rstart if blocksize > rstart else rstart % blocksize
477
478         self._init_thread_limit()
479         for block_hash, blockid in remote_hashes.items():
480             start = blocksize * blockid
481             if start < file_size and block_hash == self._hash_from_file(
482                     local_file, start, blocksize, blockhash):
483                 self._cb_next()
484                 continue
485             self._watch_thread_limit(flying.values())
486             finished += self._thread2file(
487                 flying,
488                 local_file,
489                 offset,
490                 **restargs)
491             end = total_size - 1 if start + blocksize > total_size\
492                 else start + blocksize - 1
493             (start, end) = _range_up(start, end, filerange)
494             if start == end:
495                 self._cb_next()
496                 continue
497             restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
498             flying[start] = self._get_block_async(obj, **restargs)
499
500         for thread in flying.values():
501             thread.join()
502         finished += self._thread2file(flying, local_file, offset, **restargs)
503
504     def download_object(
505             self, obj, dst,
506             download_cb=None,
507             version=None,
508             resume=False,
509             range_str=None,
510             if_match=None,
511             if_none_match=None,
512             if_modified_since=None,
513             if_unmodified_since=None):
514         """Download an object (multiple connections, random blocks)
515
516         :param obj: (str) remote object path
517
518         :param dst: open file descriptor (wb+)
519
520         :param download_cb: optional progress.bar object for downloading
521
522         :param version: (str) file version
523
524         :param resume: (bool) if set, preserve already downloaded file parts
525
526         :param range_str: (str) from, to are file positions (int) in bytes
527
528         :param if_match: (str)
529
530         :param if_none_match: (str)
531
532         :param if_modified_since: (str) formated date
533
534         :param if_unmodified_since: (str) formated date"""
535         restargs = dict(
536             version=version,
537             data_range=None if range_str is None else 'bytes=%s' % range_str,
538             if_match=if_match,
539             if_none_match=if_none_match,
540             if_modified_since=if_modified_since,
541             if_unmodified_since=if_unmodified_since)
542
543         (
544             blocksize,
545             blockhash,
546             total_size,
547             hash_list,
548             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
549         assert total_size >= 0
550
551         if download_cb:
552             self.progress_bar_gen = download_cb(len(remote_hashes))
553             self._cb_next()
554
555         if dst.isatty():
556             self._dump_blocks_sync(
557                 obj,
558                 hash_list,
559                 blocksize,
560                 total_size,
561                 dst,
562                 range_str,
563                 **restargs)
564         else:
565             self._dump_blocks_async(
566                 obj,
567                 remote_hashes,
568                 blocksize,
569                 total_size,
570                 dst,
571                 blockhash,
572                 resume,
573                 range_str,
574                 **restargs)
575             if not range_str:
576                 dst.truncate(total_size)
577
578         self._complete_cb()
579
580     #Command Progress Bar method
581     def _cb_next(self):
582         if hasattr(self, 'progress_bar_gen'):
583             try:
584                 self.progress_bar_gen.next()
585             except:
586                 pass
587
588     def _complete_cb(self):
589         while True:
590             try:
591                 self.progress_bar_gen.next()
592             except:
593                 break
594
595     def get_object_hashmap(
596             self, obj,
597             version=None,
598             if_match=None,
599             if_none_match=None,
600             if_modified_since=None,
601             if_unmodified_since=None,
602             data_range=None):
603         """
604         :param obj: (str) remote object path
605
606         :param if_match: (str)
607
608         :param if_none_match: (str)
609
610         :param if_modified_since: (str) formated date
611
612         :param if_unmodified_since: (str) formated date
613
614         :param data_range: (str) from-to where from and to are integers
615             denoting file positions in bytes
616
617         :returns: (list)
618         """
619         try:
620             r = self.object_get(
621                 obj,
622                 hashmap=True,
623                 version=version,
624                 if_etag_match=if_match,
625                 if_etag_not_match=if_none_match,
626                 if_modified_since=if_modified_since,
627                 if_unmodified_since=if_unmodified_since,
628                 data_range=data_range)
629         except ClientError as err:
630             if err.status == 304 or err.status == 412:
631                 return {}
632             raise
633         return r.json
634
635     def set_account_group(self, group, usernames):
636         """
637         :param group: (str)
638
639         :param usernames: (list)
640         """
641         r = self.account_post(update=True, groups={group: usernames})
642         r.release()
643
644     def del_account_group(self, group):
645         """
646         :param group: (str)
647         """
648         r = self.account_post(update=True, groups={group: []})
649         r.release()
650
651     def get_account_info(self, until=None):
652         """
653         :param until: (str) formated date
654
655         :returns: (dict)
656         """
657         r = self.account_head(until=until)
658         if r.status_code == 401:
659             raise ClientError("No authorization")
660         return r.headers
661
662     def get_account_quota(self):
663         """
664         :returns: (dict)
665         """
666         return filter_in(
667             self.get_account_info(),
668             'X-Account-Policy-Quota',
669             exactMatch=True)
670
671     def get_account_versioning(self):
672         """
673         :returns: (dict)
674         """
675         return filter_in(
676             self.get_account_info(),
677             'X-Account-Policy-Versioning',
678             exactMatch=True)
679
680     def get_account_meta(self, until=None):
681         """
682         :meta until: (str) formated date
683
684         :returns: (dict)
685         """
686         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
687
688     def get_account_group(self):
689         """
690         :returns: (dict)
691         """
692         return filter_in(self.get_account_info(), 'X-Account-Group-')
693
694     def set_account_meta(self, metapairs):
695         """
696         :param metapairs: (dict) {key1:val1, key2:val2, ...}
697         """
698         assert(type(metapairs) is dict)
699         r = self.account_post(update=True, metadata=metapairs)
700         r.release()
701
702     def del_account_meta(self, metakey):
703         """
704         :param metakey: (str) metadatum key
705         """
706         r = self.account_post(update=True, metadata={metakey: ''})
707         r.release()
708
709     def set_account_quota(self, quota):
710         """
711         :param quota: (int)
712         """
713         r = self.account_post(update=True, quota=quota)
714         r.release()
715
716     def set_account_versioning(self, versioning):
717         """
718         "param versioning: (str)
719         """
720         r = self.account_post(update=True, versioning=versioning)
721         r.release()
722
723     def list_containers(self):
724         """
725         :returns: (dict)
726         """
727         r = self.account_get()
728         return r.json
729
730     def del_container(self, until=None, delimiter=None):
731         """
732         :param until: (str) formated date
733
734         :param delimiter: (str) with / empty container
735
736         :raises ClientError: 404 Container does not exist
737
738         :raises ClientError: 409 Container is not empty
739         """
740         self._assert_container()
741         r = self.container_delete(
742             until=until,
743             delimiter=delimiter,
744             success=(204, 404, 409))
745         r.release()
746         if r.status_code == 404:
747             raise ClientError(
748                 'Container "%s" does not exist' % self.container,
749                 r.status_code)
750         elif r.status_code == 409:
751             raise ClientError(
752                 'Container "%s" is not empty' % self.container,
753                 r.status_code)
754
755     def get_container_versioning(self, container):
756         """
757         :param container: (str)
758
759         :returns: (dict)
760         """
761         self.container = container
762         return filter_in(
763             self.get_container_info(),
764             'X-Container-Policy-Versioning')
765
766     def get_container_quota(self, container):
767         """
768         :param container: (str)
769
770         :returns: (dict)
771         """
772         self.container = container
773         return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
774
775     def get_container_info(self, until=None):
776         """
777         :param until: (str) formated date
778
779         :returns: (dict)
780
781         :raises ClientError: 404 Container not found
782         """
783         try:
784             r = self.container_head(until=until)
785         except ClientError as err:
786             err.details.append('for container %s' % self.container)
787             raise err
788         return r.headers
789
790     def get_container_meta(self, until=None):
791         """
792         :param until: (str) formated date
793
794         :returns: (dict)
795         """
796         return filter_in(
797             self.get_container_info(until=until),
798             'X-Container-Meta')
799
800     def get_container_object_meta(self, until=None):
801         """
802         :param until: (str) formated date
803
804         :returns: (dict)
805         """
806         return filter_in(
807             self.get_container_info(until=until),
808             'X-Container-Object-Meta')
809
810     def set_container_meta(self, metapairs):
811         """
812         :param metapairs: (dict) {key1:val1, key2:val2, ...}
813         """
814         assert(type(metapairs) is dict)
815         r = self.container_post(update=True, metadata=metapairs)
816         r.release()
817
818     def del_container_meta(self, metakey):
819         """
820         :param metakey: (str) metadatum key
821         """
822         r = self.container_post(update=True, metadata={metakey: ''})
823         r.release()
824
825     def set_container_quota(self, quota):
826         """
827         :param quota: (int)
828         """
829         r = self.container_post(update=True, quota=quota)
830         r.release()
831
832     def set_container_versioning(self, versioning):
833         """
834         :param versioning: (str)
835         """
836         r = self.container_post(update=True, versioning=versioning)
837         r.release()
838
839     def del_object(self, obj, until=None, delimiter=None):
840         """
841         :param obj: (str) remote object path
842
843         :param until: (str) formated date
844
845         :param delimiter: (str)
846         """
847         self._assert_container()
848         r = self.object_delete(obj, until=until, delimiter=delimiter)
849         r.release()
850
851     def set_object_meta(self, obj, metapairs):
852         """
853         :param obj: (str) remote object path
854
855         :param metapairs: (dict) {key1:val1, key2:val2, ...}
856         """
857         assert(type(metapairs) is dict)
858         r = self.object_post(obj, update=True, metadata=metapairs)
859         r.release()
860
861     def del_object_meta(self, obj, metakey):
862         """
863         :param obj: (str) remote object path
864
865         :param metakey: (str) metadatum key
866         """
867         r = self.object_post(obj, update=True, metadata={metakey: ''})
868         r.release()
869
870     def publish_object(self, obj):
871         """
872         :param obj: (str) remote object path
873
874         :returns: (str) access url
875         """
876         r = self.object_post(obj, update=True, public=True)
877         r.release()
878         info = self.get_object_info(obj)
879         pref, sep, rest = self.base_url.partition('//')
880         base = rest.split('/')[0]
881         newurl = path4url(
882             '%s%s%s' % (pref, sep, base),
883             info['x-object-public'])
884         return newurl[1:]
885
886     def unpublish_object(self, obj):
887         """
888         :param obj: (str) remote object path
889         """
890         r = self.object_post(obj, update=True, public=False)
891         r.release()
892
893     def get_object_info(self, obj, version=None):
894         """
895         :param obj: (str) remote object path
896
897         :param version: (str)
898
899         :returns: (dict)
900         """
901         try:
902             r = self.object_head(obj, version=version)
903             return r.headers
904         except ClientError as ce:
905             if ce.status == 404:
906                 raise ClientError('Object not found', status=404)
907             raise
908
909     def get_object_meta(self, obj, version=None):
910         """
911         :param obj: (str) remote object path
912
913         :param version: (str)
914
915         :returns: (dict)
916         """
917         return filter_in(
918             self.get_object_info(obj, version=version),
919             'X-Object-Meta')
920
921     def get_object_sharing(self, obj):
922         """
923         :param obj: (str) remote object path
924
925         :returns: (dict)
926         """
927         r = filter_in(
928             self.get_object_info(obj),
929             'X-Object-Sharing',
930             exactMatch=True)
931         reply = {}
932         if len(r) > 0:
933             perms = r['x-object-sharing'].split(';')
934             for perm in perms:
935                 try:
936                     perm.index('=')
937                 except ValueError:
938                     raise ClientError('Incorrect reply format')
939                 (key, val) = perm.strip().split('=')
940                 reply[key] = val
941         return reply
942
943     def set_object_sharing(
944             self, obj,
945             read_permition=False, write_permition=False):
946         """Give read/write permisions to an object.
947
948         :param obj: (str) remote object path
949
950         :param read_permition: (list - bool) users and user groups that get
951             read permition for this object - False means all previous read
952             permissions will be removed
953
954         :param write_perimition: (list - bool) of users and user groups to get
955            write permition for this object - False means all previous write
956            permissions will be removed
957         """
958
959         perms = dict(
960             read='' if not read_permition else read_permition,
961             write='' if not write_permition else write_permition)
962         r = self.object_post(obj, update=True, permissions=perms)
963         r.release()
964
965     def del_object_sharing(self, obj):
966         """
967         :param obj: (str) remote object path
968         """
969         self.set_object_sharing(obj)
970
971     def append_object(self, obj, source_file, upload_cb=None):
972         """
973         :param obj: (str) remote object path
974
975         :param source_file: open file descriptor
976
977         :param upload_db: progress.bar for uploading
978         """
979
980         self._assert_container()
981         meta = self.get_container_info()
982         blocksize = int(meta['x-container-block-size'])
983         filesize = fstat(source_file.fileno()).st_size
984         nblocks = 1 + (filesize - 1) // blocksize
985         offset = 0
986         if upload_cb:
987             upload_gen = upload_cb(nblocks)
988             upload_gen.next()
989         for i in range(nblocks):
990             block = source_file.read(min(blocksize, filesize - offset))
991             offset += len(block)
992             r = self.object_post(
993                 obj,
994                 update=True,
995                 content_range='bytes */*',
996                 content_type='application/octet-stream',
997                 content_length=len(block),
998                 data=block)
999             r.release()
1000
1001             if upload_cb:
1002                 upload_gen.next()
1003
1004     def truncate_object(self, obj, upto_bytes):
1005         """
1006         :param obj: (str) remote object path
1007
1008         :param upto_bytes: max number of bytes to leave on file
1009         """
1010         r = self.object_post(
1011             obj,
1012             update=True,
1013             content_range='bytes 0-%s/*' % upto_bytes,
1014             content_type='application/octet-stream',
1015             object_bytes=upto_bytes,
1016             source_object=path4url(self.container, obj))
1017         r.release()
1018
1019     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1020         """Overwrite a part of an object from local source file
1021
1022         :param obj: (str) remote object path
1023
1024         :param start: (int) position in bytes to start overwriting from
1025
1026         :param end: (int) position in bytes to stop overwriting at
1027
1028         :param source_file: open file descriptor
1029
1030         :param upload_db: progress.bar for uploading
1031         """
1032
1033         r = self.get_object_info(obj)
1034         rf_size = int(r['content-length'])
1035         if rf_size < int(start):
1036             raise ClientError(
1037                 'Range start exceeds file size',
1038                 status=416)
1039         elif rf_size < int(end):
1040             raise ClientError(
1041                 'Range end exceeds file size',
1042                 status=416)
1043         self._assert_container()
1044         meta = self.get_container_info()
1045         blocksize = int(meta['x-container-block-size'])
1046         filesize = fstat(source_file.fileno()).st_size
1047         datasize = int(end) - int(start) + 1
1048         nblocks = 1 + (datasize - 1) // blocksize
1049         offset = 0
1050         if upload_cb:
1051             upload_gen = upload_cb(nblocks)
1052             upload_gen.next()
1053         for i in range(nblocks):
1054             read_size = min(blocksize, filesize - offset, datasize - offset)
1055             block = source_file.read(read_size)
1056             r = self.object_post(
1057                 obj,
1058                 update=True,
1059                 content_type='application/octet-stream',
1060                 content_length=len(block),
1061                 content_range='bytes %s-%s/*' % (
1062                     start + offset,
1063                     start + offset + len(block) - 1),
1064                 data=block)
1065             offset += len(block)
1066             r.release()
1067
1068             if upload_cb:
1069                 upload_gen.next()
1070
1071     def copy_object(
1072             self, src_container, src_object, dst_container,
1073             dst_object=False,
1074             source_version=None,
1075             public=False,
1076             content_type=None,
1077             delimiter=None):
1078         """
1079         :param src_container: (str) source container
1080
1081         :param src_object: (str) source object path
1082
1083         :param dst_container: (str) destination container
1084
1085         :param dst_object: (str) destination object path
1086
1087         :param source_version: (str) source object version
1088
1089         :param public: (bool)
1090
1091         :param content_type: (str)
1092
1093         :param delimiter: (str)
1094         """
1095         self._assert_account()
1096         self.container = dst_container
1097         dst_object = dst_object or src_object
1098         src_path = path4url(src_container, src_object)
1099         r = self.object_put(
1100             dst_object,
1101             success=201,
1102             copy_from=src_path,
1103             content_length=0,
1104             source_version=source_version,
1105             public=public,
1106             content_type=content_type,
1107             delimiter=delimiter)
1108         r.release()
1109
1110     def move_object(
1111             self, src_container, src_object, dst_container,
1112             dst_object=False,
1113             source_version=None,
1114             public=False,
1115             content_type=None,
1116             delimiter=None):
1117         """
1118         :param src_container: (str) source container
1119
1120         :param src_object: (str) source object path
1121
1122         :param dst_container: (str) destination container
1123
1124         :param dst_object: (str) destination object path
1125
1126         :param source_version: (str) source object version
1127
1128         :param public: (bool)
1129
1130         :param content_type: (str)
1131
1132         :param delimiter: (str)
1133         """
1134         self._assert_account()
1135         self.container = dst_container
1136         dst_object = dst_object or src_object
1137         src_path = path4url(src_container, src_object)
1138         r = self.object_put(
1139             dst_object,
1140             success=201,
1141             move_from=src_path,
1142             content_length=0,
1143             source_version=source_version,
1144             public=public,
1145             content_type=content_type,
1146             delimiter=delimiter)
1147         r.release()
1148
1149     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1150         """Get accounts that share with self.account
1151
1152         :param limit: (str)
1153
1154         :param marker: (str)
1155
1156         :returns: (dict)
1157         """
1158         self._assert_account()
1159
1160         self.set_param('format', 'json')
1161         self.set_param('limit', limit, iff=limit is not None)
1162         self.set_param('marker', marker, iff=marker is not None)
1163
1164         path = ''
1165         success = kwargs.pop('success', (200, 204))
1166         r = self.get(path, *args, success=success, **kwargs)
1167         return r.json
1168
1169     def get_object_versionlist(self, obj):
1170         """
1171         :param obj: (str) remote object path
1172
1173         :returns: (list)
1174         """
1175         self._assert_container()
1176         r = self.object_get(obj, format='json', version='list')
1177         return r.json['versions']