Make --public and --sharing to work in upload
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             permissions=None,
211             public=None,
212             success=(201, 409)):
213         r = self.object_put(
214             obj,
215             format='json',
216             hashmap=True,
217             content_type=content_type,
218             json=json,
219             etag=etag,
220             content_encoding=content_encoding,
221             content_disposition=content_disposition,
222             permissions=permissions,
223             public=public,
224             success=success)
225         return None if r.status_code == 201 else r.json
226
227     def _culculate_blocks_for_upload(
228             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
229             hash_cb=None):
230         offset = 0
231         if hash_cb:
232             hash_gen = hash_cb(nblocks)
233             hash_gen.next()
234
235         for i in range(nblocks):
236             block = fileobj.read(min(blocksize, size - offset))
237             bytes = len(block)
238             hash = _pithos_hash(block, blockhash)
239             hashes.append(hash)
240             hmap[hash] = (offset, bytes)
241             offset += bytes
242             if hash_cb:
243                 hash_gen.next()
244         msg = 'Failed to calculate uploaded blocks:'
245         ' Offset and object size do not match'
246         assert offset == size, msg
247
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
249         """upload missing blocks asynchronously"""
250
251         self._init_thread_limit()
252
253         flying = []
254         failures = []
255         for hash in missing:
256             offset, bytes = hmap[hash]
257             fileobj.seek(offset)
258             data = fileobj.read(bytes)
259             r = self._put_block_async(data, hash, upload_gen)
260             flying.append(r)
261             unfinished = self._watch_thread_limit(flying)
262             for thread in set(flying).difference(unfinished):
263                 if thread.exception:
264                     failures.append(thread)
265                     if isinstance(
266                             thread.exception,
267                             ClientError) and thread.exception.status == 502:
268                         self.POOLSIZE = self._thread_limit
269                 elif thread.isAlive():
270                     flying.append(thread)
271                 elif upload_gen:
272                     try:
273                         upload_gen.next()
274                     except:
275                         pass
276             flying = unfinished
277
278         for thread in flying:
279             thread.join()
280             if thread.exception:
281                 failures.append(thread)
282             elif upload_gen:
283                 try:
284                     upload_gen.next()
285                 except:
286                     pass
287
288         return [failure.kwargs['hash'] for failure in failures]
289
290     def upload_object(
291             self, obj, f,
292             size=None,
293             hash_cb=None,
294             upload_cb=None,
295             etag=None,
296             content_encoding=None,
297             content_disposition=None,
298             content_type=None,
299             sharing=None,
300             public=None):
301         """Upload an object using multiple connections (threads)
302
303         :param obj: (str) remote object path
304
305         :param f: open file descriptor (rb)
306
307         :param hash_cb: optional progress.bar object for calculating hashes
308
309         :param upload_cb: optional progress.bar object for uploading
310
311         :param etag: (str)
312
313         :param content_encoding: (str)
314
315         :param content_disposition: (str)
316
317         :param content_type: (str)
318
319         :param sharing: {'read':[user and/or grp names],
320             'write':[usr and/or grp names]}
321
322         :param public: (bool)
323         """
324         self._assert_container()
325
326         #init
327         block_info = (blocksize, blockhash, size, nblocks) =\
328             self._get_file_block_info(f, size)
329         (hashes, hmap, offset) = ([], {}, 0)
330         if not content_type:
331             content_type = 'application/octet-stream'
332
333         self._culculate_blocks_for_upload(
334             *block_info,
335             hashes=hashes,
336             hmap=hmap,
337             fileobj=f,
338             hash_cb=hash_cb)
339
340         hashmap = dict(bytes=size, hashes=hashes)
341         missing = self._get_missing_hashes(
342             obj, hashmap,
343             content_type=content_type,
344             size=size,
345             etag=etag,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
348             permissions=sharing,
349             public=public)
350
351         if missing is None:
352             return
353
354         if upload_cb:
355             upload_gen = upload_cb(len(missing))
356             for i in range(len(missing), len(hashmap['hashes']) + 1):
357                 try:
358                     upload_gen.next()
359                 except:
360                     upload_gen = None
361         else:
362             upload_gen = None
363
364         retries = 7
365         try:
366             while retries:
367                 sendlog.info('%s blocks missing' % len(missing))
368                 num_of_blocks = len(missing)
369                 missing = self._upload_missing_blocks(
370                     missing,
371                     hmap,
372                     f,
373                     upload_gen)
374                 if missing:
375                     if num_of_blocks == len(missing):
376                         retries -= 1
377                     else:
378                         num_of_blocks = len(missing)
379                 else:
380                     break
381             if missing:
382                 raise ClientError(
383                     '%s blocks failed to upload' % len(missing),
384                     status=800)
385         except KeyboardInterrupt:
386             sendlog.info('- - - wait for threads to finish')
387             for thread in activethreads():
388                 thread.join()
389             raise
390
391         self.object_put(
392             obj,
393             format='json',
394             hashmap=True,
395             content_type=content_type,
396             json=hashmap,
397             permissions=sharing,
398             public=public,
399             success=201)
400
401     # download_* auxiliary methods
402     def _get_remote_blocks_info(self, obj, **restargs):
403         #retrieve object hashmap
404         myrange = restargs.pop('data_range', None)
405         hashmap = self.get_object_hashmap(obj, **restargs)
406         restargs['data_range'] = myrange
407         blocksize = int(hashmap['block_size'])
408         blockhash = hashmap['block_hash']
409         total_size = hashmap['bytes']
410         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
411         map_dict = {}
412         for i, h in enumerate(hashmap['hashes']):
413             #  map_dict[h] = i   CHAGE
414             if h in map_dict:
415                 map_dict[h].append(i)
416             else:
417                 map_dict[h] = [i]
418         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
419
420     def _dump_blocks_sync(
421             self, obj, remote_hashes, blocksize, total_size, dst, range,
422             **args):
423         for blockid, blockhash in enumerate(remote_hashes):
424             if blockhash:
425                 start = blocksize * blockid
426                 is_last = start + blocksize > total_size
427                 end = (total_size - 1) if is_last else (start + blocksize - 1)
428                 (start, end) = _range_up(start, end, range)
429                 args['data_range'] = 'bytes=%s-%s' % (start, end)
430                 r = self.object_get(obj, success=(200, 206), **args)
431                 self._cb_next()
432                 dst.write(r.content)
433                 dst.flush()
434
435     def _get_block_async(self, obj, **args):
436         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
437         event.start()
438         return event
439
440     def _hash_from_file(self, fp, start, size, blockhash):
441         fp.seek(start)
442         block = fp.read(size)
443         h = newhashlib(blockhash)
444         h.update(block.strip('\x00'))
445         return hexlify(h.digest())
446
447     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
448         """write the results of a greenleted rest call to a file
449
450         :param offset: the offset of the file up to blocksize
451         - e.g. if the range is 10-100, all blocks will be written to
452         normal_position - 10
453         """
454         for i, (key, g) in enumerate(flying.items()):
455             if g.isAlive():
456                 continue
457             if g.exception:
458                 raise g.exception
459             block = g.value.content
460             for block_start in blockids[key]:
461                 local_file.seek(block_start + offset)
462                 local_file.write(block)
463                 self._cb_next()
464             flying.pop(key)
465             blockids.pop(key)
466         local_file.flush()
467
468     def _dump_blocks_async(
469             self, obj, remote_hashes, blocksize, total_size, local_file,
470             blockhash=None, resume=False, filerange=None, **restargs):
471         file_size = fstat(local_file.fileno()).st_size if resume else 0
472         flying = dict()
473         blockid_dict = dict()
474         offset = 0
475         if filerange is not None:
476             rstart = int(filerange.split('-')[0])
477             offset = rstart if blocksize > rstart else rstart % blocksize
478
479         self._init_thread_limit()
480         for block_hash, blockids in remote_hashes.items():
481             blockids = [blk * blocksize for blk in blockids]
482             unsaved = [blk for blk in blockids if not (
483                 blk < file_size and block_hash == self._hash_from_file(
484                         local_file, blk, blocksize, blockhash))]
485             self._cb_next(len(blockids) - len(unsaved))
486             if unsaved:
487                 key = unsaved[0]
488                 self._watch_thread_limit(flying.values())
489                 self._thread2file(
490                     flying, blockid_dict, local_file, offset,
491                     **restargs)
492                 end = total_size - 1 if key + blocksize > total_size\
493                     else key + blocksize - 1
494                 start, end = _range_up(key, end, filerange)
495                 if start == end:
496                     self._cb_next()
497                     continue
498                 restargs['async_headers'] = {
499                     'Range': 'bytes=%s-%s' % (start, end)}
500                 flying[key] = self._get_block_async(obj, **restargs)
501                 blockid_dict[key] = unsaved
502
503         for thread in flying.values():
504             thread.join()
505         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
506
507     def download_object(
508             self, obj, dst,
509             download_cb=None,
510             version=None,
511             resume=False,
512             range_str=None,
513             if_match=None,
514             if_none_match=None,
515             if_modified_since=None,
516             if_unmodified_since=None):
517         """Download an object (multiple connections, random blocks)
518
519         :param obj: (str) remote object path
520
521         :param dst: open file descriptor (wb+)
522
523         :param download_cb: optional progress.bar object for downloading
524
525         :param version: (str) file version
526
527         :param resume: (bool) if set, preserve already downloaded file parts
528
529         :param range_str: (str) from, to are file positions (int) in bytes
530
531         :param if_match: (str)
532
533         :param if_none_match: (str)
534
535         :param if_modified_since: (str) formated date
536
537         :param if_unmodified_since: (str) formated date"""
538         restargs = dict(
539             version=version,
540             data_range=None if range_str is None else 'bytes=%s' % range_str,
541             if_match=if_match,
542             if_none_match=if_none_match,
543             if_modified_since=if_modified_since,
544             if_unmodified_since=if_unmodified_since)
545
546         (
547             blocksize,
548             blockhash,
549             total_size,
550             hash_list,
551             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
552         assert total_size >= 0
553
554         if download_cb:
555             self.progress_bar_gen = download_cb(len(hash_list))
556             self._cb_next()
557
558         if dst.isatty():
559             self._dump_blocks_sync(
560                 obj,
561                 hash_list,
562                 blocksize,
563                 total_size,
564                 dst,
565                 range_str,
566                 **restargs)
567         else:
568             self._dump_blocks_async(
569                 obj,
570                 remote_hashes,
571                 blocksize,
572                 total_size,
573                 dst,
574                 blockhash,
575                 resume,
576                 range_str,
577                 **restargs)
578             if not range_str:
579                 dst.truncate(total_size)
580
581         self._complete_cb()
582
583     #Command Progress Bar method
584     def _cb_next(self, step=1):
585         if hasattr(self, 'progress_bar_gen'):
586             try:
587                 for i in xrange(step):
588                     self.progress_bar_gen.next()
589             except:
590                 pass
591
592     def _complete_cb(self):
593         while True:
594             try:
595                 self.progress_bar_gen.next()
596             except:
597                 break
598
599     def get_object_hashmap(
600             self, obj,
601             version=None,
602             if_match=None,
603             if_none_match=None,
604             if_modified_since=None,
605             if_unmodified_since=None,
606             data_range=None):
607         """
608         :param obj: (str) remote object path
609
610         :param if_match: (str)
611
612         :param if_none_match: (str)
613
614         :param if_modified_since: (str) formated date
615
616         :param if_unmodified_since: (str) formated date
617
618         :param data_range: (str) from-to where from and to are integers
619             denoting file positions in bytes
620
621         :returns: (list)
622         """
623         try:
624             r = self.object_get(
625                 obj,
626                 hashmap=True,
627                 version=version,
628                 if_etag_match=if_match,
629                 if_etag_not_match=if_none_match,
630                 if_modified_since=if_modified_since,
631                 if_unmodified_since=if_unmodified_since,
632                 data_range=data_range)
633         except ClientError as err:
634             if err.status == 304 or err.status == 412:
635                 return {}
636             raise
637         return r.json
638
639     def set_account_group(self, group, usernames):
640         """
641         :param group: (str)
642
643         :param usernames: (list)
644         """
645         self.account_post(update=True, groups={group: usernames})
646
647     def del_account_group(self, group):
648         """
649         :param group: (str)
650         """
651         self.account_post(update=True, groups={group: []})
652
653     def get_account_info(self, until=None):
654         """
655         :param until: (str) formated date
656
657         :returns: (dict)
658         """
659         r = self.account_head(until=until)
660         if r.status_code == 401:
661             raise ClientError("No authorization", status=401)
662         return r.headers
663
664     def get_account_quota(self):
665         """
666         :returns: (dict)
667         """
668         return filter_in(
669             self.get_account_info(),
670             'X-Account-Policy-Quota',
671             exactMatch=True)
672
673     def get_account_versioning(self):
674         """
675         :returns: (dict)
676         """
677         return filter_in(
678             self.get_account_info(),
679             'X-Account-Policy-Versioning',
680             exactMatch=True)
681
682     def get_account_meta(self, until=None):
683         """
684         :meta until: (str) formated date
685
686         :returns: (dict)
687         """
688         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
689
690     def get_account_group(self):
691         """
692         :returns: (dict)
693         """
694         return filter_in(self.get_account_info(), 'X-Account-Group-')
695
696     def set_account_meta(self, metapairs):
697         """
698         :param metapairs: (dict) {key1:val1, key2:val2, ...}
699         """
700         assert(type(metapairs) is dict)
701         self.account_post(update=True, metadata=metapairs)
702
703     def del_account_meta(self, metakey):
704         """
705         :param metakey: (str) metadatum key
706         """
707         self.account_post(update=True, metadata={metakey: ''})
708
709     def set_account_quota(self, quota):
710         """
711         :param quota: (int)
712         """
713         self.account_post(update=True, quota=quota)
714
715     def set_account_versioning(self, versioning):
716         """
717         "param versioning: (str)
718         """
719         self.account_post(update=True, versioning=versioning)
720
721     def list_containers(self):
722         """
723         :returns: (dict)
724         """
725         r = self.account_get()
726         return r.json
727
728     def del_container(self, until=None, delimiter=None):
729         """
730         :param until: (str) formated date
731
732         :param delimiter: (str) with / empty container
733
734         :raises ClientError: 404 Container does not exist
735
736         :raises ClientError: 409 Container is not empty
737         """
738         self._assert_container()
739         r = self.container_delete(
740             until=until,
741             delimiter=delimiter,
742             success=(204, 404, 409))
743         if r.status_code == 404:
744             raise ClientError(
745                 'Container "%s" does not exist' % self.container,
746                 r.status_code)
747         elif r.status_code == 409:
748             raise ClientError(
749                 'Container "%s" is not empty' % self.container,
750                 r.status_code)
751
752     def get_container_versioning(self, container=None):
753         """
754         :param container: (str)
755
756         :returns: (dict)
757         """
758         cnt_back_up = self.container
759         try:
760             self.container = container or cnt_back_up
761             return filter_in(
762                 self.get_container_info(),
763                 'X-Container-Policy-Versioning')
764         finally:
765             self.container = cnt_back_up
766
767     def get_container_quota(self, container=None):
768         """
769         :param container: (str)
770
771         :returns: (dict)
772         """
773         cnt_back_up = self.container
774         try:
775             self.container = container or cnt_back_up
776             return filter_in(
777                 self.get_container_info(),
778                 'X-Container-Policy-Quota')
779         finally:
780             self.container = cnt_back_up
781
782     def get_container_info(self, until=None):
783         """
784         :param until: (str) formated date
785
786         :returns: (dict)
787
788         :raises ClientError: 404 Container not found
789         """
790         try:
791             r = self.container_head(until=until)
792         except ClientError as err:
793             err.details.append('for container %s' % self.container)
794             raise err
795         return r.headers
796
797     def get_container_meta(self, until=None):
798         """
799         :param until: (str) formated date
800
801         :returns: (dict)
802         """
803         return filter_in(
804             self.get_container_info(until=until),
805             'X-Container-Meta')
806
807     def get_container_object_meta(self, until=None):
808         """
809         :param until: (str) formated date
810
811         :returns: (dict)
812         """
813         return filter_in(
814             self.get_container_info(until=until),
815             'X-Container-Object-Meta')
816
817     def set_container_meta(self, metapairs):
818         """
819         :param metapairs: (dict) {key1:val1, key2:val2, ...}
820         """
821         assert(type(metapairs) is dict)
822         self.container_post(update=True, metadata=metapairs)
823
824     def del_container_meta(self, metakey):
825         """
826         :param metakey: (str) metadatum key
827         """
828         self.container_post(update=True, metadata={metakey: ''})
829
830     def set_container_quota(self, quota):
831         """
832         :param quota: (int)
833         """
834         self.container_post(update=True, quota=quota)
835
836     def set_container_versioning(self, versioning):
837         """
838         :param versioning: (str)
839         """
840         self.container_post(update=True, versioning=versioning)
841
842     def del_object(self, obj, until=None, delimiter=None):
843         """
844         :param obj: (str) remote object path
845
846         :param until: (str) formated date
847
848         :param delimiter: (str)
849         """
850         self._assert_container()
851         self.object_delete(obj, until=until, delimiter=delimiter)
852
853     def set_object_meta(self, obj, metapairs):
854         """
855         :param obj: (str) remote object path
856
857         :param metapairs: (dict) {key1:val1, key2:val2, ...}
858         """
859         assert(type(metapairs) is dict)
860         self.object_post(obj, update=True, metadata=metapairs)
861
862     def del_object_meta(self, obj, metakey):
863         """
864         :param obj: (str) remote object path
865
866         :param metakey: (str) metadatum key
867         """
868         self.object_post(obj, update=True, metadata={metakey: ''})
869
870     def publish_object(self, obj):
871         """
872         :param obj: (str) remote object path
873
874         :returns: (str) access url
875         """
876         self.object_post(obj, update=True, public=True)
877         info = self.get_object_info(obj)
878         pref, sep, rest = self.base_url.partition('//')
879         base = rest.split('/')[0]
880         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
881
882     def unpublish_object(self, obj):
883         """
884         :param obj: (str) remote object path
885         """
886         self.object_post(obj, update=True, public=False)
887
888     def get_object_info(self, obj, version=None):
889         """
890         :param obj: (str) remote object path
891
892         :param version: (str)
893
894         :returns: (dict)
895         """
896         try:
897             r = self.object_head(obj, version=version)
898             return r.headers
899         except ClientError as ce:
900             if ce.status == 404:
901                 raise ClientError('Object %s not found' % obj, status=404)
902             raise
903
904     def get_object_meta(self, obj, version=None):
905         """
906         :param obj: (str) remote object path
907
908         :param version: (str)
909
910         :returns: (dict)
911         """
912         return filter_in(
913             self.get_object_info(obj, version=version),
914             'X-Object-Meta')
915
916     def get_object_sharing(self, obj):
917         """
918         :param obj: (str) remote object path
919
920         :returns: (dict)
921         """
922         r = filter_in(
923             self.get_object_info(obj),
924             'X-Object-Sharing',
925             exactMatch=True)
926         reply = {}
927         if len(r) > 0:
928             perms = r['x-object-sharing'].split(';')
929             for perm in perms:
930                 try:
931                     perm.index('=')
932                 except ValueError:
933                     raise ClientError('Incorrect reply format')
934                 (key, val) = perm.strip().split('=')
935                 reply[key] = val
936         return reply
937
938     def set_object_sharing(
939             self, obj,
940             read_permition=False, write_permition=False):
941         """Give read/write permisions to an object.
942
943         :param obj: (str) remote object path
944
945         :param read_permition: (list - bool) users and user groups that get
946             read permition for this object - False means all previous read
947             permissions will be removed
948
949         :param write_perimition: (list - bool) of users and user groups to get
950            write permition for this object - False means all previous write
951            permissions will be removed
952         """
953
954         perms = dict(read=read_permition or '', write=write_permition or '')
955         self.object_post(obj, update=True, permissions=perms)
956
957     def del_object_sharing(self, obj):
958         """
959         :param obj: (str) remote object path
960         """
961         self.set_object_sharing(obj)
962
963     def append_object(self, obj, source_file, upload_cb=None):
964         """
965         :param obj: (str) remote object path
966
967         :param source_file: open file descriptor
968
969         :param upload_db: progress.bar for uploading
970         """
971
972         self._assert_container()
973         meta = self.get_container_info()
974         blocksize = int(meta['x-container-block-size'])
975         filesize = fstat(source_file.fileno()).st_size
976         nblocks = 1 + (filesize - 1) // blocksize
977         offset = 0
978         if upload_cb:
979             upload_gen = upload_cb(nblocks)
980             upload_gen.next()
981         for i in range(nblocks):
982             block = source_file.read(min(blocksize, filesize - offset))
983             offset += len(block)
984             self.object_post(
985                 obj,
986                 update=True,
987                 content_range='bytes */*',
988                 content_type='application/octet-stream',
989                 content_length=len(block),
990                 data=block)
991
992             if upload_cb:
993                 upload_gen.next()
994
995     def truncate_object(self, obj, upto_bytes):
996         """
997         :param obj: (str) remote object path
998
999         :param upto_bytes: max number of bytes to leave on file
1000         """
1001         self.object_post(
1002             obj,
1003             update=True,
1004             content_range='bytes 0-%s/*' % upto_bytes,
1005             content_type='application/octet-stream',
1006             object_bytes=upto_bytes,
1007             source_object=path4url(self.container, obj))
1008
1009     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1010         """Overwrite a part of an object from local source file
1011
1012         :param obj: (str) remote object path
1013
1014         :param start: (int) position in bytes to start overwriting from
1015
1016         :param end: (int) position in bytes to stop overwriting at
1017
1018         :param source_file: open file descriptor
1019
1020         :param upload_db: progress.bar for uploading
1021         """
1022
1023         r = self.get_object_info(obj)
1024         rf_size = int(r['content-length'])
1025         if rf_size < int(start):
1026             raise ClientError(
1027                 'Range start exceeds file size',
1028                 status=416)
1029         elif rf_size < int(end):
1030             raise ClientError(
1031                 'Range end exceeds file size',
1032                 status=416)
1033         self._assert_container()
1034         meta = self.get_container_info()
1035         blocksize = int(meta['x-container-block-size'])
1036         filesize = fstat(source_file.fileno()).st_size
1037         datasize = int(end) - int(start) + 1
1038         nblocks = 1 + (datasize - 1) // blocksize
1039         offset = 0
1040         if upload_cb:
1041             upload_gen = upload_cb(nblocks)
1042             upload_gen.next()
1043         for i in range(nblocks):
1044             read_size = min(blocksize, filesize - offset, datasize - offset)
1045             block = source_file.read(read_size)
1046             self.object_post(
1047                 obj,
1048                 update=True,
1049                 content_type='application/octet-stream',
1050                 content_length=len(block),
1051                 content_range='bytes %s-%s/*' % (
1052                     start + offset,
1053                     start + offset + len(block) - 1),
1054                 data=block)
1055             offset += len(block)
1056
1057             if upload_cb:
1058                 upload_gen.next()
1059
1060     def copy_object(
1061             self, src_container, src_object, dst_container,
1062             dst_object=None,
1063             source_version=None,
1064             source_account=None,
1065             public=False,
1066             content_type=None,
1067             delimiter=None):
1068         """
1069         :param src_container: (str) source container
1070
1071         :param src_object: (str) source object path
1072
1073         :param dst_container: (str) destination container
1074
1075         :param dst_object: (str) destination object path
1076
1077         :param source_version: (str) source object version
1078
1079         :param source_account: (str) account to copy from
1080
1081         :param public: (bool)
1082
1083         :param content_type: (str)
1084
1085         :param delimiter: (str)
1086         """
1087         self._assert_account()
1088         self.container = dst_container
1089         src_path = path4url(src_container, src_object)
1090         self.object_put(
1091             dst_object or src_object,
1092             success=201,
1093             copy_from=src_path,
1094             content_length=0,
1095             source_version=source_version,
1096             source_account=source_account,
1097             public=public,
1098             content_type=content_type,
1099             delimiter=delimiter)
1100
1101     def move_object(
1102             self, src_container, src_object, dst_container,
1103             dst_object=False,
1104             source_account=None,
1105             source_version=None,
1106             public=False,
1107             content_type=None,
1108             delimiter=None):
1109         """
1110         :param src_container: (str) source container
1111
1112         :param src_object: (str) source object path
1113
1114         :param dst_container: (str) destination container
1115
1116         :param dst_object: (str) destination object path
1117
1118         :param source_account: (str) account to move from
1119
1120         :param source_version: (str) source object version
1121
1122         :param public: (bool)
1123
1124         :param content_type: (str)
1125
1126         :param delimiter: (str)
1127         """
1128         self._assert_account()
1129         self.container = dst_container
1130         dst_object = dst_object or src_object
1131         src_path = path4url(src_container, src_object)
1132         self.object_put(
1133             dst_object,
1134             success=201,
1135             move_from=src_path,
1136             content_length=0,
1137             source_account=source_account,
1138             source_version=source_version,
1139             public=public,
1140             content_type=content_type,
1141             delimiter=delimiter)
1142
1143     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1144         """Get accounts that share with self.account
1145
1146         :param limit: (str)
1147
1148         :param marker: (str)
1149
1150         :returns: (dict)
1151         """
1152         self._assert_account()
1153
1154         self.set_param('format', 'json')
1155         self.set_param('limit', limit, iff=limit is not None)
1156         self.set_param('marker', marker, iff=marker is not None)
1157
1158         path = ''
1159         success = kwargs.pop('success', (200, 204))
1160         r = self.get(path, *args, success=success, **kwargs)
1161         return r.json
1162
1163     def get_object_versionlist(self, obj):
1164         """
1165         :param obj: (str) remote object path
1166
1167         :returns: (list)
1168         """
1169         self._assert_container()
1170         r = self.object_get(obj, format='json', version='list')
1171         return r.json['versions']