Honour if_(none_)match even if all blocks are up
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _create_or_get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             if_etag_match=None,
208             if_etag_not_match=None,
209             content_encoding=None,
210             content_disposition=None,
211             permissions=None,
212             public=None,
213             success=(201, 409)):
214         r = self.object_put(
215             obj,
216             format='json',
217             hashmap=True,
218             content_type=content_type,
219             json=json,
220             if_etag_match=if_etag_match,
221             if_etag_not_match=if_etag_not_match,
222             content_encoding=content_encoding,
223             content_disposition=content_disposition,
224             permissions=permissions,
225             public=public,
226             success=success)
227         return None if r.status_code == 201 else r.json
228
229     def _calculate_blocks_for_upload(
230             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
231             hash_cb=None):
232         offset = 0
233         if hash_cb:
234             hash_gen = hash_cb(nblocks)
235             hash_gen.next()
236
237         for i in range(nblocks):
238             block = fileobj.read(min(blocksize, size - offset))
239             bytes = len(block)
240             hash = _pithos_hash(block, blockhash)
241             hashes.append(hash)
242             hmap[hash] = (offset, bytes)
243             offset += bytes
244             if hash_cb:
245                 hash_gen.next()
246         msg = 'Failed to calculate uploaded blocks:'
247         ' Offset and object size do not match'
248         assert offset == size, msg
249
250     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
251         """upload missing blocks asynchronously"""
252
253         self._init_thread_limit()
254
255         flying = []
256         failures = []
257         for hash in missing:
258             offset, bytes = hmap[hash]
259             fileobj.seek(offset)
260             data = fileobj.read(bytes)
261             r = self._put_block_async(data, hash, upload_gen)
262             flying.append(r)
263             unfinished = self._watch_thread_limit(flying)
264             for thread in set(flying).difference(unfinished):
265                 if thread.exception:
266                     failures.append(thread)
267                     if isinstance(
268                             thread.exception,
269                             ClientError) and thread.exception.status == 502:
270                         self.POOLSIZE = self._thread_limit
271                 elif thread.isAlive():
272                     flying.append(thread)
273                 elif upload_gen:
274                     try:
275                         upload_gen.next()
276                     except:
277                         pass
278             flying = unfinished
279
280         for thread in flying:
281             thread.join()
282             if thread.exception:
283                 failures.append(thread)
284             elif upload_gen:
285                 try:
286                     upload_gen.next()
287                 except:
288                     pass
289
290         return [failure.kwargs['hash'] for failure in failures]
291
292     def upload_object(
293             self, obj, f,
294             size=None,
295             hash_cb=None,
296             upload_cb=None,
297             etag=None,
298             if_etag_match=None,
299             if_not_exist=None,
300             content_encoding=None,
301             content_disposition=None,
302             content_type=None,
303             sharing=None,
304             public=None):
305         """Upload an object using multiple connections (threads)
306
307         :param obj: (str) remote object path
308
309         :param f: open file descriptor (rb)
310
311         :param hash_cb: optional progress.bar object for calculating hashes
312
313         :param upload_cb: optional progress.bar object for uploading
314
315         :param etag: (str)
316
317         :param if_etag_match: (str) Push that value to if-match header at file
318             creation
319
320         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
321             it does not exist remotely, otherwise the operation will fail.
322             Involves the case of an object with the same path is created while
323             the object is being uploaded.
324
325         :param content_encoding: (str)
326
327         :param content_disposition: (str)
328
329         :param content_type: (str)
330
331         :param sharing: {'read':[user and/or grp names],
332             'write':[usr and/or grp names]}
333
334         :param public: (bool)
335         """
336         self._assert_container()
337
338         #init
339         block_info = (blocksize, blockhash, size, nblocks) =\
340             self._get_file_block_info(f, size)
341         (hashes, hmap, offset) = ([], {}, 0)
342         if not content_type:
343             content_type = 'application/octet-stream'
344
345         self._calculate_blocks_for_upload(
346             *block_info,
347             hashes=hashes,
348             hmap=hmap,
349             fileobj=f,
350             hash_cb=hash_cb)
351
352         hashmap = dict(bytes=size, hashes=hashes)
353         missing = self.create_or_get_missing_hashes(
354             obj, hashmap,
355             content_type=content_type,
356             size=size,
357             if_etag_match=if_etag_match,
358             if_etag_not_match='*' if if_not_exist else None,
359             content_encoding=content_encoding,
360             content_disposition=content_disposition,
361             permissions=sharing,
362             public=public)
363
364         if missing is None:
365             return
366
367         if upload_cb:
368             upload_gen = upload_cb(len(missing))
369             for i in range(len(missing), len(hashmap['hashes']) + 1):
370                 try:
371                     upload_gen.next()
372                 except:
373                     upload_gen = None
374         else:
375             upload_gen = None
376
377         retries = 7
378         try:
379             while retries:
380                 sendlog.info('%s blocks missing' % len(missing))
381                 num_of_blocks = len(missing)
382                 missing = self._upload_missing_blocks(
383                     missing,
384                     hmap,
385                     f,
386                     upload_gen)
387                 if missing:
388                     if num_of_blocks == len(missing):
389                         retries -= 1
390                     else:
391                         num_of_blocks = len(missing)
392                 else:
393                     break
394             if missing:
395                 raise ClientError(
396                     '%s blocks failed to upload' % len(missing),
397                     status=800)
398         except KeyboardInterrupt:
399             sendlog.info('- - - wait for threads to finish')
400             for thread in activethreads():
401                 thread.join()
402             raise
403
404         self.object_put(
405             obj,
406             format='json',
407             hashmap=True,
408             content_type=content_type,
409             if_etag_match=if_etag_match,
410             if_etag_not_match='*' if if_not_exist else None,
411             etag=etag,
412             json=hashmap,
413             permissions=sharing,
414             public=public,
415             success=201)
416
417     # download_* auxiliary methods
418     def _get_remote_blocks_info(self, obj, **restargs):
419         #retrieve object hashmap
420         myrange = restargs.pop('data_range', None)
421         hashmap = self.get_object_hashmap(obj, **restargs)
422         restargs['data_range'] = myrange
423         blocksize = int(hashmap['block_size'])
424         blockhash = hashmap['block_hash']
425         total_size = hashmap['bytes']
426         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
427         map_dict = {}
428         for i, h in enumerate(hashmap['hashes']):
429             #  map_dict[h] = i   CHAGE
430             if h in map_dict:
431                 map_dict[h].append(i)
432             else:
433                 map_dict[h] = [i]
434         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
435
436     def _dump_blocks_sync(
437             self, obj, remote_hashes, blocksize, total_size, dst, range,
438             **args):
439         for blockid, blockhash in enumerate(remote_hashes):
440             if blockhash:
441                 start = blocksize * blockid
442                 is_last = start + blocksize > total_size
443                 end = (total_size - 1) if is_last else (start + blocksize - 1)
444                 (start, end) = _range_up(start, end, range)
445                 args['data_range'] = 'bytes=%s-%s' % (start, end)
446                 r = self.object_get(obj, success=(200, 206), **args)
447                 self._cb_next()
448                 dst.write(r.content)
449                 dst.flush()
450
451     def _get_block_async(self, obj, **args):
452         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
453         event.start()
454         return event
455
456     def _hash_from_file(self, fp, start, size, blockhash):
457         fp.seek(start)
458         block = fp.read(size)
459         h = newhashlib(blockhash)
460         h.update(block.strip('\x00'))
461         return hexlify(h.digest())
462
463     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
464         """write the results of a greenleted rest call to a file
465
466         :param offset: the offset of the file up to blocksize
467         - e.g. if the range is 10-100, all blocks will be written to
468         normal_position - 10
469         """
470         for i, (key, g) in enumerate(flying.items()):
471             if g.isAlive():
472                 continue
473             if g.exception:
474                 raise g.exception
475             block = g.value.content
476             for block_start in blockids[key]:
477                 local_file.seek(block_start + offset)
478                 local_file.write(block)
479                 self._cb_next()
480             flying.pop(key)
481             blockids.pop(key)
482         local_file.flush()
483
484     def _dump_blocks_async(
485             self, obj, remote_hashes, blocksize, total_size, local_file,
486             blockhash=None, resume=False, filerange=None, **restargs):
487         file_size = fstat(local_file.fileno()).st_size if resume else 0
488         flying = dict()
489         blockid_dict = dict()
490         offset = 0
491         if filerange is not None:
492             rstart = int(filerange.split('-')[0])
493             offset = rstart if blocksize > rstart else rstart % blocksize
494
495         self._init_thread_limit()
496         for block_hash, blockids in remote_hashes.items():
497             blockids = [blk * blocksize for blk in blockids]
498             unsaved = [blk for blk in blockids if not (
499                 blk < file_size and block_hash == self._hash_from_file(
500                         local_file, blk, blocksize, blockhash))]
501             self._cb_next(len(blockids) - len(unsaved))
502             if unsaved:
503                 key = unsaved[0]
504                 self._watch_thread_limit(flying.values())
505                 self._thread2file(
506                     flying, blockid_dict, local_file, offset,
507                     **restargs)
508                 end = total_size - 1 if key + blocksize > total_size\
509                     else key + blocksize - 1
510                 start, end = _range_up(key, end, filerange)
511                 if start == end:
512                     self._cb_next()
513                     continue
514                 restargs['async_headers'] = {
515                     'Range': 'bytes=%s-%s' % (start, end)}
516                 flying[key] = self._get_block_async(obj, **restargs)
517                 blockid_dict[key] = unsaved
518
519         for thread in flying.values():
520             thread.join()
521         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
522
523     def download_object(
524             self, obj, dst,
525             download_cb=None,
526             version=None,
527             resume=False,
528             range_str=None,
529             if_match=None,
530             if_none_match=None,
531             if_modified_since=None,
532             if_unmodified_since=None):
533         """Download an object (multiple connections, random blocks)
534
535         :param obj: (str) remote object path
536
537         :param dst: open file descriptor (wb+)
538
539         :param download_cb: optional progress.bar object for downloading
540
541         :param version: (str) file version
542
543         :param resume: (bool) if set, preserve already downloaded file parts
544
545         :param range_str: (str) from, to are file positions (int) in bytes
546
547         :param if_match: (str)
548
549         :param if_none_match: (str)
550
551         :param if_modified_since: (str) formated date
552
553         :param if_unmodified_since: (str) formated date"""
554         restargs = dict(
555             version=version,
556             data_range=None if range_str is None else 'bytes=%s' % range_str,
557             if_match=if_match,
558             if_none_match=if_none_match,
559             if_modified_since=if_modified_since,
560             if_unmodified_since=if_unmodified_since)
561
562         (
563             blocksize,
564             blockhash,
565             total_size,
566             hash_list,
567             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
568         assert total_size >= 0
569
570         if download_cb:
571             self.progress_bar_gen = download_cb(len(hash_list))
572             self._cb_next()
573
574         if dst.isatty():
575             self._dump_blocks_sync(
576                 obj,
577                 hash_list,
578                 blocksize,
579                 total_size,
580                 dst,
581                 range_str,
582                 **restargs)
583         else:
584             self._dump_blocks_async(
585                 obj,
586                 remote_hashes,
587                 blocksize,
588                 total_size,
589                 dst,
590                 blockhash,
591                 resume,
592                 range_str,
593                 **restargs)
594             if not range_str:
595                 dst.truncate(total_size)
596
597         self._complete_cb()
598
599     #Command Progress Bar method
600     def _cb_next(self, step=1):
601         if hasattr(self, 'progress_bar_gen'):
602             try:
603                 for i in xrange(step):
604                     self.progress_bar_gen.next()
605             except:
606                 pass
607
608     def _complete_cb(self):
609         while True:
610             try:
611                 self.progress_bar_gen.next()
612             except:
613                 break
614
615     def get_object_hashmap(
616             self, obj,
617             version=None,
618             if_match=None,
619             if_none_match=None,
620             if_modified_since=None,
621             if_unmodified_since=None,
622             data_range=None):
623         """
624         :param obj: (str) remote object path
625
626         :param if_match: (str)
627
628         :param if_none_match: (str)
629
630         :param if_modified_since: (str) formated date
631
632         :param if_unmodified_since: (str) formated date
633
634         :param data_range: (str) from-to where from and to are integers
635             denoting file positions in bytes
636
637         :returns: (list)
638         """
639         try:
640             r = self.object_get(
641                 obj,
642                 hashmap=True,
643                 version=version,
644                 if_etag_match=if_match,
645                 if_etag_not_match=if_none_match,
646                 if_modified_since=if_modified_since,
647                 if_unmodified_since=if_unmodified_since,
648                 data_range=data_range)
649         except ClientError as err:
650             if err.status == 304 or err.status == 412:
651                 return {}
652             raise
653         return r.json
654
655     def set_account_group(self, group, usernames):
656         """
657         :param group: (str)
658
659         :param usernames: (list)
660         """
661         self.account_post(update=True, groups={group: usernames})
662
663     def del_account_group(self, group):
664         """
665         :param group: (str)
666         """
667         self.account_post(update=True, groups={group: []})
668
669     def get_account_info(self, until=None):
670         """
671         :param until: (str) formated date
672
673         :returns: (dict)
674         """
675         r = self.account_head(until=until)
676         if r.status_code == 401:
677             raise ClientError("No authorization", status=401)
678         return r.headers
679
680     def get_account_quota(self):
681         """
682         :returns: (dict)
683         """
684         return filter_in(
685             self.get_account_info(),
686             'X-Account-Policy-Quota',
687             exactMatch=True)
688
689     def get_account_versioning(self):
690         """
691         :returns: (dict)
692         """
693         return filter_in(
694             self.get_account_info(),
695             'X-Account-Policy-Versioning',
696             exactMatch=True)
697
698     def get_account_meta(self, until=None):
699         """
700         :meta until: (str) formated date
701
702         :returns: (dict)
703         """
704         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
705
706     def get_account_group(self):
707         """
708         :returns: (dict)
709         """
710         return filter_in(self.get_account_info(), 'X-Account-Group-')
711
712     def set_account_meta(self, metapairs):
713         """
714         :param metapairs: (dict) {key1:val1, key2:val2, ...}
715         """
716         assert(type(metapairs) is dict)
717         self.account_post(update=True, metadata=metapairs)
718
719     def del_account_meta(self, metakey):
720         """
721         :param metakey: (str) metadatum key
722         """
723         self.account_post(update=True, metadata={metakey: ''})
724
725     """
726     def set_account_quota(self, quota):
727         ""
728         :param quota: (int)
729         ""
730         self.account_post(update=True, quota=quota)
731     """
732
733     def set_account_versioning(self, versioning):
734         """
735         "param versioning: (str)
736         """
737         self.account_post(update=True, versioning=versioning)
738
739     def list_containers(self):
740         """
741         :returns: (dict)
742         """
743         r = self.account_get()
744         return r.json
745
746     def del_container(self, until=None, delimiter=None):
747         """
748         :param until: (str) formated date
749
750         :param delimiter: (str) with / empty container
751
752         :raises ClientError: 404 Container does not exist
753
754         :raises ClientError: 409 Container is not empty
755         """
756         self._assert_container()
757         r = self.container_delete(
758             until=until,
759             delimiter=delimiter,
760             success=(204, 404, 409))
761         if r.status_code == 404:
762             raise ClientError(
763                 'Container "%s" does not exist' % self.container,
764                 r.status_code)
765         elif r.status_code == 409:
766             raise ClientError(
767                 'Container "%s" is not empty' % self.container,
768                 r.status_code)
769
770     def get_container_versioning(self, container=None):
771         """
772         :param container: (str)
773
774         :returns: (dict)
775         """
776         cnt_back_up = self.container
777         try:
778             self.container = container or cnt_back_up
779             return filter_in(
780                 self.get_container_info(),
781                 'X-Container-Policy-Versioning')
782         finally:
783             self.container = cnt_back_up
784
785     def get_container_limit(self, container=None):
786         """
787         :param container: (str)
788
789         :returns: (dict)
790         """
791         cnt_back_up = self.container
792         try:
793             self.container = container or cnt_back_up
794             return filter_in(
795                 self.get_container_info(),
796                 'X-Container-Policy-Quota')
797         finally:
798             self.container = cnt_back_up
799
800     def get_container_info(self, until=None):
801         """
802         :param until: (str) formated date
803
804         :returns: (dict)
805
806         :raises ClientError: 404 Container not found
807         """
808         try:
809             r = self.container_head(until=until)
810         except ClientError as err:
811             err.details.append('for container %s' % self.container)
812             raise err
813         return r.headers
814
815     def get_container_meta(self, until=None):
816         """
817         :param until: (str) formated date
818
819         :returns: (dict)
820         """
821         return filter_in(
822             self.get_container_info(until=until),
823             'X-Container-Meta')
824
825     def get_container_object_meta(self, until=None):
826         """
827         :param until: (str) formated date
828
829         :returns: (dict)
830         """
831         return filter_in(
832             self.get_container_info(until=until),
833             'X-Container-Object-Meta')
834
835     def set_container_meta(self, metapairs):
836         """
837         :param metapairs: (dict) {key1:val1, key2:val2, ...}
838         """
839         assert(type(metapairs) is dict)
840         self.container_post(update=True, metadata=metapairs)
841
842     def del_container_meta(self, metakey):
843         """
844         :param metakey: (str) metadatum key
845         """
846         self.container_post(update=True, metadata={metakey: ''})
847
848     def set_container_limit(self, limit):
849         """
850         :param limit: (int)
851         """
852         self.container_post(update=True, quota=limit)
853
854     def set_container_versioning(self, versioning):
855         """
856         :param versioning: (str)
857         """
858         self.container_post(update=True, versioning=versioning)
859
860     def del_object(self, obj, until=None, delimiter=None):
861         """
862         :param obj: (str) remote object path
863
864         :param until: (str) formated date
865
866         :param delimiter: (str)
867         """
868         self._assert_container()
869         self.object_delete(obj, until=until, delimiter=delimiter)
870
871     def set_object_meta(self, obj, metapairs):
872         """
873         :param obj: (str) remote object path
874
875         :param metapairs: (dict) {key1:val1, key2:val2, ...}
876         """
877         assert(type(metapairs) is dict)
878         self.object_post(obj, update=True, metadata=metapairs)
879
880     def del_object_meta(self, obj, metakey):
881         """
882         :param obj: (str) remote object path
883
884         :param metakey: (str) metadatum key
885         """
886         self.object_post(obj, update=True, metadata={metakey: ''})
887
888     def publish_object(self, obj):
889         """
890         :param obj: (str) remote object path
891
892         :returns: (str) access url
893         """
894         self.object_post(obj, update=True, public=True)
895         info = self.get_object_info(obj)
896         pref, sep, rest = self.base_url.partition('//')
897         base = rest.split('/')[0]
898         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
899
900     def unpublish_object(self, obj):
901         """
902         :param obj: (str) remote object path
903         """
904         self.object_post(obj, update=True, public=False)
905
906     def get_object_info(self, obj, version=None):
907         """
908         :param obj: (str) remote object path
909
910         :param version: (str)
911
912         :returns: (dict)
913         """
914         try:
915             r = self.object_head(obj, version=version)
916             return r.headers
917         except ClientError as ce:
918             if ce.status == 404:
919                 raise ClientError('Object %s not found' % obj, status=404)
920             raise
921
922     def get_object_meta(self, obj, version=None):
923         """
924         :param obj: (str) remote object path
925
926         :param version: (str)
927
928         :returns: (dict)
929         """
930         return filter_in(
931             self.get_object_info(obj, version=version),
932             'X-Object-Meta')
933
934     def get_object_sharing(self, obj):
935         """
936         :param obj: (str) remote object path
937
938         :returns: (dict)
939         """
940         r = filter_in(
941             self.get_object_info(obj),
942             'X-Object-Sharing',
943             exactMatch=True)
944         reply = {}
945         if len(r) > 0:
946             perms = r['x-object-sharing'].split(';')
947             for perm in perms:
948                 try:
949                     perm.index('=')
950                 except ValueError:
951                     raise ClientError('Incorrect reply format')
952                 (key, val) = perm.strip().split('=')
953                 reply[key] = val
954         return reply
955
956     def set_object_sharing(
957             self, obj,
958             read_permition=False, write_permition=False):
959         """Give read/write permisions to an object.
960
961         :param obj: (str) remote object path
962
963         :param read_permition: (list - bool) users and user groups that get
964             read permition for this object - False means all previous read
965             permissions will be removed
966
967         :param write_perimition: (list - bool) of users and user groups to get
968            write permition for this object - False means all previous write
969            permissions will be removed
970         """
971
972         perms = dict(read=read_permition or '', write=write_permition or '')
973         self.object_post(obj, update=True, permissions=perms)
974
975     def del_object_sharing(self, obj):
976         """
977         :param obj: (str) remote object path
978         """
979         self.set_object_sharing(obj)
980
981     def append_object(self, obj, source_file, upload_cb=None):
982         """
983         :param obj: (str) remote object path
984
985         :param source_file: open file descriptor
986
987         :param upload_db: progress.bar for uploading
988         """
989
990         self._assert_container()
991         meta = self.get_container_info()
992         blocksize = int(meta['x-container-block-size'])
993         filesize = fstat(source_file.fileno()).st_size
994         nblocks = 1 + (filesize - 1) // blocksize
995         offset = 0
996         if upload_cb:
997             upload_gen = upload_cb(nblocks)
998             upload_gen.next()
999         for i in range(nblocks):
1000             block = source_file.read(min(blocksize, filesize - offset))
1001             offset += len(block)
1002             self.object_post(
1003                 obj,
1004                 update=True,
1005                 content_range='bytes */*',
1006                 content_type='application/octet-stream',
1007                 content_length=len(block),
1008                 data=block)
1009
1010             if upload_cb:
1011                 upload_gen.next()
1012
1013     def truncate_object(self, obj, upto_bytes):
1014         """
1015         :param obj: (str) remote object path
1016
1017         :param upto_bytes: max number of bytes to leave on file
1018         """
1019         self.object_post(
1020             obj,
1021             update=True,
1022             content_range='bytes 0-%s/*' % upto_bytes,
1023             content_type='application/octet-stream',
1024             object_bytes=upto_bytes,
1025             source_object=path4url(self.container, obj))
1026
1027     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1028         """Overwrite a part of an object from local source file
1029
1030         :param obj: (str) remote object path
1031
1032         :param start: (int) position in bytes to start overwriting from
1033
1034         :param end: (int) position in bytes to stop overwriting at
1035
1036         :param source_file: open file descriptor
1037
1038         :param upload_db: progress.bar for uploading
1039         """
1040
1041         r = self.get_object_info(obj)
1042         rf_size = int(r['content-length'])
1043         if rf_size < int(start):
1044             raise ClientError(
1045                 'Range start exceeds file size',
1046                 status=416)
1047         elif rf_size < int(end):
1048             raise ClientError(
1049                 'Range end exceeds file size',
1050                 status=416)
1051         self._assert_container()
1052         meta = self.get_container_info()
1053         blocksize = int(meta['x-container-block-size'])
1054         filesize = fstat(source_file.fileno()).st_size
1055         datasize = int(end) - int(start) + 1
1056         nblocks = 1 + (datasize - 1) // blocksize
1057         offset = 0
1058         if upload_cb:
1059             upload_gen = upload_cb(nblocks)
1060             upload_gen.next()
1061         for i in range(nblocks):
1062             read_size = min(blocksize, filesize - offset, datasize - offset)
1063             block = source_file.read(read_size)
1064             self.object_post(
1065                 obj,
1066                 update=True,
1067                 content_type='application/octet-stream',
1068                 content_length=len(block),
1069                 content_range='bytes %s-%s/*' % (
1070                     start + offset,
1071                     start + offset + len(block) - 1),
1072                 data=block)
1073             offset += len(block)
1074
1075             if upload_cb:
1076                 upload_gen.next()
1077
1078     def copy_object(
1079             self, src_container, src_object, dst_container,
1080             dst_object=None,
1081             source_version=None,
1082             source_account=None,
1083             public=False,
1084             content_type=None,
1085             delimiter=None):
1086         """
1087         :param src_container: (str) source container
1088
1089         :param src_object: (str) source object path
1090
1091         :param dst_container: (str) destination container
1092
1093         :param dst_object: (str) destination object path
1094
1095         :param source_version: (str) source object version
1096
1097         :param source_account: (str) account to copy from
1098
1099         :param public: (bool)
1100
1101         :param content_type: (str)
1102
1103         :param delimiter: (str)
1104         """
1105         self._assert_account()
1106         self.container = dst_container
1107         src_path = path4url(src_container, src_object)
1108         self.object_put(
1109             dst_object or src_object,
1110             success=201,
1111             copy_from=src_path,
1112             content_length=0,
1113             source_version=source_version,
1114             source_account=source_account,
1115             public=public,
1116             content_type=content_type,
1117             delimiter=delimiter)
1118
1119     def move_object(
1120             self, src_container, src_object, dst_container,
1121             dst_object=False,
1122             source_account=None,
1123             source_version=None,
1124             public=False,
1125             content_type=None,
1126             delimiter=None):
1127         """
1128         :param src_container: (str) source container
1129
1130         :param src_object: (str) source object path
1131
1132         :param dst_container: (str) destination container
1133
1134         :param dst_object: (str) destination object path
1135
1136         :param source_account: (str) account to move from
1137
1138         :param source_version: (str) source object version
1139
1140         :param public: (bool)
1141
1142         :param content_type: (str)
1143
1144         :param delimiter: (str)
1145         """
1146         self._assert_account()
1147         self.container = dst_container
1148         dst_object = dst_object or src_object
1149         src_path = path4url(src_container, src_object)
1150         self.object_put(
1151             dst_object,
1152             success=201,
1153             move_from=src_path,
1154             content_length=0,
1155             source_account=source_account,
1156             source_version=source_version,
1157             public=public,
1158             content_type=content_type,
1159             delimiter=delimiter)
1160
1161     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1162         """Get accounts that share with self.account
1163
1164         :param limit: (str)
1165
1166         :param marker: (str)
1167
1168         :returns: (dict)
1169         """
1170         self._assert_account()
1171
1172         self.set_param('format', 'json')
1173         self.set_param('limit', limit, iff=limit is not None)
1174         self.set_param('marker', marker, iff=marker is not None)
1175
1176         path = ''
1177         success = kwargs.pop('success', (200, 204))
1178         r = self.get(path, *args, success=success, **kwargs)
1179         return r.json
1180
1181     def get_object_versionlist(self, obj):
1182         """
1183         :param obj: (str) remote object path
1184
1185         :returns: (list)
1186         """
1187         self._assert_container()
1188         r = self.object_get(obj, format='json', version='list')
1189         return r.json['versions']