Adjust up to store-overwrite
[kamaki] / kamaki / clients / pithos.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestAPI):
69     """GRNet Pithos API client"""
70
71     _thread_exceptions = []
72
73     def __init__(self, base_url, token, account=None, container=None):
74         super(PithosClient, self).__init__(base_url, token, account, container)
75
76     def purge_container(self):
77         """Delete an empty container and destroy associated blocks
78         """
79         r = self.container_delete(until=unicode(time()))
80         r.release()
81
82     def upload_object_unchunked(self, obj, f,
83         withHashFile=False,
84         size=None,
85         etag=None,
86         content_encoding=None,
87         content_disposition=None,
88         content_type=None,
89         sharing=None,
90         public=None):
91         """
92         :param obj: (str) remote object path
93
94         :param f: open file descriptor
95
96         :param withHashFile: (bool)
97
98         :param size: (int) size of data to upload
99
100         :param etag: (str)
101
102         :param content_encoding: (str)
103
104         :param content_disposition: (str)
105
106         :param content_type: (str)
107
108         :param sharing: {'read':[user and/or grp names],
109             'write':[usr and/or grp names]}
110
111         :param public: (bool)
112         """
113         self._assert_container()
114
115         if withHashFile:
116             data = f.read()
117             try:
118                 import json
119                 data = json.dumps(json.loads(data))
120             except ValueError:
121                 raise ClientError(message='"%s" is not json-formated' % f.name,
122                     status=1)
123             except SyntaxError:
124                 raise ClientError(message='"%s" is not a valid hashmap file'\
125                 % f.name, status=1)
126             f = StringIO(data)
127         data = f.read(size) if size is not None else f.read()
128         r = self.object_put(obj,
129             data=data,
130             etag=etag,
131             content_encoding=content_encoding,
132             content_disposition=content_disposition,
133             content_type=content_type,
134             permissions=sharing,
135             public=public,
136             success=201)
137         r.release()
138
139     def create_object_by_manifestation(self, obj,
140         etag=None,
141         content_encoding=None,
142         content_disposition=None,
143         content_type=None,
144         sharing=None,
145         public=None):
146         """
147         :param obj: (str) remote object path
148
149         :param etag: (str)
150
151         :param content_encoding: (str)
152
153         :param content_disposition: (str)
154
155         :param content_type: (str)
156
157         :param sharing: {'read':[user and/or grp names],
158             'write':[usr and/or grp names]}
159
160         :param public: (bool)
161         """
162         self._assert_container()
163         r = self.object_put(obj,
164             content_length=0,
165             etag=etag,
166             content_encoding=content_encoding,
167             content_disposition=content_disposition,
168             content_type=content_type,
169             permissions=sharing,
170             public=public,
171             manifest='%s/%s' % (self.container, obj))
172         r.release()
173
174     # upload_* auxiliary methods
175     def _put_block_async(self, data, hash, upload_gen=None):
176         event = SilentEvent(method=self._put_block, data=data, hash=hash)
177         event.start()
178         return event
179
180     def _put_block(self, data, hash):
181         from random import randint
182         if not randint(0, 7):
183             raise ClientError('BAD GATEWAY STUFF', 503)
184         r = self.container_post(update=True,
185             content_type='application/octet-stream',
186             content_length=len(data),
187             data=data,
188             format='json')
189         assert r.json[0] == hash, 'Local hash does not match server'
190
191     def _get_file_block_info(self, fileobj, size=None):
192         meta = self.get_container_info()
193         blocksize = int(meta['x-container-block-size'])
194         blockhash = meta['x-container-block-hash']
195         size = size if size is not None else fstat(fileobj.fileno()).st_size
196         nblocks = 1 + (size - 1) // blocksize
197         return (blocksize, blockhash, size, nblocks)
198
199     def _get_missing_hashes(self, obj, json,
200         size=None,
201         format='json',
202         hashmap=True,
203         content_type=None,
204         etag=None,
205         content_encoding=None,
206         content_disposition=None,
207         permissions=None,
208         public=None,
209         success=(201, 409)):
210         r = self.object_put(obj,
211             format='json',
212             hashmap=True,
213             content_type=content_type,
214             json=json,
215             etag=etag,
216             content_encoding=content_encoding,
217             content_disposition=content_disposition,
218             permissions=permissions,
219             public=public,
220             success=success)
221         if r.status_code == 201:
222             r.release()
223             return None
224         return r.json
225
226     def _caclulate_uploaded_blocks(self,
227         blocksize,
228         blockhash,
229         size,
230         nblocks,
231         hashes,
232         hmap,
233         fileobj,
234         hash_cb=None):
235         offset = 0
236         if hash_cb:
237             hash_gen = hash_cb(nblocks)
238             hash_gen.next()
239
240         for i in range(nblocks):
241             block = fileobj.read(min(blocksize, size - offset))
242             bytes = len(block)
243             hash = _pithos_hash(block, blockhash)
244             hashes.append(hash)
245             hmap[hash] = (offset, bytes)
246             offset += bytes
247             if hash_cb:
248                 hash_gen.next()
249         if offset != size:
250             assert offset == size, \
251                    "Failed to calculate uploaded blocks: " \
252                     "Offset and object size do not match"
253
254     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
255         """upload missing blocks asynchronously"""
256
257         self._init_thread_limit()
258
259         flying = []
260         failures = []
261         for hash in missing:
262             offset, bytes = hmap[hash]
263             fileobj.seek(offset)
264             data = fileobj.read(bytes)
265             r = self._put_block_async(data, hash, upload_gen)
266             flying.append(r)
267             unfinished = self._watch_thread_limit(flying)
268             for thread in set(flying).difference(unfinished):
269                 if thread.exception:
270                     failures.append(thread)
271                     if isinstance(thread.exception, ClientError)\
272                     and thread.exception.status == 502:
273                         self.POOLSIZE = self._thread_limit
274                 elif thread.isAlive():
275                     flying.append(thread)
276                 elif upload_gen:
277                     try:
278                         upload_gen.next()
279                     except:
280                         pass
281             flying = unfinished
282
283         for thread in flying:
284             thread.join()
285             if thread.exception:
286                 failures.append(thread)
287             elif upload_gen:
288                 try:
289                     upload_gen.next()
290                 except:
291                     pass
292
293         return [failure.kwargs['hash'] for failure in failures]
294
295     def upload_object(self, obj, f,
296         size=None,
297         hash_cb=None,
298         upload_cb=None,
299         etag=None,
300         content_encoding=None,
301         content_disposition=None,
302         content_type=None,
303         sharing=None,
304         public=None):
305         """Upload an object using multiple connections (threads)
306
307         :param obj: (str) remote object path
308
309         :param f: open file descriptor (rb)
310
311         :param hash_cb: optional progress.bar object for calculating hashes
312
313         :param upload_cb: optional progress.bar object for uploading
314
315         :param etag: (str)
316
317         :param content_encoding: (str)
318
319         :param content_disposition: (str)
320
321         :param content_type: (str)
322
323         :param sharing: {'read':[user and/or grp names],
324             'write':[usr and/or grp names]}
325
326         :param public: (bool)
327         """
328         self._assert_container()
329
330         #init
331         block_info = (blocksize, blockhash, size, nblocks) =\
332             self._get_file_block_info(f, size)
333         (hashes, hmap, offset) = ([], {}, 0)
334         if content_type is None:
335             content_type = 'application/octet-stream'
336
337         self._caclulate_uploaded_blocks(*block_info,
338             hashes=hashes,
339             hmap=hmap,
340             fileobj=f,
341             hash_cb=hash_cb)
342
343         hashmap = dict(bytes=size, hashes=hashes)
344         missing = self._get_missing_hashes(obj, hashmap,
345             content_type=content_type,
346             size=size,
347             etag=etag,
348             content_encoding=content_encoding,
349             content_disposition=content_disposition,
350             permissions=sharing,
351             public=public)
352
353         if missing is None:
354             return
355
356         if upload_cb:
357             upload_gen = upload_cb(len(missing))
358             for i in range(len(missing), len(hashmap['hashes']) + 1):
359                 try:
360                     upload_gen.next()
361                 except:
362                     upload_gen = None
363         else:
364             upload_gen = None
365
366         retries = 7
367         try:
368             while retries:
369                 sendlog.info('%s blocks missing' % len(missing))
370                 num_of_blocks = len(missing)
371                 missing = self._upload_missing_blocks(
372                     missing,
373                     hmap,
374                     f,
375                     upload_gen)
376                 if missing:
377                     if num_of_blocks == len(missing):
378                         retries -= 1
379                     else:
380                         num_of_blocks = len(missing)
381                 else:
382                     break
383             if missing:
384                 raise ClientError(
385                     '%s blocks failed to upload' % len(missing),
386                     status=800)
387         except KeyboardInterrupt:
388             sendlog.info('- - - wait for threads to finish')
389             for thread in activethreads():
390                 thread.join()
391             raise
392
393         r = self.object_put(
394             obj,
395             format='json',
396             hashmap=True,
397             content_type=content_type,
398             json=hashmap,
399             success=201)
400         r.release()
401
402     # download_* auxiliary methods
403     def _get_remote_blocks_info(self, obj, **restargs):
404         #retrieve object hashmap
405         myrange = restargs.pop('data_range', None)
406         hashmap = self.get_object_hashmap(obj, **restargs)
407         restargs['data_range'] = myrange
408         blocksize = int(hashmap['block_size'])
409         blockhash = hashmap['block_hash']
410         total_size = hashmap['bytes']
411         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
412         map_dict = {}
413         for i, h in enumerate(hashmap['hashes']):
414             map_dict[h] = i
415         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
416
417     def _dump_blocks_sync(self,
418         obj,
419         remote_hashes,
420         blocksize,
421         total_size,
422         dst,
423         range,
424         **restargs):
425         for blockid, blockhash in enumerate(remote_hashes):
426             if blockhash == None:
427                 continue
428             start = blocksize * blockid
429             end = total_size - 1 if start + blocksize > total_size\
430                 else start + blocksize - 1
431             (start, end) = _range_up(start, end, range)
432             restargs['data_range'] = 'bytes=%s-%s' % (start, end)
433             r = self.object_get(obj, success=(200, 206), **restargs)
434             self._cb_next()
435             dst.write(r.content)
436             dst.flush()
437
438     def _get_block_async(self, obj, **restargs):
439         event = SilentEvent(self.object_get,
440             obj,
441             success=(200, 206),
442             **restargs)
443         event.start()
444         return event
445
446     def _hash_from_file(self, fp, start, size, blockhash):
447         fp.seek(start)
448         block = fp.read(size)
449         h = newhashlib(blockhash)
450         h.update(block.strip('\x00'))
451         return hexlify(h.digest())
452
453     def _thread2file(self,
454         flying,
455         local_file,
456         offset=0,
457         **restargs):
458         """write the results of a greenleted rest call to a file
459         @offset: the offset of the file up to blocksize
460             - e.g. if the range is 10-100, all
461         blocks will be written to normal_position - 10"""
462         finished = []
463         for i, (start, g) in enumerate(flying.items()):
464             if not g.isAlive():
465                 if g.exception:
466                     raise g.exception
467                 block = g.value.content
468                 local_file.seek(start - offset)
469                 local_file.write(block)
470                 self._cb_next()
471                 finished.append(flying.pop(start))
472         local_file.flush()
473         return finished
474
475     def _dump_blocks_async(self,
476         obj,
477         remote_hashes,
478         blocksize,
479         total_size,
480         local_file,
481         blockhash=None,
482         resume=False,
483         filerange=None,
484         **restargs):
485
486         file_size = fstat(local_file.fileno()).st_size if resume else 0
487         flying = {}
488         finished = []
489         offset = 0
490         if filerange is not None:
491             rstart = int(filerange.split('-')[0])
492             offset = rstart if blocksize > rstart else rstart % blocksize
493
494         self._init_thread_limit()
495         for block_hash, blockid in remote_hashes.items():
496             start = blocksize * blockid
497             if start < file_size\
498             and block_hash == self._hash_from_file(
499                     local_file,
500                     start,
501                     blocksize,
502                     blockhash):
503                 self._cb_next()
504                 continue
505             self._watch_thread_limit(flying.values())
506             finished += self._thread2file(
507                 flying,
508                 local_file,
509                 offset,
510                 **restargs)
511             end = total_size - 1 if start + blocksize > total_size\
512                 else start + blocksize - 1
513             (start, end) = _range_up(start, end, filerange)
514             if start == end:
515                 self._cb_next()
516                 continue
517             restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
518             flying[start] = self._get_block_async(obj, **restargs)
519
520         for thread in flying.values():
521             thread.join()
522         finished += self._thread2file(flying, local_file, offset, **restargs)
523
524     def download_object(self,
525         obj,
526         dst,
527         download_cb=None,
528         version=None,
529         resume=False,
530         range=None,
531         if_match=None,
532         if_none_match=None,
533         if_modified_since=None,
534         if_unmodified_since=None):
535         """Download an object using multiple connections (threads) and
536             writing to random parts of the file
537
538         :param obj: (str) remote object path
539
540         :param dst: open file descriptor (wb+)
541
542         :param download_cb: optional progress.bar object for downloading
543
544         :param version: (str) file version
545
546         :param resume: (bool) if set, preserve already downloaded file parts
547
548         :param range: (str) from-to where from and to are integers denoting
549             file positions in bytes
550
551         :param if_match: (str)
552
553         :param if_none_match: (str)
554
555         :param if_modified_since: (str) formated date
556
557         :param if_unmodified_since: (str) formated date
558         """
559
560         restargs = dict(version=version,
561             data_range=None if range is None else 'bytes=%s' % range,
562             if_match=if_match,
563             if_none_match=if_none_match,
564             if_modified_since=if_modified_since,
565             if_unmodified_since=if_unmodified_since)
566
567         (blocksize,
568             blockhash,
569             total_size,
570             hash_list,
571             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
572         assert total_size >= 0
573
574         if download_cb:
575             self.progress_bar_gen = download_cb(len(remote_hashes))
576             self._cb_next()
577
578         if dst.isatty():
579             self._dump_blocks_sync(obj,
580                 hash_list,
581                 blocksize,
582                 total_size,
583                 dst,
584                 range,
585                 **restargs)
586         else:
587             self._dump_blocks_async(obj,
588                 remote_hashes,
589                 blocksize,
590                 total_size,
591                 dst,
592                 blockhash,
593                 resume,
594                 range,
595                 **restargs)
596             if range is None:
597                 dst.truncate(total_size)
598
599         self._complete_cb()
600
601     #Command Progress Bar method
602     def _cb_next(self):
603         if hasattr(self, 'progress_bar_gen'):
604             try:
605                 self.progress_bar_gen.next()
606             except:
607                 pass
608
609     def _complete_cb(self):
610         while True:
611             try:
612                 self.progress_bar_gen.next()
613             except:
614                 break
615
616     def get_object_hashmap(self, obj,
617         version=None,
618         if_match=None,
619         if_none_match=None,
620         if_modified_since=None,
621         if_unmodified_since=None,
622         data_range=None):
623         """
624         :param obj: (str) remote object path
625
626         :param if_match: (str)
627
628         :param if_none_match: (str)
629
630         :param if_modified_since: (str) formated date
631
632         :param if_unmodified_since: (str) formated date
633
634         :param data_range: (str) from-to where from and to are integers
635             denoting file positions in bytes
636
637         :returns: (list)
638         """
639         try:
640             r = self.object_get(obj,
641                 hashmap=True,
642                 version=version,
643                 if_etag_match=if_match,
644                 if_etag_not_match=if_none_match,
645                 if_modified_since=if_modified_since,
646                 if_unmodified_since=if_unmodified_since,
647                 data_range=data_range)
648         except ClientError as err:
649             if err.status == 304 or err.status == 412:
650                 return {}
651             raise
652         return r.json
653
654     def set_account_group(self, group, usernames):
655         """
656         :param group: (str)
657
658         :param usernames: (list)
659         """
660         r = self.account_post(update=True, groups={group: usernames})
661         r.release()
662
663     def del_account_group(self, group):
664         """
665         :param group: (str)
666         """
667         r = self.account_post(update=True, groups={group: []})
668         r.release()
669
670     def get_account_info(self, until=None):
671         """
672         :param until: (str) formated date
673
674         :returns: (dict)
675         """
676         r = self.account_head(until=until)
677         if r.status_code == 401:
678             raise ClientError("No authorization")
679         return r.headers
680
681     def get_account_quota(self):
682         """
683         :returns: (dict)
684         """
685         return filter_in(self.get_account_info(),
686             'X-Account-Policy-Quota',
687             exactMatch=True)
688
689     def get_account_versioning(self):
690         """
691         :returns: (dict)
692         """
693         return filter_in(self.get_account_info(),
694             'X-Account-Policy-Versioning',
695             exactMatch=True)
696
697     def get_account_meta(self, until=None):
698         """
699         :meta until: (str) formated date
700
701         :returns: (dict)
702         """
703         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
704
705     def get_account_group(self):
706         """
707         :returns: (dict)
708         """
709         return filter_in(self.get_account_info(), 'X-Account-Group-')
710
711     def set_account_meta(self, metapairs):
712         """
713         :param metapairs: (dict) {key1:val1, key2:val2, ...}
714         """
715         assert(type(metapairs) is dict)
716         r = self.account_post(update=True, metadata=metapairs)
717         r.release()
718
719     def del_account_meta(self, metakey):
720         """
721         :param metakey: (str) metadatum key
722         """
723         r = self.account_post(update=True, metadata={metakey: ''})
724         r.release()
725
726     def set_account_quota(self, quota):
727         """
728         :param quota: (int)
729         """
730         r = self.account_post(update=True, quota=quota)
731         r.release()
732
733     def set_account_versioning(self, versioning):
734         """
735         "param versioning: (str)
736         """
737         r = self.account_post(update=True, versioning=versioning)
738         r.release()
739
740     def list_containers(self):
741         """
742         :returns: (dict)
743         """
744         r = self.account_get()
745         return r.json
746
747     def del_container(self, until=None, delimiter=None):
748         """
749         :param until: (str) formated date
750
751         :param delimiter: (str) with / empty container
752
753         :raises ClientError: 404 Container does not exist
754
755         :raises ClientError: 409 Container is not empty
756         """
757         self._assert_container()
758         r = self.container_delete(until=until,
759             delimiter=delimiter,
760             success=(204, 404, 409))
761         r.release()
762         if r.status_code == 404:
763             raise ClientError('Container "%s" does not exist' % self.container,
764                 r.status_code)
765         elif r.status_code == 409:
766             raise ClientError('Container "%s" is not empty' % self.container,
767                 r.status_code)
768
769     def get_container_versioning(self, container):
770         """
771         :param container: (str)
772
773         :returns: (dict)
774         """
775         self.container = container
776         return filter_in(self.get_container_info(),
777             'X-Container-Policy-Versioning')
778
779     def get_container_quota(self, container):
780         """
781         :param container: (str)
782
783         :returns: (dict)
784         """
785         self.container = container
786         return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
787
788     def get_container_info(self, until=None):
789         """
790         :param until: (str) formated date
791
792         :returns: (dict)
793
794         :raises ClientError: 404 Container not found
795         """
796         try:
797             r = self.container_head(until=until)
798         except ClientError as err:
799             err.details.append('for container %s' % self.container)
800             raise err
801         return r.headers
802
803     def get_container_meta(self, until=None):
804         """
805         :param until: (str) formated date
806
807         :returns: (dict)
808         """
809         return filter_in(self.get_container_info(until=until),
810             'X-Container-Meta')
811
812     def get_container_object_meta(self, until=None):
813         """
814         :param until: (str) formated date
815
816         :returns: (dict)
817         """
818         return filter_in(self.get_container_info(until=until),
819             'X-Container-Object-Meta')
820
821     def set_container_meta(self, metapairs):
822         """
823         :param metapairs: (dict) {key1:val1, key2:val2, ...}
824         """
825         assert(type(metapairs) is dict)
826         r = self.container_post(update=True, metadata=metapairs)
827         r.release()
828
829     def del_container_meta(self, metakey):
830         """
831         :param metakey: (str) metadatum key
832         """
833         r = self.container_post(update=True, metadata={metakey: ''})
834         r.release()
835
836     def set_container_quota(self, quota):
837         """
838         :param quota: (int)
839         """
840         r = self.container_post(update=True, quota=quota)
841         r.release()
842
843     def set_container_versioning(self, versioning):
844         """
845         :param versioning: (str)
846         """
847         r = self.container_post(update=True, versioning=versioning)
848         r.release()
849
850     def del_object(self, obj, until=None, delimiter=None):
851         """
852         :param obj: (str) remote object path
853
854         :param until: (str) formated date
855
856         :param delimiter: (str)
857         """
858         self._assert_container()
859         r = self.object_delete(obj, until=until, delimiter=delimiter)
860         r.release()
861
862     def set_object_meta(self, obj, metapairs):
863         """
864         :param obj: (str) remote object path
865
866         :param metapairs: (dict) {key1:val1, key2:val2, ...}
867         """
868         assert(type(metapairs) is dict)
869         r = self.object_post(obj, update=True, metadata=metapairs)
870         r.release()
871
872     def del_object_meta(self, obj, metakey):
873         """
874         :param obj: (str) remote object path
875
876         :param metakey: (str) metadatum key
877         """
878         r = self.object_post(obj, update=True, metadata={metakey: ''})
879         r.release()
880
881     def publish_object(self, obj):
882         """
883         :param obj: (str) remote object path
884
885         :returns: (str) access url
886         """
887         r = self.object_post(obj, update=True, public=True)
888         r.release()
889         info = self.get_object_info(obj)
890         pref, sep, rest = self.base_url.partition('//')
891         base = rest.split('/')[0]
892         newurl = path4url('%s%s%s' % (pref, sep, base),
893             info['x-object-public'])
894         return newurl[1:]
895
896     def unpublish_object(self, obj):
897         """
898         :param obj: (str) remote object path
899         """
900         r = self.object_post(obj, update=True, public=False)
901         r.release()
902
903     def get_object_info(self, obj, version=None):
904         """
905         :param obj: (str) remote object path
906
907         :param version: (str)
908
909         :returns: (dict)
910         """
911         try:
912             r = self.object_head(obj, version=version)
913             return r.headers
914         except ClientError as ce:
915             if ce.status == 404:
916                 raise ClientError('Object not found', status=404)
917             raise
918
919     def get_object_meta(self, obj, version=None):
920         """
921         :param obj: (str) remote object path
922
923         :param version: (str)
924
925         :returns: (dict)
926         """
927         return filter_in(self.get_object_info(obj, version=version),
928             'X-Object-Meta')
929
930     def get_object_sharing(self, obj):
931         """
932         :param obj: (str) remote object path
933
934         :returns: (dict)
935         """
936         r = filter_in(self.get_object_info(obj),
937             'X-Object-Sharing',
938             exactMatch=True)
939         reply = {}
940         if len(r) > 0:
941             perms = r['x-object-sharing'].split(';')
942             for perm in perms:
943                 try:
944                     perm.index('=')
945                 except ValueError:
946                     raise ClientError('Incorrect reply format')
947                 (key, val) = perm.strip().split('=')
948                 reply[key] = val
949         return reply
950
951     def set_object_sharing(self, obj,
952         read_permition=False,
953         write_permition=False):
954         """Give read/write permisions to an object.
955
956         :param obj: (str) remote object path
957
958         :param read_permition: (list - bool) users and user groups that get
959             read permition for this object - False means all previous read
960             permissions will be removed
961
962         :param write_perimition: (list - bool) of users and user groups to get
963            write permition for this object - False means all previous write
964            permissions will be removed
965         """
966
967         perms = dict(read='' if not read_permition else read_permition,
968             write='' if not write_permition else write_permition)
969         r = self.object_post(obj, update=True, permissions=perms)
970         r.release()
971
972     def del_object_sharing(self, obj):
973         """
974         :param obj: (str) remote object path
975         """
976         self.set_object_sharing(obj)
977
978     def append_object(self, obj, source_file, upload_cb=None):
979         """
980         :param obj: (str) remote object path
981
982         :param source_file: open file descriptor
983
984         :param upload_db: progress.bar for uploading
985         """
986
987         self._assert_container()
988         meta = self.get_container_info()
989         blocksize = int(meta['x-container-block-size'])
990         filesize = fstat(source_file.fileno()).st_size
991         nblocks = 1 + (filesize - 1) // blocksize
992         offset = 0
993         if upload_cb:
994             upload_gen = upload_cb(nblocks)
995             upload_gen.next()
996         for i in range(nblocks):
997             block = source_file.read(min(blocksize, filesize - offset))
998             offset += len(block)
999             r = self.object_post(obj,
1000                 update=True,
1001                 content_range='bytes */*',
1002                 content_type='application/octet-stream',
1003                 content_length=len(block),
1004                 data=block)
1005             r.release()
1006
1007             if upload_cb:
1008                 upload_gen.next()
1009
1010     def truncate_object(self, obj, upto_bytes):
1011         """
1012         :param obj: (str) remote object path
1013
1014         :param upto_bytes: max number of bytes to leave on file
1015         """
1016         r = self.object_post(obj,
1017             update=True,
1018             content_range='bytes 0-%s/*' % upto_bytes,
1019             content_type='application/octet-stream',
1020             object_bytes=upto_bytes,
1021             source_object=path4url(self.container, obj))
1022         r.release()
1023
1024     def overwrite_object(self,
1025         obj,
1026         start,
1027         end,
1028         source_file,
1029         upload_cb=None):
1030         """Overwrite a part of an object from local source file
1031
1032         :param obj: (str) remote object path
1033
1034         :param start: (int) position in bytes to start overwriting from
1035
1036         :param end: (int) position in bytes to stop overwriting at
1037
1038         :param source_file: open file descriptor
1039
1040         :param upload_db: progress.bar for uploading
1041         """
1042
1043         r = self.get_object_info(obj)
1044         rf_size = int(r['content-length'])
1045         if rf_size < int(start):
1046             raise ClientError(
1047                 'Range start exceeds file size',
1048                 status=416)
1049         elif rf_size < int(end):
1050             raise ClientError(
1051                 'Range end exceeds file size',
1052                 status=416)
1053         self._assert_container()
1054         meta = self.get_container_info()
1055         blocksize = int(meta['x-container-block-size'])
1056         filesize = fstat(source_file.fileno()).st_size
1057         datasize = int(end) - int(start) + 1
1058         nblocks = 1 + (datasize - 1) // blocksize
1059         offset = 0
1060         if upload_cb:
1061             upload_gen = upload_cb(nblocks)
1062             upload_gen.next()
1063         for i in range(nblocks):
1064             block = source_file.read(min(blocksize,
1065                 filesize - offset,
1066                 datasize - offset))
1067             r = self.object_post(obj,
1068                 update=True,
1069                 content_type='application/octet-stream',
1070                 content_length=len(block),
1071                 content_range='bytes %s-%s/*' % (
1072                     start + offset,
1073                     start + offset + len(block) - 1),
1074                 data=block)
1075             offset += len(block)
1076             r.release()
1077
1078             if upload_cb:
1079                 upload_gen.next()
1080
1081     def copy_object(self, src_container, src_object, dst_container,
1082         dst_object=False,
1083         source_version=None,
1084         public=False,
1085         content_type=None,
1086         delimiter=None):
1087         """
1088         :param src_container: (str) source container
1089
1090         :param src_object: (str) source object path
1091
1092         :param dst_container: (str) destination container
1093
1094         :param dst_object: (str) destination object path
1095
1096         :param source_version: (str) source object version
1097
1098         :param public: (bool)
1099
1100         :param content_type: (str)
1101
1102         :param delimiter: (str)
1103         """
1104         self._assert_account()
1105         self.container = dst_container
1106         dst_object = dst_object or src_object
1107         src_path = path4url(src_container, src_object)
1108         r = self.object_put(dst_object,
1109             success=201,
1110             copy_from=src_path,
1111             content_length=0,
1112             source_version=source_version,
1113             public=public,
1114             content_type=content_type,
1115             delimiter=delimiter)
1116         r.release()
1117
1118     def move_object(self, src_container, src_object, dst_container,
1119         dst_object=False,
1120         source_version=None,
1121         public=False,
1122         content_type=None,
1123         delimiter=None):
1124         """
1125         :param src_container: (str) source container
1126
1127         :param src_object: (str) source object path
1128
1129         :param dst_container: (str) destination container
1130
1131         :param dst_object: (str) destination object path
1132
1133         :param source_version: (str) source object version
1134
1135         :param public: (bool)
1136
1137         :param content_type: (str)
1138
1139         :param delimiter: (str)
1140         """
1141         self._assert_account()
1142         self.container = dst_container
1143         dst_object = dst_object or src_object
1144         src_path = path4url(src_container, src_object)
1145         r = self.object_put(dst_object,
1146             success=201,
1147             move_from=src_path,
1148             content_length=0,
1149             source_version=source_version,
1150             public=public,
1151             content_type=content_type,
1152             delimiter=delimiter)
1153         r.release()
1154
1155     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1156         """Get accounts that share with self.account
1157
1158         :param limit: (str)
1159
1160         :param marker: (str)
1161
1162         :returns: (dict)
1163         """
1164         self._assert_account()
1165
1166         self.set_param('format', 'json')
1167         self.set_param('limit', limit, iff=limit is not None)
1168         self.set_param('marker', marker, iff=marker is not None)
1169
1170         path = ''
1171         success = kwargs.pop('success', (200, 204))
1172         r = self.get(path, *args, success=success, **kwargs)
1173         return r.json
1174
1175     def get_object_versionlist(self, obj):
1176         """
1177         :param obj: (str) remote object path
1178
1179         :returns: (list)
1180         """
1181         self._assert_container()
1182         r = self.object_get(obj, format='json', version='list')
1183         return r.json['versions']