Adjust pithos livetest to use container_info_cache
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash, upload_gen=None):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None, cache=None):
200         """
201         :param fileobj: (file descriptor) source
202
203         :param size: (int) size of data to upload from source
204
205         :param cache: (dict) if provided, cache container info response to
206         avoid redundant calls
207         """
208         if isinstance(cache, dict):
209             try:
210                 meta = cache[self.container]
211             except KeyError:
212                 meta = self.get_container_info()
213                 cache[self.container] = meta
214         else:
215             meta = self.get_container_info()
216         blocksize = int(meta['x-container-block-size'])
217         blockhash = meta['x-container-block-hash']
218         size = size if size is not None else fstat(fileobj.fileno()).st_size
219         nblocks = 1 + (size - 1) // blocksize
220         return (blocksize, blockhash, size, nblocks)
221
222     def _create_or_get_missing_hashes(
223             self, obj, json,
224             size=None,
225             format='json',
226             hashmap=True,
227             content_type=None,
228             if_etag_match=None,
229             if_etag_not_match=None,
230             content_encoding=None,
231             content_disposition=None,
232             permissions=None,
233             public=None,
234             success=(201, 409)):
235         r = self.object_put(
236             obj,
237             format='json',
238             hashmap=True,
239             content_type=content_type,
240             json=json,
241             if_etag_match=if_etag_match,
242             if_etag_not_match=if_etag_not_match,
243             content_encoding=content_encoding,
244             content_disposition=content_disposition,
245             permissions=permissions,
246             public=public,
247             success=success)
248         return (None if r.status_code == 201 else r.json), r.headers
249
250     def _calculate_blocks_for_upload(
251             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
252             hash_cb=None):
253         offset = 0
254         if hash_cb:
255             hash_gen = hash_cb(nblocks)
256             hash_gen.next()
257
258         for i in range(nblocks):
259             block = fileobj.read(min(blocksize, size - offset))
260             bytes = len(block)
261             hash = _pithos_hash(block, blockhash)
262             hashes.append(hash)
263             hmap[hash] = (offset, bytes)
264             offset += bytes
265             if hash_cb:
266                 hash_gen.next()
267         msg = 'Failed to calculate uploaded blocks:'
268         ' Offset and object size do not match'
269         assert offset == size, msg
270
271     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272         """upload missing blocks asynchronously"""
273
274         self._init_thread_limit()
275
276         flying = []
277         failures = []
278         for hash in missing:
279             offset, bytes = hmap[hash]
280             fileobj.seek(offset)
281             data = fileobj.read(bytes)
282             r = self._put_block_async(data, hash, upload_gen)
283             flying.append(r)
284             unfinished = self._watch_thread_limit(flying)
285             for thread in set(flying).difference(unfinished):
286                 if thread.exception:
287                     failures.append(thread)
288                     if isinstance(
289                             thread.exception,
290                             ClientError) and thread.exception.status == 502:
291                         self.POOLSIZE = self._thread_limit
292                 elif thread.isAlive():
293                     flying.append(thread)
294                 elif upload_gen:
295                     try:
296                         upload_gen.next()
297                     except:
298                         pass
299             flying = unfinished
300
301         for thread in flying:
302             thread.join()
303             if thread.exception:
304                 failures.append(thread)
305             elif upload_gen:
306                 try:
307                     upload_gen.next()
308                 except:
309                     pass
310
311         return [failure.kwargs['hash'] for failure in failures]
312
313     def upload_object(
314             self, obj, f,
315             size=None,
316             hash_cb=None,
317             upload_cb=None,
318             etag=None,
319             if_etag_match=None,
320             if_not_exist=None,
321             content_encoding=None,
322             content_disposition=None,
323             content_type=None,
324             sharing=None,
325             public=None,
326             container_info_cache=None):
327         """Upload an object using multiple connections (threads)
328
329         :param obj: (str) remote object path
330
331         :param f: open file descriptor (rb)
332
333         :param hash_cb: optional progress.bar object for calculating hashes
334
335         :param upload_cb: optional progress.bar object for uploading
336
337         :param etag: (str)
338
339         :param if_etag_match: (str) Push that value to if-match header at file
340             creation
341
342         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343             it does not exist remotely, otherwise the operation will fail.
344             Involves the case of an object with the same path is created while
345             the object is being uploaded.
346
347         :param content_encoding: (str)
348
349         :param content_disposition: (str)
350
351         :param content_type: (str)
352
353         :param sharing: {'read':[user and/or grp names],
354             'write':[usr and/or grp names]}
355
356         :param public: (bool)
357
358         :param container_info_cache: (dict) if given, avoid redundant calls to
359         server for container info (block size and hash information)
360         """
361         self._assert_container()
362
363         #init
364         block_info = (
365             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
366                 f, size, container_info_cache)
367         (hashes, hmap, offset) = ([], {}, 0)
368         if not content_type:
369             content_type = 'application/octet-stream'
370
371         self._calculate_blocks_for_upload(
372             *block_info,
373             hashes=hashes,
374             hmap=hmap,
375             fileobj=f,
376             hash_cb=hash_cb)
377
378         hashmap = dict(bytes=size, hashes=hashes)
379         missing, obj_headers = self._create_or_get_missing_hashes(
380             obj, hashmap,
381             content_type=content_type,
382             size=size,
383             if_etag_match=if_etag_match,
384             if_etag_not_match='*' if if_not_exist else None,
385             content_encoding=content_encoding,
386             content_disposition=content_disposition,
387             permissions=sharing,
388             public=public)
389
390         if missing is None:
391             return obj_headers
392
393         if upload_cb:
394             upload_gen = upload_cb(len(missing))
395             for i in range(len(missing), len(hashmap['hashes']) + 1):
396                 try:
397                     upload_gen.next()
398                 except:
399                     upload_gen = None
400         else:
401             upload_gen = None
402
403         retries = 7
404         try:
405             while retries:
406                 sendlog.info('%s blocks missing' % len(missing))
407                 num_of_blocks = len(missing)
408                 missing = self._upload_missing_blocks(
409                     missing,
410                     hmap,
411                     f,
412                     upload_gen)
413                 if missing:
414                     if num_of_blocks == len(missing):
415                         retries -= 1
416                     else:
417                         num_of_blocks = len(missing)
418                 else:
419                     break
420             if missing:
421                 raise ClientError(
422                     '%s blocks failed to upload' % len(missing),
423                     status=800)
424         except KeyboardInterrupt:
425             sendlog.info('- - - wait for threads to finish')
426             for thread in activethreads():
427                 thread.join()
428             raise
429
430         r = self.object_put(
431             obj,
432             format='json',
433             hashmap=True,
434             content_type=content_type,
435             if_etag_match=if_etag_match,
436             if_etag_not_match='*' if if_not_exist else None,
437             etag=etag,
438             json=hashmap,
439             permissions=sharing,
440             public=public,
441             success=201)
442         return r.headers
443
444     # download_* auxiliary methods
445     def _get_remote_blocks_info(self, obj, **restargs):
446         #retrieve object hashmap
447         myrange = restargs.pop('data_range', None)
448         hashmap = self.get_object_hashmap(obj, **restargs)
449         restargs['data_range'] = myrange
450         blocksize = int(hashmap['block_size'])
451         blockhash = hashmap['block_hash']
452         total_size = hashmap['bytes']
453         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
454         map_dict = {}
455         for i, h in enumerate(hashmap['hashes']):
456             #  map_dict[h] = i   CHAGE
457             if h in map_dict:
458                 map_dict[h].append(i)
459             else:
460                 map_dict[h] = [i]
461         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
462
463     def _dump_blocks_sync(
464             self, obj, remote_hashes, blocksize, total_size, dst, range,
465             **args):
466         for blockid, blockhash in enumerate(remote_hashes):
467             if blockhash:
468                 start = blocksize * blockid
469                 is_last = start + blocksize > total_size
470                 end = (total_size - 1) if is_last else (start + blocksize - 1)
471                 (start, end) = _range_up(start, end, range)
472                 args['data_range'] = 'bytes=%s-%s' % (start, end)
473                 r = self.object_get(obj, success=(200, 206), **args)
474                 self._cb_next()
475                 dst.write(r.content)
476                 dst.flush()
477
478     def _get_block_async(self, obj, **args):
479         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
480         event.start()
481         return event
482
483     def _hash_from_file(self, fp, start, size, blockhash):
484         fp.seek(start)
485         block = fp.read(size)
486         h = newhashlib(blockhash)
487         h.update(block.strip('\x00'))
488         return hexlify(h.digest())
489
490     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
491         """write the results of a greenleted rest call to a file
492
493         :param offset: the offset of the file up to blocksize
494         - e.g. if the range is 10-100, all blocks will be written to
495         normal_position - 10
496         """
497         for i, (key, g) in enumerate(flying.items()):
498             if g.isAlive():
499                 continue
500             if g.exception:
501                 raise g.exception
502             block = g.value.content
503             for block_start in blockids[key]:
504                 local_file.seek(block_start + offset)
505                 local_file.write(block)
506                 self._cb_next()
507             flying.pop(key)
508             blockids.pop(key)
509         local_file.flush()
510
511     def _dump_blocks_async(
512             self, obj, remote_hashes, blocksize, total_size, local_file,
513             blockhash=None, resume=False, filerange=None, **restargs):
514         file_size = fstat(local_file.fileno()).st_size if resume else 0
515         flying = dict()
516         blockid_dict = dict()
517         offset = 0
518         if filerange is not None:
519             rstart = int(filerange.split('-')[0])
520             offset = rstart if blocksize > rstart else rstart % blocksize
521
522         self._init_thread_limit()
523         for block_hash, blockids in remote_hashes.items():
524             blockids = [blk * blocksize for blk in blockids]
525             unsaved = [blk for blk in blockids if not (
526                 blk < file_size and block_hash == self._hash_from_file(
527                         local_file, blk, blocksize, blockhash))]
528             self._cb_next(len(blockids) - len(unsaved))
529             if unsaved:
530                 key = unsaved[0]
531                 self._watch_thread_limit(flying.values())
532                 self._thread2file(
533                     flying, blockid_dict, local_file, offset,
534                     **restargs)
535                 end = total_size - 1 if key + blocksize > total_size\
536                     else key + blocksize - 1
537                 start, end = _range_up(key, end, filerange)
538                 if start == end:
539                     self._cb_next()
540                     continue
541                 restargs['async_headers'] = {
542                     'Range': 'bytes=%s-%s' % (start, end)}
543                 flying[key] = self._get_block_async(obj, **restargs)
544                 blockid_dict[key] = unsaved
545
546         for thread in flying.values():
547             thread.join()
548         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
549
550     def download_object(
551             self, obj, dst,
552             download_cb=None,
553             version=None,
554             resume=False,
555             range_str=None,
556             if_match=None,
557             if_none_match=None,
558             if_modified_since=None,
559             if_unmodified_since=None):
560         """Download an object (multiple connections, random blocks)
561
562         :param obj: (str) remote object path
563
564         :param dst: open file descriptor (wb+)
565
566         :param download_cb: optional progress.bar object for downloading
567
568         :param version: (str) file version
569
570         :param resume: (bool) if set, preserve already downloaded file parts
571
572         :param range_str: (str) from, to are file positions (int) in bytes
573
574         :param if_match: (str)
575
576         :param if_none_match: (str)
577
578         :param if_modified_since: (str) formated date
579
580         :param if_unmodified_since: (str) formated date"""
581         restargs = dict(
582             version=version,
583             data_range=None if range_str is None else 'bytes=%s' % range_str,
584             if_match=if_match,
585             if_none_match=if_none_match,
586             if_modified_since=if_modified_since,
587             if_unmodified_since=if_unmodified_since)
588
589         (
590             blocksize,
591             blockhash,
592             total_size,
593             hash_list,
594             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
595         assert total_size >= 0
596
597         if download_cb:
598             self.progress_bar_gen = download_cb(len(hash_list))
599             self._cb_next()
600
601         if dst.isatty():
602             self._dump_blocks_sync(
603                 obj,
604                 hash_list,
605                 blocksize,
606                 total_size,
607                 dst,
608                 range_str,
609                 **restargs)
610         else:
611             self._dump_blocks_async(
612                 obj,
613                 remote_hashes,
614                 blocksize,
615                 total_size,
616                 dst,
617                 blockhash,
618                 resume,
619                 range_str,
620                 **restargs)
621             if not range_str:
622                 dst.truncate(total_size)
623
624         self._complete_cb()
625
626     #Command Progress Bar method
627     def _cb_next(self, step=1):
628         if hasattr(self, 'progress_bar_gen'):
629             try:
630                 for i in xrange(step):
631                     self.progress_bar_gen.next()
632             except:
633                 pass
634
635     def _complete_cb(self):
636         while True:
637             try:
638                 self.progress_bar_gen.next()
639             except:
640                 break
641
642     def get_object_hashmap(
643             self, obj,
644             version=None,
645             if_match=None,
646             if_none_match=None,
647             if_modified_since=None,
648             if_unmodified_since=None,
649             data_range=None):
650         """
651         :param obj: (str) remote object path
652
653         :param if_match: (str)
654
655         :param if_none_match: (str)
656
657         :param if_modified_since: (str) formated date
658
659         :param if_unmodified_since: (str) formated date
660
661         :param data_range: (str) from-to where from and to are integers
662             denoting file positions in bytes
663
664         :returns: (list)
665         """
666         try:
667             r = self.object_get(
668                 obj,
669                 hashmap=True,
670                 version=version,
671                 if_etag_match=if_match,
672                 if_etag_not_match=if_none_match,
673                 if_modified_since=if_modified_since,
674                 if_unmodified_since=if_unmodified_since,
675                 data_range=data_range)
676         except ClientError as err:
677             if err.status == 304 or err.status == 412:
678                 return {}
679             raise
680         return r.json
681
682     def set_account_group(self, group, usernames):
683         """
684         :param group: (str)
685
686         :param usernames: (list)
687         """
688         self.account_post(update=True, groups={group: usernames})
689
690     def del_account_group(self, group):
691         """
692         :param group: (str)
693         """
694         self.account_post(update=True, groups={group: []})
695
696     def get_account_info(self, until=None):
697         """
698         :param until: (str) formated date
699
700         :returns: (dict)
701         """
702         r = self.account_head(until=until)
703         if r.status_code == 401:
704             raise ClientError("No authorization", status=401)
705         return r.headers
706
707     def get_account_quota(self):
708         """
709         :returns: (dict)
710         """
711         return filter_in(
712             self.get_account_info(),
713             'X-Account-Policy-Quota',
714             exactMatch=True)
715
716     def get_account_versioning(self):
717         """
718         :returns: (dict)
719         """
720         return filter_in(
721             self.get_account_info(),
722             'X-Account-Policy-Versioning',
723             exactMatch=True)
724
725     def get_account_meta(self, until=None):
726         """
727         :meta until: (str) formated date
728
729         :returns: (dict)
730         """
731         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
732
733     def get_account_group(self):
734         """
735         :returns: (dict)
736         """
737         return filter_in(self.get_account_info(), 'X-Account-Group-')
738
739     def set_account_meta(self, metapairs):
740         """
741         :param metapairs: (dict) {key1:val1, key2:val2, ...}
742         """
743         assert(type(metapairs) is dict)
744         self.account_post(update=True, metadata=metapairs)
745
746     def del_account_meta(self, metakey):
747         """
748         :param metakey: (str) metadatum key
749         """
750         self.account_post(update=True, metadata={metakey: ''})
751
752     """
753     def set_account_quota(self, quota):
754         ""
755         :param quota: (int)
756         ""
757         self.account_post(update=True, quota=quota)
758     """
759
760     def set_account_versioning(self, versioning):
761         """
762         "param versioning: (str)
763         """
764         self.account_post(update=True, versioning=versioning)
765
766     def list_containers(self):
767         """
768         :returns: (dict)
769         """
770         r = self.account_get()
771         return r.json
772
773     def del_container(self, until=None, delimiter=None):
774         """
775         :param until: (str) formated date
776
777         :param delimiter: (str) with / empty container
778
779         :raises ClientError: 404 Container does not exist
780
781         :raises ClientError: 409 Container is not empty
782         """
783         self._assert_container()
784         r = self.container_delete(
785             until=until,
786             delimiter=delimiter,
787             success=(204, 404, 409))
788         if r.status_code == 404:
789             raise ClientError(
790                 'Container "%s" does not exist' % self.container,
791                 r.status_code)
792         elif r.status_code == 409:
793             raise ClientError(
794                 'Container "%s" is not empty' % self.container,
795                 r.status_code)
796
797     def get_container_versioning(self, container=None):
798         """
799         :param container: (str)
800
801         :returns: (dict)
802         """
803         cnt_back_up = self.container
804         try:
805             self.container = container or cnt_back_up
806             return filter_in(
807                 self.get_container_info(),
808                 'X-Container-Policy-Versioning')
809         finally:
810             self.container = cnt_back_up
811
812     def get_container_limit(self, container=None):
813         """
814         :param container: (str)
815
816         :returns: (dict)
817         """
818         cnt_back_up = self.container
819         try:
820             self.container = container or cnt_back_up
821             return filter_in(
822                 self.get_container_info(),
823                 'X-Container-Policy-Quota')
824         finally:
825             self.container = cnt_back_up
826
827     def get_container_info(self, until=None):
828         """
829         :param until: (str) formated date
830
831         :returns: (dict)
832
833         :raises ClientError: 404 Container not found
834         """
835         try:
836             r = self.container_head(until=until)
837         except ClientError as err:
838             err.details.append('for container %s' % self.container)
839             raise err
840         return r.headers
841
842     def get_container_meta(self, until=None):
843         """
844         :param until: (str) formated date
845
846         :returns: (dict)
847         """
848         return filter_in(
849             self.get_container_info(until=until),
850             'X-Container-Meta')
851
852     def get_container_object_meta(self, until=None):
853         """
854         :param until: (str) formated date
855
856         :returns: (dict)
857         """
858         return filter_in(
859             self.get_container_info(until=until),
860             'X-Container-Object-Meta')
861
862     def set_container_meta(self, metapairs):
863         """
864         :param metapairs: (dict) {key1:val1, key2:val2, ...}
865         """
866         assert(type(metapairs) is dict)
867         self.container_post(update=True, metadata=metapairs)
868
869     def del_container_meta(self, metakey):
870         """
871         :param metakey: (str) metadatum key
872         """
873         self.container_post(update=True, metadata={metakey: ''})
874
875     def set_container_limit(self, limit):
876         """
877         :param limit: (int)
878         """
879         self.container_post(update=True, quota=limit)
880
881     def set_container_versioning(self, versioning):
882         """
883         :param versioning: (str)
884         """
885         self.container_post(update=True, versioning=versioning)
886
887     def del_object(self, obj, until=None, delimiter=None):
888         """
889         :param obj: (str) remote object path
890
891         :param until: (str) formated date
892
893         :param delimiter: (str)
894         """
895         self._assert_container()
896         self.object_delete(obj, until=until, delimiter=delimiter)
897
898     def set_object_meta(self, obj, metapairs):
899         """
900         :param obj: (str) remote object path
901
902         :param metapairs: (dict) {key1:val1, key2:val2, ...}
903         """
904         assert(type(metapairs) is dict)
905         self.object_post(obj, update=True, metadata=metapairs)
906
907     def del_object_meta(self, obj, metakey):
908         """
909         :param obj: (str) remote object path
910
911         :param metakey: (str) metadatum key
912         """
913         self.object_post(obj, update=True, metadata={metakey: ''})
914
915     def publish_object(self, obj):
916         """
917         :param obj: (str) remote object path
918
919         :returns: (str) access url
920         """
921         self.object_post(obj, update=True, public=True)
922         info = self.get_object_info(obj)
923         pref, sep, rest = self.base_url.partition('//')
924         base = rest.split('/')[0]
925         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
926
927     def unpublish_object(self, obj):
928         """
929         :param obj: (str) remote object path
930         """
931         self.object_post(obj, update=True, public=False)
932
933     def get_object_info(self, obj, version=None):
934         """
935         :param obj: (str) remote object path
936
937         :param version: (str)
938
939         :returns: (dict)
940         """
941         try:
942             r = self.object_head(obj, version=version)
943             return r.headers
944         except ClientError as ce:
945             if ce.status == 404:
946                 raise ClientError('Object %s not found' % obj, status=404)
947             raise
948
949     def get_object_meta(self, obj, version=None):
950         """
951         :param obj: (str) remote object path
952
953         :param version: (str)
954
955         :returns: (dict)
956         """
957         return filter_in(
958             self.get_object_info(obj, version=version),
959             'X-Object-Meta')
960
961     def get_object_sharing(self, obj):
962         """
963         :param obj: (str) remote object path
964
965         :returns: (dict)
966         """
967         r = filter_in(
968             self.get_object_info(obj),
969             'X-Object-Sharing',
970             exactMatch=True)
971         reply = {}
972         if len(r) > 0:
973             perms = r['x-object-sharing'].split(';')
974             for perm in perms:
975                 try:
976                     perm.index('=')
977                 except ValueError:
978                     raise ClientError('Incorrect reply format')
979                 (key, val) = perm.strip().split('=')
980                 reply[key] = val
981         return reply
982
983     def set_object_sharing(
984             self, obj,
985             read_permition=False, write_permition=False):
986         """Give read/write permisions to an object.
987
988         :param obj: (str) remote object path
989
990         :param read_permition: (list - bool) users and user groups that get
991             read permition for this object - False means all previous read
992             permissions will be removed
993
994         :param write_perimition: (list - bool) of users and user groups to get
995            write permition for this object - False means all previous write
996            permissions will be removed
997         """
998
999         perms = dict(read=read_permition or '', write=write_permition or '')
1000         self.object_post(obj, update=True, permissions=perms)
1001
1002     def del_object_sharing(self, obj):
1003         """
1004         :param obj: (str) remote object path
1005         """
1006         self.set_object_sharing(obj)
1007
1008     def append_object(self, obj, source_file, upload_cb=None):
1009         """
1010         :param obj: (str) remote object path
1011
1012         :param source_file: open file descriptor
1013
1014         :param upload_db: progress.bar for uploading
1015         """
1016
1017         self._assert_container()
1018         meta = self.get_container_info()
1019         blocksize = int(meta['x-container-block-size'])
1020         filesize = fstat(source_file.fileno()).st_size
1021         nblocks = 1 + (filesize - 1) // blocksize
1022         offset = 0
1023         if upload_cb:
1024             upload_gen = upload_cb(nblocks)
1025             upload_gen.next()
1026         for i in range(nblocks):
1027             block = source_file.read(min(blocksize, filesize - offset))
1028             offset += len(block)
1029             self.object_post(
1030                 obj,
1031                 update=True,
1032                 content_range='bytes */*',
1033                 content_type='application/octet-stream',
1034                 content_length=len(block),
1035                 data=block)
1036
1037             if upload_cb:
1038                 upload_gen.next()
1039
1040     def truncate_object(self, obj, upto_bytes):
1041         """
1042         :param obj: (str) remote object path
1043
1044         :param upto_bytes: max number of bytes to leave on file
1045         """
1046         self.object_post(
1047             obj,
1048             update=True,
1049             content_range='bytes 0-%s/*' % upto_bytes,
1050             content_type='application/octet-stream',
1051             object_bytes=upto_bytes,
1052             source_object=path4url(self.container, obj))
1053
1054     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1055         """Overwrite a part of an object from local source file
1056
1057         :param obj: (str) remote object path
1058
1059         :param start: (int) position in bytes to start overwriting from
1060
1061         :param end: (int) position in bytes to stop overwriting at
1062
1063         :param source_file: open file descriptor
1064
1065         :param upload_db: progress.bar for uploading
1066         """
1067
1068         r = self.get_object_info(obj)
1069         rf_size = int(r['content-length'])
1070         if rf_size < int(start):
1071             raise ClientError(
1072                 'Range start exceeds file size',
1073                 status=416)
1074         elif rf_size < int(end):
1075             raise ClientError(
1076                 'Range end exceeds file size',
1077                 status=416)
1078         self._assert_container()
1079         meta = self.get_container_info()
1080         blocksize = int(meta['x-container-block-size'])
1081         filesize = fstat(source_file.fileno()).st_size
1082         datasize = int(end) - int(start) + 1
1083         nblocks = 1 + (datasize - 1) // blocksize
1084         offset = 0
1085         if upload_cb:
1086             upload_gen = upload_cb(nblocks)
1087             upload_gen.next()
1088         for i in range(nblocks):
1089             read_size = min(blocksize, filesize - offset, datasize - offset)
1090             block = source_file.read(read_size)
1091             self.object_post(
1092                 obj,
1093                 update=True,
1094                 content_type='application/octet-stream',
1095                 content_length=len(block),
1096                 content_range='bytes %s-%s/*' % (
1097                     start + offset,
1098                     start + offset + len(block) - 1),
1099                 data=block)
1100             offset += len(block)
1101
1102             if upload_cb:
1103                 upload_gen.next()
1104
1105     def copy_object(
1106             self, src_container, src_object, dst_container,
1107             dst_object=None,
1108             source_version=None,
1109             source_account=None,
1110             public=False,
1111             content_type=None,
1112             delimiter=None):
1113         """
1114         :param src_container: (str) source container
1115
1116         :param src_object: (str) source object path
1117
1118         :param dst_container: (str) destination container
1119
1120         :param dst_object: (str) destination object path
1121
1122         :param source_version: (str) source object version
1123
1124         :param source_account: (str) account to copy from
1125
1126         :param public: (bool)
1127
1128         :param content_type: (str)
1129
1130         :param delimiter: (str)
1131         """
1132         self._assert_account()
1133         self.container = dst_container
1134         src_path = path4url(src_container, src_object)
1135         self.object_put(
1136             dst_object or src_object,
1137             success=201,
1138             copy_from=src_path,
1139             content_length=0,
1140             source_version=source_version,
1141             source_account=source_account,
1142             public=public,
1143             content_type=content_type,
1144             delimiter=delimiter)
1145
1146     def move_object(
1147             self, src_container, src_object, dst_container,
1148             dst_object=False,
1149             source_account=None,
1150             source_version=None,
1151             public=False,
1152             content_type=None,
1153             delimiter=None):
1154         """
1155         :param src_container: (str) source container
1156
1157         :param src_object: (str) source object path
1158
1159         :param dst_container: (str) destination container
1160
1161         :param dst_object: (str) destination object path
1162
1163         :param source_account: (str) account to move from
1164
1165         :param source_version: (str) source object version
1166
1167         :param public: (bool)
1168
1169         :param content_type: (str)
1170
1171         :param delimiter: (str)
1172         """
1173         self._assert_account()
1174         self.container = dst_container
1175         dst_object = dst_object or src_object
1176         src_path = path4url(src_container, src_object)
1177         self.object_put(
1178             dst_object,
1179             success=201,
1180             move_from=src_path,
1181             content_length=0,
1182             source_account=source_account,
1183             source_version=source_version,
1184             public=public,
1185             content_type=content_type,
1186             delimiter=delimiter)
1187
1188     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1189         """Get accounts that share with self.account
1190
1191         :param limit: (str)
1192
1193         :param marker: (str)
1194
1195         :returns: (dict)
1196         """
1197         self._assert_account()
1198
1199         self.set_param('format', 'json')
1200         self.set_param('limit', limit, iff=limit is not None)
1201         self.set_param('marker', marker, iff=marker is not None)
1202
1203         path = ''
1204         success = kwargs.pop('success', (200, 204))
1205         r = self.get(path, *args, success=success, **kwargs)
1206         return r.json
1207
1208     def get_object_versionlist(self, obj):
1209         """
1210         :param obj: (str) remote object path
1211
1212         :returns: (list)
1213         """
1214         self._assert_container()
1215         r = self.object_get(obj, format='json', version='list')
1216         return r.json['versions']