Add a container_infor_cache param to upload_object
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash, upload_gen=None):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None, cache=None):
200         """
201         :param fileobj: (file descriptor) source
202
203         :param size: (int) size of data to upload from source
204
205         :param cache: (dict) if provided, cache container info response to
206         avoid redundant calls
207         """
208         if isinstance(cache, dict):
209             try:
210                 meta = cache[self.container]
211             except KeyError:
212                 meta = self.get_container_info()
213                 cache[self.container] = meta
214         else:
215             meta = self.get_container_info()
216         blocksize = int(meta['x-container-block-size'])
217         blockhash = meta['x-container-block-hash']
218         size = size if size is not None else fstat(fileobj.fileno()).st_size
219         nblocks = 1 + (size - 1) // blocksize
220         return (blocksize, blockhash, size, nblocks)
221
222     def _create_or_get_missing_hashes(
223             self, obj, json,
224             size=None,
225             format='json',
226             hashmap=True,
227             content_type=None,
228             if_etag_match=None,
229             if_etag_not_match=None,
230             content_encoding=None,
231             content_disposition=None,
232             permissions=None,
233             public=None,
234             success=(201, 409)):
235         r = self.object_put(
236             obj,
237             format='json',
238             hashmap=True,
239             content_type=content_type,
240             json=json,
241             if_etag_match=if_etag_match,
242             if_etag_not_match=if_etag_not_match,
243             content_encoding=content_encoding,
244             content_disposition=content_disposition,
245             permissions=permissions,
246             public=public,
247             success=success)
248         return (None if r.status_code == 201 else r.json), r.headers
249
250     def _calculate_blocks_for_upload(
251             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
252             hash_cb=None):
253         offset = 0
254         if hash_cb:
255             hash_gen = hash_cb(nblocks)
256             hash_gen.next()
257
258         for i in range(nblocks):
259             block = fileobj.read(min(blocksize, size - offset))
260             bytes = len(block)
261             hash = _pithos_hash(block, blockhash)
262             hashes.append(hash)
263             hmap[hash] = (offset, bytes)
264             offset += bytes
265             if hash_cb:
266                 hash_gen.next()
267         msg = 'Failed to calculate uploaded blocks:'
268         ' Offset and object size do not match'
269         assert offset == size, msg
270
271     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272         """upload missing blocks asynchronously"""
273
274         self._init_thread_limit()
275
276         flying = []
277         failures = []
278         for hash in missing:
279             offset, bytes = hmap[hash]
280             fileobj.seek(offset)
281             data = fileobj.read(bytes)
282             r = self._put_block_async(data, hash, upload_gen)
283             flying.append(r)
284             unfinished = self._watch_thread_limit(flying)
285             for thread in set(flying).difference(unfinished):
286                 if thread.exception:
287                     failures.append(thread)
288                     if isinstance(
289                             thread.exception,
290                             ClientError) and thread.exception.status == 502:
291                         self.POOLSIZE = self._thread_limit
292                 elif thread.isAlive():
293                     flying.append(thread)
294                 elif upload_gen:
295                     try:
296                         upload_gen.next()
297                     except:
298                         pass
299             flying = unfinished
300
301         for thread in flying:
302             thread.join()
303             if thread.exception:
304                 failures.append(thread)
305             elif upload_gen:
306                 try:
307                     upload_gen.next()
308                 except:
309                     pass
310
311         return [failure.kwargs['hash'] for failure in failures]
312
313     def upload_object(
314             self, obj, f,
315             size=None,
316             hash_cb=None,
317             upload_cb=None,
318             etag=None,
319             if_etag_match=None,
320             if_not_exist=None,
321             content_encoding=None,
322             content_disposition=None,
323             content_type=None,
324             sharing=None,
325             public=None,
326             container_info_cache=None):
327         """Upload an object using multiple connections (threads)
328
329         :param obj: (str) remote object path
330
331         :param f: open file descriptor (rb)
332
333         :param hash_cb: optional progress.bar object for calculating hashes
334
335         :param upload_cb: optional progress.bar object for uploading
336
337         :param etag: (str)
338
339         :param if_etag_match: (str) Push that value to if-match header at file
340             creation
341
342         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343             it does not exist remotely, otherwise the operation will fail.
344             Involves the case of an object with the same path is created while
345             the object is being uploaded.
346
347         :param content_encoding: (str)
348
349         :param content_disposition: (str)
350
351         :param content_type: (str)
352
353         :param sharing: {'read':[user and/or grp names],
354             'write':[usr and/or grp names]}
355
356         :param public: (bool)
357
358         :param container_info_cache: (dict) if given, avoid redundant calls to
359         server for container info (block size and hash information)
360         """
361         self._assert_container()
362
363         #init
364         block_info = (blocksize, blockhash, size, nblocks) =\
365             self._get_file_block_info(f, size, container_info_cache)
366         (hashes, hmap, offset) = ([], {}, 0)
367         if not content_type:
368             content_type = 'application/octet-stream'
369
370         self._calculate_blocks_for_upload(
371             *block_info,
372             hashes=hashes,
373             hmap=hmap,
374             fileobj=f,
375             hash_cb=hash_cb)
376
377         hashmap = dict(bytes=size, hashes=hashes)
378         missing, obj_headers = self._create_or_get_missing_hashes(
379             obj, hashmap,
380             content_type=content_type,
381             size=size,
382             if_etag_match=if_etag_match,
383             if_etag_not_match='*' if if_not_exist else None,
384             content_encoding=content_encoding,
385             content_disposition=content_disposition,
386             permissions=sharing,
387             public=public)
388
389         if missing is None:
390             return obj_headers
391
392         if upload_cb:
393             upload_gen = upload_cb(len(missing))
394             for i in range(len(missing), len(hashmap['hashes']) + 1):
395                 try:
396                     upload_gen.next()
397                 except:
398                     upload_gen = None
399         else:
400             upload_gen = None
401
402         retries = 7
403         try:
404             while retries:
405                 sendlog.info('%s blocks missing' % len(missing))
406                 num_of_blocks = len(missing)
407                 missing = self._upload_missing_blocks(
408                     missing,
409                     hmap,
410                     f,
411                     upload_gen)
412                 if missing:
413                     if num_of_blocks == len(missing):
414                         retries -= 1
415                     else:
416                         num_of_blocks = len(missing)
417                 else:
418                     break
419             if missing:
420                 raise ClientError(
421                     '%s blocks failed to upload' % len(missing),
422                     status=800)
423         except KeyboardInterrupt:
424             sendlog.info('- - - wait for threads to finish')
425             for thread in activethreads():
426                 thread.join()
427             raise
428
429         r = self.object_put(
430             obj,
431             format='json',
432             hashmap=True,
433             content_type=content_type,
434             if_etag_match=if_etag_match,
435             if_etag_not_match='*' if if_not_exist else None,
436             etag=etag,
437             json=hashmap,
438             permissions=sharing,
439             public=public,
440             success=201)
441         return r.headers
442
443     # download_* auxiliary methods
444     def _get_remote_blocks_info(self, obj, **restargs):
445         #retrieve object hashmap
446         myrange = restargs.pop('data_range', None)
447         hashmap = self.get_object_hashmap(obj, **restargs)
448         restargs['data_range'] = myrange
449         blocksize = int(hashmap['block_size'])
450         blockhash = hashmap['block_hash']
451         total_size = hashmap['bytes']
452         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
453         map_dict = {}
454         for i, h in enumerate(hashmap['hashes']):
455             #  map_dict[h] = i   CHAGE
456             if h in map_dict:
457                 map_dict[h].append(i)
458             else:
459                 map_dict[h] = [i]
460         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
461
462     def _dump_blocks_sync(
463             self, obj, remote_hashes, blocksize, total_size, dst, range,
464             **args):
465         for blockid, blockhash in enumerate(remote_hashes):
466             if blockhash:
467                 start = blocksize * blockid
468                 is_last = start + blocksize > total_size
469                 end = (total_size - 1) if is_last else (start + blocksize - 1)
470                 (start, end) = _range_up(start, end, range)
471                 args['data_range'] = 'bytes=%s-%s' % (start, end)
472                 r = self.object_get(obj, success=(200, 206), **args)
473                 self._cb_next()
474                 dst.write(r.content)
475                 dst.flush()
476
477     def _get_block_async(self, obj, **args):
478         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
479         event.start()
480         return event
481
482     def _hash_from_file(self, fp, start, size, blockhash):
483         fp.seek(start)
484         block = fp.read(size)
485         h = newhashlib(blockhash)
486         h.update(block.strip('\x00'))
487         return hexlify(h.digest())
488
489     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
490         """write the results of a greenleted rest call to a file
491
492         :param offset: the offset of the file up to blocksize
493         - e.g. if the range is 10-100, all blocks will be written to
494         normal_position - 10
495         """
496         for i, (key, g) in enumerate(flying.items()):
497             if g.isAlive():
498                 continue
499             if g.exception:
500                 raise g.exception
501             block = g.value.content
502             for block_start in blockids[key]:
503                 local_file.seek(block_start + offset)
504                 local_file.write(block)
505                 self._cb_next()
506             flying.pop(key)
507             blockids.pop(key)
508         local_file.flush()
509
510     def _dump_blocks_async(
511             self, obj, remote_hashes, blocksize, total_size, local_file,
512             blockhash=None, resume=False, filerange=None, **restargs):
513         file_size = fstat(local_file.fileno()).st_size if resume else 0
514         flying = dict()
515         blockid_dict = dict()
516         offset = 0
517         if filerange is not None:
518             rstart = int(filerange.split('-')[0])
519             offset = rstart if blocksize > rstart else rstart % blocksize
520
521         self._init_thread_limit()
522         for block_hash, blockids in remote_hashes.items():
523             blockids = [blk * blocksize for blk in blockids]
524             unsaved = [blk for blk in blockids if not (
525                 blk < file_size and block_hash == self._hash_from_file(
526                         local_file, blk, blocksize, blockhash))]
527             self._cb_next(len(blockids) - len(unsaved))
528             if unsaved:
529                 key = unsaved[0]
530                 self._watch_thread_limit(flying.values())
531                 self._thread2file(
532                     flying, blockid_dict, local_file, offset,
533                     **restargs)
534                 end = total_size - 1 if key + blocksize > total_size\
535                     else key + blocksize - 1
536                 start, end = _range_up(key, end, filerange)
537                 if start == end:
538                     self._cb_next()
539                     continue
540                 restargs['async_headers'] = {
541                     'Range': 'bytes=%s-%s' % (start, end)}
542                 flying[key] = self._get_block_async(obj, **restargs)
543                 blockid_dict[key] = unsaved
544
545         for thread in flying.values():
546             thread.join()
547         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
548
549     def download_object(
550             self, obj, dst,
551             download_cb=None,
552             version=None,
553             resume=False,
554             range_str=None,
555             if_match=None,
556             if_none_match=None,
557             if_modified_since=None,
558             if_unmodified_since=None):
559         """Download an object (multiple connections, random blocks)
560
561         :param obj: (str) remote object path
562
563         :param dst: open file descriptor (wb+)
564
565         :param download_cb: optional progress.bar object for downloading
566
567         :param version: (str) file version
568
569         :param resume: (bool) if set, preserve already downloaded file parts
570
571         :param range_str: (str) from, to are file positions (int) in bytes
572
573         :param if_match: (str)
574
575         :param if_none_match: (str)
576
577         :param if_modified_since: (str) formated date
578
579         :param if_unmodified_since: (str) formated date"""
580         restargs = dict(
581             version=version,
582             data_range=None if range_str is None else 'bytes=%s' % range_str,
583             if_match=if_match,
584             if_none_match=if_none_match,
585             if_modified_since=if_modified_since,
586             if_unmodified_since=if_unmodified_since)
587
588         (
589             blocksize,
590             blockhash,
591             total_size,
592             hash_list,
593             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
594         assert total_size >= 0
595
596         if download_cb:
597             self.progress_bar_gen = download_cb(len(hash_list))
598             self._cb_next()
599
600         if dst.isatty():
601             self._dump_blocks_sync(
602                 obj,
603                 hash_list,
604                 blocksize,
605                 total_size,
606                 dst,
607                 range_str,
608                 **restargs)
609         else:
610             self._dump_blocks_async(
611                 obj,
612                 remote_hashes,
613                 blocksize,
614                 total_size,
615                 dst,
616                 blockhash,
617                 resume,
618                 range_str,
619                 **restargs)
620             if not range_str:
621                 dst.truncate(total_size)
622
623         self._complete_cb()
624
625     #Command Progress Bar method
626     def _cb_next(self, step=1):
627         if hasattr(self, 'progress_bar_gen'):
628             try:
629                 for i in xrange(step):
630                     self.progress_bar_gen.next()
631             except:
632                 pass
633
634     def _complete_cb(self):
635         while True:
636             try:
637                 self.progress_bar_gen.next()
638             except:
639                 break
640
641     def get_object_hashmap(
642             self, obj,
643             version=None,
644             if_match=None,
645             if_none_match=None,
646             if_modified_since=None,
647             if_unmodified_since=None,
648             data_range=None):
649         """
650         :param obj: (str) remote object path
651
652         :param if_match: (str)
653
654         :param if_none_match: (str)
655
656         :param if_modified_since: (str) formated date
657
658         :param if_unmodified_since: (str) formated date
659
660         :param data_range: (str) from-to where from and to are integers
661             denoting file positions in bytes
662
663         :returns: (list)
664         """
665         try:
666             r = self.object_get(
667                 obj,
668                 hashmap=True,
669                 version=version,
670                 if_etag_match=if_match,
671                 if_etag_not_match=if_none_match,
672                 if_modified_since=if_modified_since,
673                 if_unmodified_since=if_unmodified_since,
674                 data_range=data_range)
675         except ClientError as err:
676             if err.status == 304 or err.status == 412:
677                 return {}
678             raise
679         return r.json
680
681     def set_account_group(self, group, usernames):
682         """
683         :param group: (str)
684
685         :param usernames: (list)
686         """
687         self.account_post(update=True, groups={group: usernames})
688
689     def del_account_group(self, group):
690         """
691         :param group: (str)
692         """
693         self.account_post(update=True, groups={group: []})
694
695     def get_account_info(self, until=None):
696         """
697         :param until: (str) formated date
698
699         :returns: (dict)
700         """
701         r = self.account_head(until=until)
702         if r.status_code == 401:
703             raise ClientError("No authorization", status=401)
704         return r.headers
705
706     def get_account_quota(self):
707         """
708         :returns: (dict)
709         """
710         return filter_in(
711             self.get_account_info(),
712             'X-Account-Policy-Quota',
713             exactMatch=True)
714
715     def get_account_versioning(self):
716         """
717         :returns: (dict)
718         """
719         return filter_in(
720             self.get_account_info(),
721             'X-Account-Policy-Versioning',
722             exactMatch=True)
723
724     def get_account_meta(self, until=None):
725         """
726         :meta until: (str) formated date
727
728         :returns: (dict)
729         """
730         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
731
732     def get_account_group(self):
733         """
734         :returns: (dict)
735         """
736         return filter_in(self.get_account_info(), 'X-Account-Group-')
737
738     def set_account_meta(self, metapairs):
739         """
740         :param metapairs: (dict) {key1:val1, key2:val2, ...}
741         """
742         assert(type(metapairs) is dict)
743         self.account_post(update=True, metadata=metapairs)
744
745     def del_account_meta(self, metakey):
746         """
747         :param metakey: (str) metadatum key
748         """
749         self.account_post(update=True, metadata={metakey: ''})
750
751     """
752     def set_account_quota(self, quota):
753         ""
754         :param quota: (int)
755         ""
756         self.account_post(update=True, quota=quota)
757     """
758
759     def set_account_versioning(self, versioning):
760         """
761         "param versioning: (str)
762         """
763         self.account_post(update=True, versioning=versioning)
764
765     def list_containers(self):
766         """
767         :returns: (dict)
768         """
769         r = self.account_get()
770         return r.json
771
772     def del_container(self, until=None, delimiter=None):
773         """
774         :param until: (str) formated date
775
776         :param delimiter: (str) with / empty container
777
778         :raises ClientError: 404 Container does not exist
779
780         :raises ClientError: 409 Container is not empty
781         """
782         self._assert_container()
783         r = self.container_delete(
784             until=until,
785             delimiter=delimiter,
786             success=(204, 404, 409))
787         if r.status_code == 404:
788             raise ClientError(
789                 'Container "%s" does not exist' % self.container,
790                 r.status_code)
791         elif r.status_code == 409:
792             raise ClientError(
793                 'Container "%s" is not empty' % self.container,
794                 r.status_code)
795
796     def get_container_versioning(self, container=None):
797         """
798         :param container: (str)
799
800         :returns: (dict)
801         """
802         cnt_back_up = self.container
803         try:
804             self.container = container or cnt_back_up
805             return filter_in(
806                 self.get_container_info(),
807                 'X-Container-Policy-Versioning')
808         finally:
809             self.container = cnt_back_up
810
811     def get_container_limit(self, container=None):
812         """
813         :param container: (str)
814
815         :returns: (dict)
816         """
817         cnt_back_up = self.container
818         try:
819             self.container = container or cnt_back_up
820             return filter_in(
821                 self.get_container_info(),
822                 'X-Container-Policy-Quota')
823         finally:
824             self.container = cnt_back_up
825
826     def get_container_info(self, until=None):
827         """
828         :param until: (str) formated date
829
830         :returns: (dict)
831
832         :raises ClientError: 404 Container not found
833         """
834         try:
835             r = self.container_head(until=until)
836         except ClientError as err:
837             err.details.append('for container %s' % self.container)
838             raise err
839         return r.headers
840
841     def get_container_meta(self, until=None):
842         """
843         :param until: (str) formated date
844
845         :returns: (dict)
846         """
847         return filter_in(
848             self.get_container_info(until=until),
849             'X-Container-Meta')
850
851     def get_container_object_meta(self, until=None):
852         """
853         :param until: (str) formated date
854
855         :returns: (dict)
856         """
857         return filter_in(
858             self.get_container_info(until=until),
859             'X-Container-Object-Meta')
860
861     def set_container_meta(self, metapairs):
862         """
863         :param metapairs: (dict) {key1:val1, key2:val2, ...}
864         """
865         assert(type(metapairs) is dict)
866         self.container_post(update=True, metadata=metapairs)
867
868     def del_container_meta(self, metakey):
869         """
870         :param metakey: (str) metadatum key
871         """
872         self.container_post(update=True, metadata={metakey: ''})
873
874     def set_container_limit(self, limit):
875         """
876         :param limit: (int)
877         """
878         self.container_post(update=True, quota=limit)
879
880     def set_container_versioning(self, versioning):
881         """
882         :param versioning: (str)
883         """
884         self.container_post(update=True, versioning=versioning)
885
886     def del_object(self, obj, until=None, delimiter=None):
887         """
888         :param obj: (str) remote object path
889
890         :param until: (str) formated date
891
892         :param delimiter: (str)
893         """
894         self._assert_container()
895         self.object_delete(obj, until=until, delimiter=delimiter)
896
897     def set_object_meta(self, obj, metapairs):
898         """
899         :param obj: (str) remote object path
900
901         :param metapairs: (dict) {key1:val1, key2:val2, ...}
902         """
903         assert(type(metapairs) is dict)
904         self.object_post(obj, update=True, metadata=metapairs)
905
906     def del_object_meta(self, obj, metakey):
907         """
908         :param obj: (str) remote object path
909
910         :param metakey: (str) metadatum key
911         """
912         self.object_post(obj, update=True, metadata={metakey: ''})
913
914     def publish_object(self, obj):
915         """
916         :param obj: (str) remote object path
917
918         :returns: (str) access url
919         """
920         self.object_post(obj, update=True, public=True)
921         info = self.get_object_info(obj)
922         pref, sep, rest = self.base_url.partition('//')
923         base = rest.split('/')[0]
924         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
925
926     def unpublish_object(self, obj):
927         """
928         :param obj: (str) remote object path
929         """
930         self.object_post(obj, update=True, public=False)
931
932     def get_object_info(self, obj, version=None):
933         """
934         :param obj: (str) remote object path
935
936         :param version: (str)
937
938         :returns: (dict)
939         """
940         try:
941             r = self.object_head(obj, version=version)
942             return r.headers
943         except ClientError as ce:
944             if ce.status == 404:
945                 raise ClientError('Object %s not found' % obj, status=404)
946             raise
947
948     def get_object_meta(self, obj, version=None):
949         """
950         :param obj: (str) remote object path
951
952         :param version: (str)
953
954         :returns: (dict)
955         """
956         return filter_in(
957             self.get_object_info(obj, version=version),
958             'X-Object-Meta')
959
960     def get_object_sharing(self, obj):
961         """
962         :param obj: (str) remote object path
963
964         :returns: (dict)
965         """
966         r = filter_in(
967             self.get_object_info(obj),
968             'X-Object-Sharing',
969             exactMatch=True)
970         reply = {}
971         if len(r) > 0:
972             perms = r['x-object-sharing'].split(';')
973             for perm in perms:
974                 try:
975                     perm.index('=')
976                 except ValueError:
977                     raise ClientError('Incorrect reply format')
978                 (key, val) = perm.strip().split('=')
979                 reply[key] = val
980         return reply
981
982     def set_object_sharing(
983             self, obj,
984             read_permition=False, write_permition=False):
985         """Give read/write permisions to an object.
986
987         :param obj: (str) remote object path
988
989         :param read_permition: (list - bool) users and user groups that get
990             read permition for this object - False means all previous read
991             permissions will be removed
992
993         :param write_perimition: (list - bool) of users and user groups to get
994            write permition for this object - False means all previous write
995            permissions will be removed
996         """
997
998         perms = dict(read=read_permition or '', write=write_permition or '')
999         self.object_post(obj, update=True, permissions=perms)
1000
1001     def del_object_sharing(self, obj):
1002         """
1003         :param obj: (str) remote object path
1004         """
1005         self.set_object_sharing(obj)
1006
1007     def append_object(self, obj, source_file, upload_cb=None):
1008         """
1009         :param obj: (str) remote object path
1010
1011         :param source_file: open file descriptor
1012
1013         :param upload_db: progress.bar for uploading
1014         """
1015
1016         self._assert_container()
1017         meta = self.get_container_info()
1018         blocksize = int(meta['x-container-block-size'])
1019         filesize = fstat(source_file.fileno()).st_size
1020         nblocks = 1 + (filesize - 1) // blocksize
1021         offset = 0
1022         if upload_cb:
1023             upload_gen = upload_cb(nblocks)
1024             upload_gen.next()
1025         for i in range(nblocks):
1026             block = source_file.read(min(blocksize, filesize - offset))
1027             offset += len(block)
1028             self.object_post(
1029                 obj,
1030                 update=True,
1031                 content_range='bytes */*',
1032                 content_type='application/octet-stream',
1033                 content_length=len(block),
1034                 data=block)
1035
1036             if upload_cb:
1037                 upload_gen.next()
1038
1039     def truncate_object(self, obj, upto_bytes):
1040         """
1041         :param obj: (str) remote object path
1042
1043         :param upto_bytes: max number of bytes to leave on file
1044         """
1045         self.object_post(
1046             obj,
1047             update=True,
1048             content_range='bytes 0-%s/*' % upto_bytes,
1049             content_type='application/octet-stream',
1050             object_bytes=upto_bytes,
1051             source_object=path4url(self.container, obj))
1052
1053     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1054         """Overwrite a part of an object from local source file
1055
1056         :param obj: (str) remote object path
1057
1058         :param start: (int) position in bytes to start overwriting from
1059
1060         :param end: (int) position in bytes to stop overwriting at
1061
1062         :param source_file: open file descriptor
1063
1064         :param upload_db: progress.bar for uploading
1065         """
1066
1067         r = self.get_object_info(obj)
1068         rf_size = int(r['content-length'])
1069         if rf_size < int(start):
1070             raise ClientError(
1071                 'Range start exceeds file size',
1072                 status=416)
1073         elif rf_size < int(end):
1074             raise ClientError(
1075                 'Range end exceeds file size',
1076                 status=416)
1077         self._assert_container()
1078         meta = self.get_container_info()
1079         blocksize = int(meta['x-container-block-size'])
1080         filesize = fstat(source_file.fileno()).st_size
1081         datasize = int(end) - int(start) + 1
1082         nblocks = 1 + (datasize - 1) // blocksize
1083         offset = 0
1084         if upload_cb:
1085             upload_gen = upload_cb(nblocks)
1086             upload_gen.next()
1087         for i in range(nblocks):
1088             read_size = min(blocksize, filesize - offset, datasize - offset)
1089             block = source_file.read(read_size)
1090             self.object_post(
1091                 obj,
1092                 update=True,
1093                 content_type='application/octet-stream',
1094                 content_length=len(block),
1095                 content_range='bytes %s-%s/*' % (
1096                     start + offset,
1097                     start + offset + len(block) - 1),
1098                 data=block)
1099             offset += len(block)
1100
1101             if upload_cb:
1102                 upload_gen.next()
1103
1104     def copy_object(
1105             self, src_container, src_object, dst_container,
1106             dst_object=None,
1107             source_version=None,
1108             source_account=None,
1109             public=False,
1110             content_type=None,
1111             delimiter=None):
1112         """
1113         :param src_container: (str) source container
1114
1115         :param src_object: (str) source object path
1116
1117         :param dst_container: (str) destination container
1118
1119         :param dst_object: (str) destination object path
1120
1121         :param source_version: (str) source object version
1122
1123         :param source_account: (str) account to copy from
1124
1125         :param public: (bool)
1126
1127         :param content_type: (str)
1128
1129         :param delimiter: (str)
1130         """
1131         self._assert_account()
1132         self.container = dst_container
1133         src_path = path4url(src_container, src_object)
1134         self.object_put(
1135             dst_object or src_object,
1136             success=201,
1137             copy_from=src_path,
1138             content_length=0,
1139             source_version=source_version,
1140             source_account=source_account,
1141             public=public,
1142             content_type=content_type,
1143             delimiter=delimiter)
1144
1145     def move_object(
1146             self, src_container, src_object, dst_container,
1147             dst_object=False,
1148             source_account=None,
1149             source_version=None,
1150             public=False,
1151             content_type=None,
1152             delimiter=None):
1153         """
1154         :param src_container: (str) source container
1155
1156         :param src_object: (str) source object path
1157
1158         :param dst_container: (str) destination container
1159
1160         :param dst_object: (str) destination object path
1161
1162         :param source_account: (str) account to move from
1163
1164         :param source_version: (str) source object version
1165
1166         :param public: (bool)
1167
1168         :param content_type: (str)
1169
1170         :param delimiter: (str)
1171         """
1172         self._assert_account()
1173         self.container = dst_container
1174         dst_object = dst_object or src_object
1175         src_path = path4url(src_container, src_object)
1176         self.object_put(
1177             dst_object,
1178             success=201,
1179             move_from=src_path,
1180             content_length=0,
1181             source_account=source_account,
1182             source_version=source_version,
1183             public=public,
1184             content_type=content_type,
1185             delimiter=delimiter)
1186
1187     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1188         """Get accounts that share with self.account
1189
1190         :param limit: (str)
1191
1192         :param marker: (str)
1193
1194         :returns: (dict)
1195         """
1196         self._assert_account()
1197
1198         self.set_param('format', 'json')
1199         self.set_param('limit', limit, iff=limit is not None)
1200         self.set_param('marker', marker, iff=marker is not None)
1201
1202         path = ''
1203         success = kwargs.pop('success', (200, 204))
1204         r = self.get(path, *args, success=success, **kwargs)
1205         return r.json
1206
1207     def get_object_versionlist(self, obj):
1208         """
1209         :param obj: (str) remote object path
1210
1211         :returns: (list)
1212         """
1213         self._assert_container()
1214         r = self.object_get(obj, format='json', version='list')
1215         return r.json['versions']