Optimize download_to_string by using threads
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash, upload_gen=None):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None, cache=None):
200         """
201         :param fileobj: (file descriptor) source
202
203         :param size: (int) size of data to upload from source
204
205         :param cache: (dict) if provided, cache container info response to
206         avoid redundant calls
207         """
208         if isinstance(cache, dict):
209             try:
210                 meta = cache[self.container]
211             except KeyError:
212                 meta = self.get_container_info()
213                 cache[self.container] = meta
214         else:
215             meta = self.get_container_info()
216         blocksize = int(meta['x-container-block-size'])
217         blockhash = meta['x-container-block-hash']
218         size = size if size is not None else fstat(fileobj.fileno()).st_size
219         nblocks = 1 + (size - 1) // blocksize
220         return (blocksize, blockhash, size, nblocks)
221
222     def _create_or_get_missing_hashes(
223             self, obj, json,
224             size=None,
225             format='json',
226             hashmap=True,
227             content_type=None,
228             if_etag_match=None,
229             if_etag_not_match=None,
230             content_encoding=None,
231             content_disposition=None,
232             permissions=None,
233             public=None,
234             success=(201, 409)):
235         r = self.object_put(
236             obj,
237             format='json',
238             hashmap=True,
239             content_type=content_type,
240             json=json,
241             if_etag_match=if_etag_match,
242             if_etag_not_match=if_etag_not_match,
243             content_encoding=content_encoding,
244             content_disposition=content_disposition,
245             permissions=permissions,
246             public=public,
247             success=success)
248         return (None if r.status_code == 201 else r.json), r.headers
249
250     def _calculate_blocks_for_upload(
251             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
252             hash_cb=None):
253         offset = 0
254         if hash_cb:
255             hash_gen = hash_cb(nblocks)
256             hash_gen.next()
257
258         for i in range(nblocks):
259             block = fileobj.read(min(blocksize, size - offset))
260             bytes = len(block)
261             hash = _pithos_hash(block, blockhash)
262             hashes.append(hash)
263             hmap[hash] = (offset, bytes)
264             offset += bytes
265             if hash_cb:
266                 hash_gen.next()
267         msg = 'Failed to calculate uploaded blocks:'
268         ' Offset and object size do not match'
269         assert offset == size, msg
270
271     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272         """upload missing blocks asynchronously"""
273
274         self._init_thread_limit()
275
276         flying = []
277         failures = []
278         for hash in missing:
279             offset, bytes = hmap[hash]
280             fileobj.seek(offset)
281             data = fileobj.read(bytes)
282             r = self._put_block_async(data, hash, upload_gen)
283             flying.append(r)
284             unfinished = self._watch_thread_limit(flying)
285             for thread in set(flying).difference(unfinished):
286                 if thread.exception:
287                     failures.append(thread)
288                     if isinstance(
289                             thread.exception,
290                             ClientError) and thread.exception.status == 502:
291                         self.POOLSIZE = self._thread_limit
292                 elif thread.isAlive():
293                     flying.append(thread)
294                 elif upload_gen:
295                     try:
296                         upload_gen.next()
297                     except:
298                         pass
299             flying = unfinished
300
301         for thread in flying:
302             thread.join()
303             if thread.exception:
304                 failures.append(thread)
305             elif upload_gen:
306                 try:
307                     upload_gen.next()
308                 except:
309                     pass
310
311         return [failure.kwargs['hash'] for failure in failures]
312
313     def upload_object(
314             self, obj, f,
315             size=None,
316             hash_cb=None,
317             upload_cb=None,
318             etag=None,
319             if_etag_match=None,
320             if_not_exist=None,
321             content_encoding=None,
322             content_disposition=None,
323             content_type=None,
324             sharing=None,
325             public=None,
326             container_info_cache=None):
327         """Upload an object using multiple connections (threads)
328
329         :param obj: (str) remote object path
330
331         :param f: open file descriptor (rb)
332
333         :param hash_cb: optional progress.bar object for calculating hashes
334
335         :param upload_cb: optional progress.bar object for uploading
336
337         :param etag: (str)
338
339         :param if_etag_match: (str) Push that value to if-match header at file
340             creation
341
342         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343             it does not exist remotely, otherwise the operation will fail.
344             Involves the case of an object with the same path is created while
345             the object is being uploaded.
346
347         :param content_encoding: (str)
348
349         :param content_disposition: (str)
350
351         :param content_type: (str)
352
353         :param sharing: {'read':[user and/or grp names],
354             'write':[usr and/or grp names]}
355
356         :param public: (bool)
357
358         :param container_info_cache: (dict) if given, avoid redundant calls to
359         server for container info (block size and hash information)
360         """
361         self._assert_container()
362
363         #init
364         block_info = (
365             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
366                 f, size, container_info_cache)
367         (hashes, hmap, offset) = ([], {}, 0)
368         if not content_type:
369             content_type = 'application/octet-stream'
370
371         self._calculate_blocks_for_upload(
372             *block_info,
373             hashes=hashes,
374             hmap=hmap,
375             fileobj=f,
376             hash_cb=hash_cb)
377
378         hashmap = dict(bytes=size, hashes=hashes)
379         missing, obj_headers = self._create_or_get_missing_hashes(
380             obj, hashmap,
381             content_type=content_type,
382             size=size,
383             if_etag_match=if_etag_match,
384             if_etag_not_match='*' if if_not_exist else None,
385             content_encoding=content_encoding,
386             content_disposition=content_disposition,
387             permissions=sharing,
388             public=public)
389
390         if missing is None:
391             return obj_headers
392
393         if upload_cb:
394             upload_gen = upload_cb(len(missing))
395             for i in range(len(missing), len(hashmap['hashes']) + 1):
396                 try:
397                     upload_gen.next()
398                 except:
399                     upload_gen = None
400         else:
401             upload_gen = None
402
403         retries = 7
404         try:
405             while retries:
406                 sendlog.info('%s blocks missing' % len(missing))
407                 num_of_blocks = len(missing)
408                 missing = self._upload_missing_blocks(
409                     missing,
410                     hmap,
411                     f,
412                     upload_gen)
413                 if missing:
414                     if num_of_blocks == len(missing):
415                         retries -= 1
416                     else:
417                         num_of_blocks = len(missing)
418                 else:
419                     break
420             if missing:
421                 raise ClientError(
422                     '%s blocks failed to upload' % len(missing),
423                     status=800)
424         except KeyboardInterrupt:
425             sendlog.info('- - - wait for threads to finish')
426             for thread in activethreads():
427                 thread.join()
428             raise
429
430         r = self.object_put(
431             obj,
432             format='json',
433             hashmap=True,
434             content_type=content_type,
435             if_etag_match=if_etag_match,
436             if_etag_not_match='*' if if_not_exist else None,
437             etag=etag,
438             json=hashmap,
439             permissions=sharing,
440             public=public,
441             success=201)
442         return r.headers
443
444     # download_* auxiliary methods
445     def _get_remote_blocks_info(self, obj, **restargs):
446         #retrieve object hashmap
447         myrange = restargs.pop('data_range', None)
448         hashmap = self.get_object_hashmap(obj, **restargs)
449         restargs['data_range'] = myrange
450         blocksize = int(hashmap['block_size'])
451         blockhash = hashmap['block_hash']
452         total_size = hashmap['bytes']
453         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
454         map_dict = {}
455         for i, h in enumerate(hashmap['hashes']):
456             #  map_dict[h] = i   CHAGE
457             if h in map_dict:
458                 map_dict[h].append(i)
459             else:
460                 map_dict[h] = [i]
461         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
462
463     def _dump_blocks_sync(
464             self, obj, remote_hashes, blocksize, total_size, dst, range,
465             **args):
466         for blockid, blockhash in enumerate(remote_hashes):
467             if blockhash:
468                 start = blocksize * blockid
469                 is_last = start + blocksize > total_size
470                 end = (total_size - 1) if is_last else (start + blocksize - 1)
471                 (start, end) = _range_up(start, end, range)
472                 args['data_range'] = 'bytes=%s-%s' % (start, end)
473                 r = self.object_get(obj, success=(200, 206), **args)
474                 self._cb_next()
475                 dst.write(r.content)
476                 dst.flush()
477
478     def _get_block_async(self, obj, **args):
479         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
480         event.start()
481         return event
482
483     def _hash_from_file(self, fp, start, size, blockhash):
484         fp.seek(start)
485         block = fp.read(size)
486         h = newhashlib(blockhash)
487         h.update(block.strip('\x00'))
488         return hexlify(h.digest())
489
490     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
491         """write the results of a greenleted rest call to a file
492
493         :param offset: the offset of the file up to blocksize
494         - e.g. if the range is 10-100, all blocks will be written to
495         normal_position - 10
496         """
497         for key, g in flying.items():
498             if g.isAlive():
499                 continue
500             if g.exception:
501                 raise g.exception
502             block = g.value.content
503             for block_start in blockids[key]:
504                 local_file.seek(block_start + offset)
505                 local_file.write(block)
506                 self._cb_next()
507             flying.pop(key)
508             blockids.pop(key)
509         local_file.flush()
510
511     def _dump_blocks_async(
512             self, obj, remote_hashes, blocksize, total_size, local_file,
513             blockhash=None, resume=False, filerange=None, **restargs):
514         file_size = fstat(local_file.fileno()).st_size if resume else 0
515         flying = dict()
516         blockid_dict = dict()
517         offset = 0
518         if filerange is not None:
519             rstart = int(filerange.split('-')[0])
520             offset = rstart if blocksize > rstart else rstart % blocksize
521
522         self._init_thread_limit()
523         for block_hash, blockids in remote_hashes.items():
524             blockids = [blk * blocksize for blk in blockids]
525             unsaved = [blk for blk in blockids if not (
526                 blk < file_size and block_hash == self._hash_from_file(
527                         local_file, blk, blocksize, blockhash))]
528             self._cb_next(len(blockids) - len(unsaved))
529             if unsaved:
530                 key = unsaved[0]
531                 self._watch_thread_limit(flying.values())
532                 self._thread2file(
533                     flying, blockid_dict, local_file, offset,
534                     **restargs)
535                 end = total_size - 1 if (
536                     key + blocksize > total_size) else key + blocksize - 1
537                 start, end = _range_up(key, end, filerange)
538                 if start == end:
539                     self._cb_next()
540                     continue
541                 restargs['async_headers'] = {
542                     'Range': 'bytes=%s-%s' % (start, end)}
543                 flying[key] = self._get_block_async(obj, **restargs)
544                 blockid_dict[key] = unsaved
545
546         for thread in flying.values():
547             thread.join()
548         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
549
550     def download_object(
551             self, obj, dst,
552             download_cb=None,
553             version=None,
554             resume=False,
555             range_str=None,
556             if_match=None,
557             if_none_match=None,
558             if_modified_since=None,
559             if_unmodified_since=None):
560         """Download an object (multiple connections, random blocks)
561
562         :param obj: (str) remote object path
563
564         :param dst: open file descriptor (wb+)
565
566         :param download_cb: optional progress.bar object for downloading
567
568         :param version: (str) file version
569
570         :param resume: (bool) if set, preserve already downloaded file parts
571
572         :param range_str: (str) from, to are file positions (int) in bytes
573
574         :param if_match: (str)
575
576         :param if_none_match: (str)
577
578         :param if_modified_since: (str) formated date
579
580         :param if_unmodified_since: (str) formated date"""
581         restargs = dict(
582             version=version,
583             data_range=None if range_str is None else 'bytes=%s' % range_str,
584             if_match=if_match,
585             if_none_match=if_none_match,
586             if_modified_since=if_modified_since,
587             if_unmodified_since=if_unmodified_since)
588
589         (
590             blocksize,
591             blockhash,
592             total_size,
593             hash_list,
594             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
595         assert total_size >= 0
596
597         if download_cb:
598             self.progress_bar_gen = download_cb(len(hash_list))
599             self._cb_next()
600
601         if dst.isatty():
602             self._dump_blocks_sync(
603                 obj,
604                 hash_list,
605                 blocksize,
606                 total_size,
607                 dst,
608                 range_str,
609                 **restargs)
610         else:
611             self._dump_blocks_async(
612                 obj,
613                 remote_hashes,
614                 blocksize,
615                 total_size,
616                 dst,
617                 blockhash,
618                 resume,
619                 range_str,
620                 **restargs)
621             if not range_str:
622                 dst.truncate(total_size)
623
624         self._complete_cb()
625
626     def download_to_string(
627             self, obj,
628             download_cb=None,
629             version=None,
630             range_str=None,
631             if_match=None,
632             if_none_match=None,
633             if_modified_since=None,
634             if_unmodified_since=None):
635         """Download an object to a string (multiple connections). This method
636         uses threads for http requests, but stores all content in memory.
637
638         :param obj: (str) remote object path
639
640         :param download_cb: optional progress.bar object for downloading
641
642         :param version: (str) file version
643
644         :param range_str: (str) from, to are file positions (int) in bytes
645
646         :param if_match: (str)
647
648         :param if_none_match: (str)
649
650         :param if_modified_since: (str) formated date
651
652         :param if_unmodified_since: (str) formated date
653
654         :returns: (str) the whole object contents
655         """
656         restargs = dict(
657             version=version,
658             data_range=None if range_str is None else 'bytes=%s' % range_str,
659             if_match=if_match,
660             if_none_match=if_none_match,
661             if_modified_since=if_modified_since,
662             if_unmodified_since=if_unmodified_since)
663
664         (
665             blocksize,
666             blockhash,
667             total_size,
668             hash_list,
669             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
670         assert total_size >= 0
671
672         if download_cb:
673             self.progress_bar_gen = download_cb(len(hash_list))
674             self._cb_next()
675
676         num_of_blocks = len(remote_hashes)
677         ret = [''] * num_of_blocks
678         self._init_thread_limit()
679         flying = dict()
680         for blockid, blockhash in enumerate(remote_hashes):
681             start = blocksize * blockid
682             is_last = start + blocksize > total_size
683             end = (total_size - 1) if is_last else (start + blocksize - 1)
684             (start, end) = _range_up(start, end, range_str)
685             if start < end:
686                 self._watch_thread_limit(flying.values())
687                 flying[blockid] = self._get_block_async(obj, **restargs)
688             for runid, thread in flying.items():
689                 if (blockid + 1) == num_of_blocks:
690                     thread.join()
691                 elif thread.isAlive():
692                     continue
693                 if thread.exception:
694                     raise thread.exception
695                 ret[runid] = thread.value.content
696                 self._cb_next()
697                 flying.pop(runid)
698         return ''.join(ret)
699
700     #Command Progress Bar method
701     def _cb_next(self, step=1):
702         if hasattr(self, 'progress_bar_gen'):
703             try:
704                 for i in xrange(step):
705                     self.progress_bar_gen.next()
706             except:
707                 pass
708
709     def _complete_cb(self):
710         while True:
711             try:
712                 self.progress_bar_gen.next()
713             except:
714                 break
715
716     def get_object_hashmap(
717             self, obj,
718             version=None,
719             if_match=None,
720             if_none_match=None,
721             if_modified_since=None,
722             if_unmodified_since=None,
723             data_range=None):
724         """
725         :param obj: (str) remote object path
726
727         :param if_match: (str)
728
729         :param if_none_match: (str)
730
731         :param if_modified_since: (str) formated date
732
733         :param if_unmodified_since: (str) formated date
734
735         :param data_range: (str) from-to where from and to are integers
736             denoting file positions in bytes
737
738         :returns: (list)
739         """
740         try:
741             r = self.object_get(
742                 obj,
743                 hashmap=True,
744                 version=version,
745                 if_etag_match=if_match,
746                 if_etag_not_match=if_none_match,
747                 if_modified_since=if_modified_since,
748                 if_unmodified_since=if_unmodified_since,
749                 data_range=data_range)
750         except ClientError as err:
751             if err.status == 304 or err.status == 412:
752                 return {}
753             raise
754         return r.json
755
756     def set_account_group(self, group, usernames):
757         """
758         :param group: (str)
759
760         :param usernames: (list)
761         """
762         self.account_post(update=True, groups={group: usernames})
763
764     def del_account_group(self, group):
765         """
766         :param group: (str)
767         """
768         self.account_post(update=True, groups={group: []})
769
770     def get_account_info(self, until=None):
771         """
772         :param until: (str) formated date
773
774         :returns: (dict)
775         """
776         r = self.account_head(until=until)
777         if r.status_code == 401:
778             raise ClientError("No authorization", status=401)
779         return r.headers
780
781     def get_account_quota(self):
782         """
783         :returns: (dict)
784         """
785         return filter_in(
786             self.get_account_info(),
787             'X-Account-Policy-Quota',
788             exactMatch=True)
789
790     def get_account_versioning(self):
791         """
792         :returns: (dict)
793         """
794         return filter_in(
795             self.get_account_info(),
796             'X-Account-Policy-Versioning',
797             exactMatch=True)
798
799     def get_account_meta(self, until=None):
800         """
801         :meta until: (str) formated date
802
803         :returns: (dict)
804         """
805         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
806
807     def get_account_group(self):
808         """
809         :returns: (dict)
810         """
811         return filter_in(self.get_account_info(), 'X-Account-Group-')
812
813     def set_account_meta(self, metapairs):
814         """
815         :param metapairs: (dict) {key1:val1, key2:val2, ...}
816         """
817         assert(type(metapairs) is dict)
818         self.account_post(update=True, metadata=metapairs)
819
820     def del_account_meta(self, metakey):
821         """
822         :param metakey: (str) metadatum key
823         """
824         self.account_post(update=True, metadata={metakey: ''})
825
826     """
827     def set_account_quota(self, quota):
828         ""
829         :param quota: (int)
830         ""
831         self.account_post(update=True, quota=quota)
832     """
833
834     def set_account_versioning(self, versioning):
835         """
836         "param versioning: (str)
837         """
838         self.account_post(update=True, versioning=versioning)
839
840     def list_containers(self):
841         """
842         :returns: (dict)
843         """
844         r = self.account_get()
845         return r.json
846
847     def del_container(self, until=None, delimiter=None):
848         """
849         :param until: (str) formated date
850
851         :param delimiter: (str) with / empty container
852
853         :raises ClientError: 404 Container does not exist
854
855         :raises ClientError: 409 Container is not empty
856         """
857         self._assert_container()
858         r = self.container_delete(
859             until=until,
860             delimiter=delimiter,
861             success=(204, 404, 409))
862         if r.status_code == 404:
863             raise ClientError(
864                 'Container "%s" does not exist' % self.container,
865                 r.status_code)
866         elif r.status_code == 409:
867             raise ClientError(
868                 'Container "%s" is not empty' % self.container,
869                 r.status_code)
870
871     def get_container_versioning(self, container=None):
872         """
873         :param container: (str)
874
875         :returns: (dict)
876         """
877         cnt_back_up = self.container
878         try:
879             self.container = container or cnt_back_up
880             return filter_in(
881                 self.get_container_info(),
882                 'X-Container-Policy-Versioning')
883         finally:
884             self.container = cnt_back_up
885
886     def get_container_limit(self, container=None):
887         """
888         :param container: (str)
889
890         :returns: (dict)
891         """
892         cnt_back_up = self.container
893         try:
894             self.container = container or cnt_back_up
895             return filter_in(
896                 self.get_container_info(),
897                 'X-Container-Policy-Quota')
898         finally:
899             self.container = cnt_back_up
900
901     def get_container_info(self, until=None):
902         """
903         :param until: (str) formated date
904
905         :returns: (dict)
906
907         :raises ClientError: 404 Container not found
908         """
909         try:
910             r = self.container_head(until=until)
911         except ClientError as err:
912             err.details.append('for container %s' % self.container)
913             raise err
914         return r.headers
915
916     def get_container_meta(self, until=None):
917         """
918         :param until: (str) formated date
919
920         :returns: (dict)
921         """
922         return filter_in(
923             self.get_container_info(until=until),
924             'X-Container-Meta')
925
926     def get_container_object_meta(self, until=None):
927         """
928         :param until: (str) formated date
929
930         :returns: (dict)
931         """
932         return filter_in(
933             self.get_container_info(until=until),
934             'X-Container-Object-Meta')
935
936     def set_container_meta(self, metapairs):
937         """
938         :param metapairs: (dict) {key1:val1, key2:val2, ...}
939         """
940         assert(type(metapairs) is dict)
941         self.container_post(update=True, metadata=metapairs)
942
943     def del_container_meta(self, metakey):
944         """
945         :param metakey: (str) metadatum key
946         """
947         self.container_post(update=True, metadata={metakey: ''})
948
949     def set_container_limit(self, limit):
950         """
951         :param limit: (int)
952         """
953         self.container_post(update=True, quota=limit)
954
955     def set_container_versioning(self, versioning):
956         """
957         :param versioning: (str)
958         """
959         self.container_post(update=True, versioning=versioning)
960
961     def del_object(self, obj, until=None, delimiter=None):
962         """
963         :param obj: (str) remote object path
964
965         :param until: (str) formated date
966
967         :param delimiter: (str)
968         """
969         self._assert_container()
970         self.object_delete(obj, until=until, delimiter=delimiter)
971
972     def set_object_meta(self, obj, metapairs):
973         """
974         :param obj: (str) remote object path
975
976         :param metapairs: (dict) {key1:val1, key2:val2, ...}
977         """
978         assert(type(metapairs) is dict)
979         self.object_post(obj, update=True, metadata=metapairs)
980
981     def del_object_meta(self, obj, metakey):
982         """
983         :param obj: (str) remote object path
984
985         :param metakey: (str) metadatum key
986         """
987         self.object_post(obj, update=True, metadata={metakey: ''})
988
989     def publish_object(self, obj):
990         """
991         :param obj: (str) remote object path
992
993         :returns: (str) access url
994         """
995         self.object_post(obj, update=True, public=True)
996         info = self.get_object_info(obj)
997         pref, sep, rest = self.base_url.partition('//')
998         base = rest.split('/')[0]
999         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1000
1001     def unpublish_object(self, obj):
1002         """
1003         :param obj: (str) remote object path
1004         """
1005         self.object_post(obj, update=True, public=False)
1006
1007     def get_object_info(self, obj, version=None):
1008         """
1009         :param obj: (str) remote object path
1010
1011         :param version: (str)
1012
1013         :returns: (dict)
1014         """
1015         try:
1016             r = self.object_head(obj, version=version)
1017             return r.headers
1018         except ClientError as ce:
1019             if ce.status == 404:
1020                 raise ClientError('Object %s not found' % obj, status=404)
1021             raise
1022
1023     def get_object_meta(self, obj, version=None):
1024         """
1025         :param obj: (str) remote object path
1026
1027         :param version: (str)
1028
1029         :returns: (dict)
1030         """
1031         return filter_in(
1032             self.get_object_info(obj, version=version),
1033             'X-Object-Meta')
1034
1035     def get_object_sharing(self, obj):
1036         """
1037         :param obj: (str) remote object path
1038
1039         :returns: (dict)
1040         """
1041         r = filter_in(
1042             self.get_object_info(obj),
1043             'X-Object-Sharing',
1044             exactMatch=True)
1045         reply = {}
1046         if len(r) > 0:
1047             perms = r['x-object-sharing'].split(';')
1048             for perm in perms:
1049                 try:
1050                     perm.index('=')
1051                 except ValueError:
1052                     raise ClientError('Incorrect reply format')
1053                 (key, val) = perm.strip().split('=')
1054                 reply[key] = val
1055         return reply
1056
1057     def set_object_sharing(
1058             self, obj,
1059             read_permition=False, write_permition=False):
1060         """Give read/write permisions to an object.
1061
1062         :param obj: (str) remote object path
1063
1064         :param read_permition: (list - bool) users and user groups that get
1065             read permition for this object - False means all previous read
1066             permissions will be removed
1067
1068         :param write_perimition: (list - bool) of users and user groups to get
1069            write permition for this object - False means all previous write
1070            permissions will be removed
1071         """
1072
1073         perms = dict(read=read_permition or '', write=write_permition or '')
1074         self.object_post(obj, update=True, permissions=perms)
1075
1076     def del_object_sharing(self, obj):
1077         """
1078         :param obj: (str) remote object path
1079         """
1080         self.set_object_sharing(obj)
1081
1082     def append_object(self, obj, source_file, upload_cb=None):
1083         """
1084         :param obj: (str) remote object path
1085
1086         :param source_file: open file descriptor
1087
1088         :param upload_db: progress.bar for uploading
1089         """
1090
1091         self._assert_container()
1092         meta = self.get_container_info()
1093         blocksize = int(meta['x-container-block-size'])
1094         filesize = fstat(source_file.fileno()).st_size
1095         nblocks = 1 + (filesize - 1) // blocksize
1096         offset = 0
1097         if upload_cb:
1098             upload_gen = upload_cb(nblocks)
1099             upload_gen.next()
1100         for i in range(nblocks):
1101             block = source_file.read(min(blocksize, filesize - offset))
1102             offset += len(block)
1103             self.object_post(
1104                 obj,
1105                 update=True,
1106                 content_range='bytes */*',
1107                 content_type='application/octet-stream',
1108                 content_length=len(block),
1109                 data=block)
1110
1111             if upload_cb:
1112                 upload_gen.next()
1113
1114     def truncate_object(self, obj, upto_bytes):
1115         """
1116         :param obj: (str) remote object path
1117
1118         :param upto_bytes: max number of bytes to leave on file
1119         """
1120         self.object_post(
1121             obj,
1122             update=True,
1123             content_range='bytes 0-%s/*' % upto_bytes,
1124             content_type='application/octet-stream',
1125             object_bytes=upto_bytes,
1126             source_object=path4url(self.container, obj))
1127
1128     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1129         """Overwrite a part of an object from local source file
1130
1131         :param obj: (str) remote object path
1132
1133         :param start: (int) position in bytes to start overwriting from
1134
1135         :param end: (int) position in bytes to stop overwriting at
1136
1137         :param source_file: open file descriptor
1138
1139         :param upload_db: progress.bar for uploading
1140         """
1141
1142         r = self.get_object_info(obj)
1143         rf_size = int(r['content-length'])
1144         if rf_size < int(start):
1145             raise ClientError(
1146                 'Range start exceeds file size',
1147                 status=416)
1148         elif rf_size < int(end):
1149             raise ClientError(
1150                 'Range end exceeds file size',
1151                 status=416)
1152         self._assert_container()
1153         meta = self.get_container_info()
1154         blocksize = int(meta['x-container-block-size'])
1155         filesize = fstat(source_file.fileno()).st_size
1156         datasize = int(end) - int(start) + 1
1157         nblocks = 1 + (datasize - 1) // blocksize
1158         offset = 0
1159         if upload_cb:
1160             upload_gen = upload_cb(nblocks)
1161             upload_gen.next()
1162         for i in range(nblocks):
1163             read_size = min(blocksize, filesize - offset, datasize - offset)
1164             block = source_file.read(read_size)
1165             self.object_post(
1166                 obj,
1167                 update=True,
1168                 content_type='application/octet-stream',
1169                 content_length=len(block),
1170                 content_range='bytes %s-%s/*' % (
1171                     start + offset,
1172                     start + offset + len(block) - 1),
1173                 data=block)
1174             offset += len(block)
1175
1176             if upload_cb:
1177                 upload_gen.next()
1178
1179     def copy_object(
1180             self, src_container, src_object, dst_container,
1181             dst_object=None,
1182             source_version=None,
1183             source_account=None,
1184             public=False,
1185             content_type=None,
1186             delimiter=None):
1187         """
1188         :param src_container: (str) source container
1189
1190         :param src_object: (str) source object path
1191
1192         :param dst_container: (str) destination container
1193
1194         :param dst_object: (str) destination object path
1195
1196         :param source_version: (str) source object version
1197
1198         :param source_account: (str) account to copy from
1199
1200         :param public: (bool)
1201
1202         :param content_type: (str)
1203
1204         :param delimiter: (str)
1205         """
1206         self._assert_account()
1207         self.container = dst_container
1208         src_path = path4url(src_container, src_object)
1209         self.object_put(
1210             dst_object or src_object,
1211             success=201,
1212             copy_from=src_path,
1213             content_length=0,
1214             source_version=source_version,
1215             source_account=source_account,
1216             public=public,
1217             content_type=content_type,
1218             delimiter=delimiter)
1219
1220     def move_object(
1221             self, src_container, src_object, dst_container,
1222             dst_object=False,
1223             source_account=None,
1224             source_version=None,
1225             public=False,
1226             content_type=None,
1227             delimiter=None):
1228         """
1229         :param src_container: (str) source container
1230
1231         :param src_object: (str) source object path
1232
1233         :param dst_container: (str) destination container
1234
1235         :param dst_object: (str) destination object path
1236
1237         :param source_account: (str) account to move from
1238
1239         :param source_version: (str) source object version
1240
1241         :param public: (bool)
1242
1243         :param content_type: (str)
1244
1245         :param delimiter: (str)
1246         """
1247         self._assert_account()
1248         self.container = dst_container
1249         dst_object = dst_object or src_object
1250         src_path = path4url(src_container, src_object)
1251         self.object_put(
1252             dst_object,
1253             success=201,
1254             move_from=src_path,
1255             content_length=0,
1256             source_account=source_account,
1257             source_version=source_version,
1258             public=public,
1259             content_type=content_type,
1260             delimiter=delimiter)
1261
1262     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1263         """Get accounts that share with self.account
1264
1265         :param limit: (str)
1266
1267         :param marker: (str)
1268
1269         :returns: (dict)
1270         """
1271         self._assert_account()
1272
1273         self.set_param('format', 'json')
1274         self.set_param('limit', limit, iff=limit is not None)
1275         self.set_param('marker', marker, iff=marker is not None)
1276
1277         path = ''
1278         success = kwargs.pop('success', (200, 204))
1279         r = self.get(path, *args, success=success, **kwargs)
1280         return r.json
1281
1282     def get_object_versionlist(self, obj):
1283         """
1284         :param obj: (str) remote object path
1285
1286         :returns: (list)
1287         """
1288         self._assert_container()
1289         r = self.object_get(obj, format='json', version='list')
1290         return r.json['versions']