Add option to fail upload if remote file exists
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             content_encoding=None,
208             content_disposition=None,
209             permissions=None,
210             public=None,
211             success=(201, 409)):
212         r = self.object_put(
213             obj,
214             format='json',
215             hashmap=True,
216             content_type=content_type,
217             json=json,
218             content_encoding=content_encoding,
219             content_disposition=content_disposition,
220             permissions=permissions,
221             public=public,
222             success=success)
223         return None if r.status_code == 201 else r.json
224
225     def _culculate_blocks_for_upload(
226             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
227             hash_cb=None):
228         offset = 0
229         if hash_cb:
230             hash_gen = hash_cb(nblocks)
231             hash_gen.next()
232
233         for i in range(nblocks):
234             block = fileobj.read(min(blocksize, size - offset))
235             bytes = len(block)
236             hash = _pithos_hash(block, blockhash)
237             hashes.append(hash)
238             hmap[hash] = (offset, bytes)
239             offset += bytes
240             if hash_cb:
241                 hash_gen.next()
242         msg = 'Failed to calculate uploaded blocks:'
243         ' Offset and object size do not match'
244         assert offset == size, msg
245
246     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
247         """upload missing blocks asynchronously"""
248
249         self._init_thread_limit()
250
251         flying = []
252         failures = []
253         for hash in missing:
254             offset, bytes = hmap[hash]
255             fileobj.seek(offset)
256             data = fileobj.read(bytes)
257             r = self._put_block_async(data, hash, upload_gen)
258             flying.append(r)
259             unfinished = self._watch_thread_limit(flying)
260             for thread in set(flying).difference(unfinished):
261                 if thread.exception:
262                     failures.append(thread)
263                     if isinstance(
264                             thread.exception,
265                             ClientError) and thread.exception.status == 502:
266                         self.POOLSIZE = self._thread_limit
267                 elif thread.isAlive():
268                     flying.append(thread)
269                 elif upload_gen:
270                     try:
271                         upload_gen.next()
272                     except:
273                         pass
274             flying = unfinished
275
276         for thread in flying:
277             thread.join()
278             if thread.exception:
279                 failures.append(thread)
280             elif upload_gen:
281                 try:
282                     upload_gen.next()
283                 except:
284                     pass
285
286         return [failure.kwargs['hash'] for failure in failures]
287
288     def upload_object(
289             self, obj, f,
290             size=None,
291             hash_cb=None,
292             upload_cb=None,
293             etag=None,
294             if_not_exist=None,
295             content_encoding=None,
296             content_disposition=None,
297             content_type=None,
298             sharing=None,
299             public=None):
300         """Upload an object using multiple connections (threads)
301
302         :param obj: (str) remote object path
303
304         :param f: open file descriptor (rb)
305
306         :param hash_cb: optional progress.bar object for calculating hashes
307
308         :param upload_cb: optional progress.bar object for uploading
309
310         :param etag: (str)
311
312         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
313             it does not exist remotely, otherwise the operation will fail.
314             Involves the case of an object with the same path is created while
315             the object is being uploaded.
316
317         :param content_encoding: (str)
318
319         :param content_disposition: (str)
320
321         :param content_type: (str)
322
323         :param sharing: {'read':[user and/or grp names],
324             'write':[usr and/or grp names]}
325
326         :param public: (bool)
327         """
328         self._assert_container()
329
330         #init
331         block_info = (blocksize, blockhash, size, nblocks) =\
332             self._get_file_block_info(f, size)
333         (hashes, hmap, offset) = ([], {}, 0)
334         if not content_type:
335             content_type = 'application/octet-stream'
336
337         self._culculate_blocks_for_upload(
338             *block_info,
339             hashes=hashes,
340             hmap=hmap,
341             fileobj=f,
342             hash_cb=hash_cb)
343
344         hashmap = dict(bytes=size, hashes=hashes)
345         missing = self._get_missing_hashes(
346             obj, hashmap,
347             content_type=content_type,
348             size=size,
349             content_encoding=content_encoding,
350             content_disposition=content_disposition,
351             permissions=sharing,
352             public=public)
353
354         if missing is None:
355             return
356
357         if upload_cb:
358             upload_gen = upload_cb(len(missing))
359             for i in range(len(missing), len(hashmap['hashes']) + 1):
360                 try:
361                     upload_gen.next()
362                 except:
363                     upload_gen = None
364         else:
365             upload_gen = None
366
367         retries = 7
368         try:
369             while retries:
370                 sendlog.info('%s blocks missing' % len(missing))
371                 num_of_blocks = len(missing)
372                 missing = self._upload_missing_blocks(
373                     missing,
374                     hmap,
375                     f,
376                     upload_gen)
377                 if missing:
378                     if num_of_blocks == len(missing):
379                         retries -= 1
380                     else:
381                         num_of_blocks = len(missing)
382                 else:
383                     break
384             if missing:
385                 raise ClientError(
386                     '%s blocks failed to upload' % len(missing),
387                     status=800)
388         except KeyboardInterrupt:
389             sendlog.info('- - - wait for threads to finish')
390             for thread in activethreads():
391                 thread.join()
392             raise
393
394         self.object_put(
395             obj,
396             format='json',
397             hashmap=True,
398             content_type=content_type,
399             if_etag_not_match='*' if if_not_exist else None,
400             etag=etag,
401             json=hashmap,
402             success=201)
403
404     # download_* auxiliary methods
405     def _get_remote_blocks_info(self, obj, **restargs):
406         #retrieve object hashmap
407         myrange = restargs.pop('data_range', None)
408         hashmap = self.get_object_hashmap(obj, **restargs)
409         restargs['data_range'] = myrange
410         blocksize = int(hashmap['block_size'])
411         blockhash = hashmap['block_hash']
412         total_size = hashmap['bytes']
413         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
414         map_dict = {}
415         for i, h in enumerate(hashmap['hashes']):
416             #  map_dict[h] = i   CHAGE
417             if h in map_dict:
418                 map_dict[h].append(i)
419             else:
420                 map_dict[h] = [i]
421         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
422
423     def _dump_blocks_sync(
424             self, obj, remote_hashes, blocksize, total_size, dst, range,
425             **args):
426         for blockid, blockhash in enumerate(remote_hashes):
427             if blockhash:
428                 start = blocksize * blockid
429                 is_last = start + blocksize > total_size
430                 end = (total_size - 1) if is_last else (start + blocksize - 1)
431                 (start, end) = _range_up(start, end, range)
432                 args['data_range'] = 'bytes=%s-%s' % (start, end)
433                 r = self.object_get(obj, success=(200, 206), **args)
434                 self._cb_next()
435                 dst.write(r.content)
436                 dst.flush()
437
438     def _get_block_async(self, obj, **args):
439         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
440         event.start()
441         return event
442
443     def _hash_from_file(self, fp, start, size, blockhash):
444         fp.seek(start)
445         block = fp.read(size)
446         h = newhashlib(blockhash)
447         h.update(block.strip('\x00'))
448         return hexlify(h.digest())
449
450     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
451         """write the results of a greenleted rest call to a file
452
453         :param offset: the offset of the file up to blocksize
454         - e.g. if the range is 10-100, all blocks will be written to
455         normal_position - 10
456         """
457         for i, (key, g) in enumerate(flying.items()):
458             if g.isAlive():
459                 continue
460             if g.exception:
461                 raise g.exception
462             block = g.value.content
463             for block_start in blockids[key]:
464                 local_file.seek(block_start + offset)
465                 local_file.write(block)
466                 self._cb_next()
467             flying.pop(key)
468             blockids.pop(key)
469         local_file.flush()
470
471     def _dump_blocks_async(
472             self, obj, remote_hashes, blocksize, total_size, local_file,
473             blockhash=None, resume=False, filerange=None, **restargs):
474         file_size = fstat(local_file.fileno()).st_size if resume else 0
475         flying = dict()
476         blockid_dict = dict()
477         offset = 0
478         if filerange is not None:
479             rstart = int(filerange.split('-')[0])
480             offset = rstart if blocksize > rstart else rstart % blocksize
481
482         self._init_thread_limit()
483         for block_hash, blockids in remote_hashes.items():
484             blockids = [blk * blocksize for blk in blockids]
485             unsaved = [blk for blk in blockids if not (
486                 blk < file_size and block_hash == self._hash_from_file(
487                         local_file, blk, blocksize, blockhash))]
488             self._cb_next(len(blockids) - len(unsaved))
489             if unsaved:
490                 key = unsaved[0]
491                 self._watch_thread_limit(flying.values())
492                 self._thread2file(
493                     flying, blockid_dict, local_file, offset,
494                     **restargs)
495                 end = total_size - 1 if key + blocksize > total_size\
496                     else key + blocksize - 1
497                 start, end = _range_up(key, end, filerange)
498                 if start == end:
499                     self._cb_next()
500                     continue
501                 restargs['async_headers'] = {
502                     'Range': 'bytes=%s-%s' % (start, end)}
503                 flying[key] = self._get_block_async(obj, **restargs)
504                 blockid_dict[key] = unsaved
505
506         for thread in flying.values():
507             thread.join()
508         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
509
510     def download_object(
511             self, obj, dst,
512             download_cb=None,
513             version=None,
514             resume=False,
515             range_str=None,
516             if_match=None,
517             if_none_match=None,
518             if_modified_since=None,
519             if_unmodified_since=None):
520         """Download an object (multiple connections, random blocks)
521
522         :param obj: (str) remote object path
523
524         :param dst: open file descriptor (wb+)
525
526         :param download_cb: optional progress.bar object for downloading
527
528         :param version: (str) file version
529
530         :param resume: (bool) if set, preserve already downloaded file parts
531
532         :param range_str: (str) from, to are file positions (int) in bytes
533
534         :param if_match: (str)
535
536         :param if_none_match: (str)
537
538         :param if_modified_since: (str) formated date
539
540         :param if_unmodified_since: (str) formated date"""
541         restargs = dict(
542             version=version,
543             data_range=None if range_str is None else 'bytes=%s' % range_str,
544             if_match=if_match,
545             if_none_match=if_none_match,
546             if_modified_since=if_modified_since,
547             if_unmodified_since=if_unmodified_since)
548
549         (
550             blocksize,
551             blockhash,
552             total_size,
553             hash_list,
554             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
555         assert total_size >= 0
556
557         if download_cb:
558             self.progress_bar_gen = download_cb(len(hash_list))
559             self._cb_next()
560
561         if dst.isatty():
562             self._dump_blocks_sync(
563                 obj,
564                 hash_list,
565                 blocksize,
566                 total_size,
567                 dst,
568                 range_str,
569                 **restargs)
570         else:
571             self._dump_blocks_async(
572                 obj,
573                 remote_hashes,
574                 blocksize,
575                 total_size,
576                 dst,
577                 blockhash,
578                 resume,
579                 range_str,
580                 **restargs)
581             if not range_str:
582                 dst.truncate(total_size)
583
584         self._complete_cb()
585
586     #Command Progress Bar method
587     def _cb_next(self, step=1):
588         if hasattr(self, 'progress_bar_gen'):
589             try:
590                 for i in xrange(step):
591                     self.progress_bar_gen.next()
592             except:
593                 pass
594
595     def _complete_cb(self):
596         while True:
597             try:
598                 self.progress_bar_gen.next()
599             except:
600                 break
601
602     def get_object_hashmap(
603             self, obj,
604             version=None,
605             if_match=None,
606             if_none_match=None,
607             if_modified_since=None,
608             if_unmodified_since=None,
609             data_range=None):
610         """
611         :param obj: (str) remote object path
612
613         :param if_match: (str)
614
615         :param if_none_match: (str)
616
617         :param if_modified_since: (str) formated date
618
619         :param if_unmodified_since: (str) formated date
620
621         :param data_range: (str) from-to where from and to are integers
622             denoting file positions in bytes
623
624         :returns: (list)
625         """
626         try:
627             r = self.object_get(
628                 obj,
629                 hashmap=True,
630                 version=version,
631                 if_etag_match=if_match,
632                 if_etag_not_match=if_none_match,
633                 if_modified_since=if_modified_since,
634                 if_unmodified_since=if_unmodified_since,
635                 data_range=data_range)
636         except ClientError as err:
637             if err.status == 304 or err.status == 412:
638                 return {}
639             raise
640         return r.json
641
642     def set_account_group(self, group, usernames):
643         """
644         :param group: (str)
645
646         :param usernames: (list)
647         """
648         self.account_post(update=True, groups={group: usernames})
649
650     def del_account_group(self, group):
651         """
652         :param group: (str)
653         """
654         self.account_post(update=True, groups={group: []})
655
656     def get_account_info(self, until=None):
657         """
658         :param until: (str) formated date
659
660         :returns: (dict)
661         """
662         r = self.account_head(until=until)
663         if r.status_code == 401:
664             raise ClientError("No authorization", status=401)
665         return r.headers
666
667     def get_account_quota(self):
668         """
669         :returns: (dict)
670         """
671         return filter_in(
672             self.get_account_info(),
673             'X-Account-Policy-Quota',
674             exactMatch=True)
675
676     def get_account_versioning(self):
677         """
678         :returns: (dict)
679         """
680         return filter_in(
681             self.get_account_info(),
682             'X-Account-Policy-Versioning',
683             exactMatch=True)
684
685     def get_account_meta(self, until=None):
686         """
687         :meta until: (str) formated date
688
689         :returns: (dict)
690         """
691         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
692
693     def get_account_group(self):
694         """
695         :returns: (dict)
696         """
697         return filter_in(self.get_account_info(), 'X-Account-Group-')
698
699     def set_account_meta(self, metapairs):
700         """
701         :param metapairs: (dict) {key1:val1, key2:val2, ...}
702         """
703         assert(type(metapairs) is dict)
704         self.account_post(update=True, metadata=metapairs)
705
706     def del_account_meta(self, metakey):
707         """
708         :param metakey: (str) metadatum key
709         """
710         self.account_post(update=True, metadata={metakey: ''})
711
712     def set_account_quota(self, quota):
713         """
714         :param quota: (int)
715         """
716         self.account_post(update=True, quota=quota)
717
718     def set_account_versioning(self, versioning):
719         """
720         "param versioning: (str)
721         """
722         self.account_post(update=True, versioning=versioning)
723
724     def list_containers(self):
725         """
726         :returns: (dict)
727         """
728         r = self.account_get()
729         return r.json
730
731     def del_container(self, until=None, delimiter=None):
732         """
733         :param until: (str) formated date
734
735         :param delimiter: (str) with / empty container
736
737         :raises ClientError: 404 Container does not exist
738
739         :raises ClientError: 409 Container is not empty
740         """
741         self._assert_container()
742         r = self.container_delete(
743             until=until,
744             delimiter=delimiter,
745             success=(204, 404, 409))
746         if r.status_code == 404:
747             raise ClientError(
748                 'Container "%s" does not exist' % self.container,
749                 r.status_code)
750         elif r.status_code == 409:
751             raise ClientError(
752                 'Container "%s" is not empty' % self.container,
753                 r.status_code)
754
755     def get_container_versioning(self, container=None):
756         """
757         :param container: (str)
758
759         :returns: (dict)
760         """
761         cnt_back_up = self.container
762         try:
763             self.container = container or cnt_back_up
764             return filter_in(
765                 self.get_container_info(),
766                 'X-Container-Policy-Versioning')
767         finally:
768             self.container = cnt_back_up
769
770     def get_container_quota(self, container=None):
771         """
772         :param container: (str)
773
774         :returns: (dict)
775         """
776         cnt_back_up = self.container
777         try:
778             self.container = container or cnt_back_up
779             return filter_in(
780                 self.get_container_info(),
781                 'X-Container-Policy-Quota')
782         finally:
783             self.container = cnt_back_up
784
785     def get_container_info(self, until=None):
786         """
787         :param until: (str) formated date
788
789         :returns: (dict)
790
791         :raises ClientError: 404 Container not found
792         """
793         try:
794             r = self.container_head(until=until)
795         except ClientError as err:
796             err.details.append('for container %s' % self.container)
797             raise err
798         return r.headers
799
800     def get_container_meta(self, until=None):
801         """
802         :param until: (str) formated date
803
804         :returns: (dict)
805         """
806         return filter_in(
807             self.get_container_info(until=until),
808             'X-Container-Meta')
809
810     def get_container_object_meta(self, until=None):
811         """
812         :param until: (str) formated date
813
814         :returns: (dict)
815         """
816         return filter_in(
817             self.get_container_info(until=until),
818             'X-Container-Object-Meta')
819
820     def set_container_meta(self, metapairs):
821         """
822         :param metapairs: (dict) {key1:val1, key2:val2, ...}
823         """
824         assert(type(metapairs) is dict)
825         self.container_post(update=True, metadata=metapairs)
826
827     def del_container_meta(self, metakey):
828         """
829         :param metakey: (str) metadatum key
830         """
831         self.container_post(update=True, metadata={metakey: ''})
832
833     def set_container_quota(self, quota):
834         """
835         :param quota: (int)
836         """
837         self.container_post(update=True, quota=quota)
838
839     def set_container_versioning(self, versioning):
840         """
841         :param versioning: (str)
842         """
843         self.container_post(update=True, versioning=versioning)
844
845     def del_object(self, obj, until=None, delimiter=None):
846         """
847         :param obj: (str) remote object path
848
849         :param until: (str) formated date
850
851         :param delimiter: (str)
852         """
853         self._assert_container()
854         self.object_delete(obj, until=until, delimiter=delimiter)
855
856     def set_object_meta(self, obj, metapairs):
857         """
858         :param obj: (str) remote object path
859
860         :param metapairs: (dict) {key1:val1, key2:val2, ...}
861         """
862         assert(type(metapairs) is dict)
863         self.object_post(obj, update=True, metadata=metapairs)
864
865     def del_object_meta(self, obj, metakey):
866         """
867         :param obj: (str) remote object path
868
869         :param metakey: (str) metadatum key
870         """
871         self.object_post(obj, update=True, metadata={metakey: ''})
872
873     def publish_object(self, obj):
874         """
875         :param obj: (str) remote object path
876
877         :returns: (str) access url
878         """
879         self.object_post(obj, update=True, public=True)
880         info = self.get_object_info(obj)
881         pref, sep, rest = self.base_url.partition('//')
882         base = rest.split('/')[0]
883         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
884
885     def unpublish_object(self, obj):
886         """
887         :param obj: (str) remote object path
888         """
889         self.object_post(obj, update=True, public=False)
890
891     def get_object_info(self, obj, version=None):
892         """
893         :param obj: (str) remote object path
894
895         :param version: (str)
896
897         :returns: (dict)
898         """
899         try:
900             r = self.object_head(obj, version=version)
901             return r.headers
902         except ClientError as ce:
903             if ce.status == 404:
904                 raise ClientError('Object %s not found' % obj, status=404)
905             raise
906
907     def get_object_meta(self, obj, version=None):
908         """
909         :param obj: (str) remote object path
910
911         :param version: (str)
912
913         :returns: (dict)
914         """
915         return filter_in(
916             self.get_object_info(obj, version=version),
917             'X-Object-Meta')
918
919     def get_object_sharing(self, obj):
920         """
921         :param obj: (str) remote object path
922
923         :returns: (dict)
924         """
925         r = filter_in(
926             self.get_object_info(obj),
927             'X-Object-Sharing',
928             exactMatch=True)
929         reply = {}
930         if len(r) > 0:
931             perms = r['x-object-sharing'].split(';')
932             for perm in perms:
933                 try:
934                     perm.index('=')
935                 except ValueError:
936                     raise ClientError('Incorrect reply format')
937                 (key, val) = perm.strip().split('=')
938                 reply[key] = val
939         return reply
940
941     def set_object_sharing(
942             self, obj,
943             read_permition=False, write_permition=False):
944         """Give read/write permisions to an object.
945
946         :param obj: (str) remote object path
947
948         :param read_permition: (list - bool) users and user groups that get
949             read permition for this object - False means all previous read
950             permissions will be removed
951
952         :param write_perimition: (list - bool) of users and user groups to get
953            write permition for this object - False means all previous write
954            permissions will be removed
955         """
956
957         perms = dict(read=read_permition or '', write=write_permition or '')
958         self.object_post(obj, update=True, permissions=perms)
959
960     def del_object_sharing(self, obj):
961         """
962         :param obj: (str) remote object path
963         """
964         self.set_object_sharing(obj)
965
966     def append_object(self, obj, source_file, upload_cb=None):
967         """
968         :param obj: (str) remote object path
969
970         :param source_file: open file descriptor
971
972         :param upload_db: progress.bar for uploading
973         """
974
975         self._assert_container()
976         meta = self.get_container_info()
977         blocksize = int(meta['x-container-block-size'])
978         filesize = fstat(source_file.fileno()).st_size
979         nblocks = 1 + (filesize - 1) // blocksize
980         offset = 0
981         if upload_cb:
982             upload_gen = upload_cb(nblocks)
983             upload_gen.next()
984         for i in range(nblocks):
985             block = source_file.read(min(blocksize, filesize - offset))
986             offset += len(block)
987             self.object_post(
988                 obj,
989                 update=True,
990                 content_range='bytes */*',
991                 content_type='application/octet-stream',
992                 content_length=len(block),
993                 data=block)
994
995             if upload_cb:
996                 upload_gen.next()
997
998     def truncate_object(self, obj, upto_bytes):
999         """
1000         :param obj: (str) remote object path
1001
1002         :param upto_bytes: max number of bytes to leave on file
1003         """
1004         self.object_post(
1005             obj,
1006             update=True,
1007             content_range='bytes 0-%s/*' % upto_bytes,
1008             content_type='application/octet-stream',
1009             object_bytes=upto_bytes,
1010             source_object=path4url(self.container, obj))
1011
1012     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1013         """Overwrite a part of an object from local source file
1014
1015         :param obj: (str) remote object path
1016
1017         :param start: (int) position in bytes to start overwriting from
1018
1019         :param end: (int) position in bytes to stop overwriting at
1020
1021         :param source_file: open file descriptor
1022
1023         :param upload_db: progress.bar for uploading
1024         """
1025
1026         r = self.get_object_info(obj)
1027         rf_size = int(r['content-length'])
1028         if rf_size < int(start):
1029             raise ClientError(
1030                 'Range start exceeds file size',
1031                 status=416)
1032         elif rf_size < int(end):
1033             raise ClientError(
1034                 'Range end exceeds file size',
1035                 status=416)
1036         self._assert_container()
1037         meta = self.get_container_info()
1038         blocksize = int(meta['x-container-block-size'])
1039         filesize = fstat(source_file.fileno()).st_size
1040         datasize = int(end) - int(start) + 1
1041         nblocks = 1 + (datasize - 1) // blocksize
1042         offset = 0
1043         if upload_cb:
1044             upload_gen = upload_cb(nblocks)
1045             upload_gen.next()
1046         for i in range(nblocks):
1047             read_size = min(blocksize, filesize - offset, datasize - offset)
1048             block = source_file.read(read_size)
1049             self.object_post(
1050                 obj,
1051                 update=True,
1052                 content_type='application/octet-stream',
1053                 content_length=len(block),
1054                 content_range='bytes %s-%s/*' % (
1055                     start + offset,
1056                     start + offset + len(block) - 1),
1057                 data=block)
1058             offset += len(block)
1059
1060             if upload_cb:
1061                 upload_gen.next()
1062
1063     def copy_object(
1064             self, src_container, src_object, dst_container,
1065             dst_object=None,
1066             source_version=None,
1067             source_account=None,
1068             public=False,
1069             content_type=None,
1070             delimiter=None):
1071         """
1072         :param src_container: (str) source container
1073
1074         :param src_object: (str) source object path
1075
1076         :param dst_container: (str) destination container
1077
1078         :param dst_object: (str) destination object path
1079
1080         :param source_version: (str) source object version
1081
1082         :param source_account: (str) account to copy from
1083
1084         :param public: (bool)
1085
1086         :param content_type: (str)
1087
1088         :param delimiter: (str)
1089         """
1090         self._assert_account()
1091         self.container = dst_container
1092         src_path = path4url(src_container, src_object)
1093         self.object_put(
1094             dst_object or src_object,
1095             success=201,
1096             copy_from=src_path,
1097             content_length=0,
1098             source_version=source_version,
1099             source_account=source_account,
1100             public=public,
1101             content_type=content_type,
1102             delimiter=delimiter)
1103
1104     def move_object(
1105             self, src_container, src_object, dst_container,
1106             dst_object=False,
1107             source_account=None,
1108             source_version=None,
1109             public=False,
1110             content_type=None,
1111             delimiter=None):
1112         """
1113         :param src_container: (str) source container
1114
1115         :param src_object: (str) source object path
1116
1117         :param dst_container: (str) destination container
1118
1119         :param dst_object: (str) destination object path
1120
1121         :param source_account: (str) account to move from
1122
1123         :param source_version: (str) source object version
1124
1125         :param public: (bool)
1126
1127         :param content_type: (str)
1128
1129         :param delimiter: (str)
1130         """
1131         self._assert_account()
1132         self.container = dst_container
1133         dst_object = dst_object or src_object
1134         src_path = path4url(src_container, src_object)
1135         self.object_put(
1136             dst_object,
1137             success=201,
1138             move_from=src_path,
1139             content_length=0,
1140             source_account=source_account,
1141             source_version=source_version,
1142             public=public,
1143             content_type=content_type,
1144             delimiter=delimiter)
1145
1146     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1147         """Get accounts that share with self.account
1148
1149         :param limit: (str)
1150
1151         :param marker: (str)
1152
1153         :returns: (dict)
1154         """
1155         self._assert_account()
1156
1157         self.set_param('format', 'json')
1158         self.set_param('limit', limit, iff=limit is not None)
1159         self.set_param('marker', marker, iff=marker is not None)
1160
1161         path = ''
1162         success = kwargs.pop('success', (200, 204))
1163         r = self.get(path, *args, success=success, **kwargs)
1164         return r.json
1165
1166     def get_object_versionlist(self, obj):
1167         """
1168         :param obj: (str) remote object path
1169
1170         :returns: (list)
1171         """
1172         self._assert_container()
1173         r = self.object_get(obj, format='json', version='list')
1174         return r.json['versions']