Do not recalculate #blocks in upload_from_string
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None, cache=None):
200         """
201         :param fileobj: (file descriptor) source
202
203         :param size: (int) size of data to upload from source
204
205         :param cache: (dict) if provided, cache container info response to
206         avoid redundant calls
207         """
208         if isinstance(cache, dict):
209             try:
210                 meta = cache[self.container]
211             except KeyError:
212                 meta = self.get_container_info()
213                 cache[self.container] = meta
214         else:
215             meta = self.get_container_info()
216         blocksize = int(meta['x-container-block-size'])
217         blockhash = meta['x-container-block-hash']
218         size = size if size is not None else fstat(fileobj.fileno()).st_size
219         nblocks = 1 + (size - 1) // blocksize
220         return (blocksize, blockhash, size, nblocks)
221
222     def _create_object_or_get_missing_hashes(
223             self, obj, json,
224             size=None,
225             format='json',
226             hashmap=True,
227             content_type=None,
228             if_etag_match=None,
229             if_etag_not_match=None,
230             content_encoding=None,
231             content_disposition=None,
232             permissions=None,
233             public=None,
234             success=(201, 409)):
235         r = self.object_put(
236             obj,
237             format='json',
238             hashmap=True,
239             content_type=content_type,
240             json=json,
241             if_etag_match=if_etag_match,
242             if_etag_not_match=if_etag_not_match,
243             content_encoding=content_encoding,
244             content_disposition=content_disposition,
245             permissions=permissions,
246             public=public,
247             success=success)
248         return (None if r.status_code == 201 else r.json), r.headers
249
250     def _calculate_blocks_for_upload(
251             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
252             hash_cb=None):
253         offset = 0
254         if hash_cb:
255             hash_gen = hash_cb(nblocks)
256             hash_gen.next()
257
258         for i in range(nblocks):
259             block = fileobj.read(min(blocksize, size - offset))
260             bytes = len(block)
261             hash = _pithos_hash(block, blockhash)
262             hashes.append(hash)
263             hmap[hash] = (offset, bytes)
264             offset += bytes
265             if hash_cb:
266                 hash_gen.next()
267         msg = 'Failed to calculate uploaded blocks:'
268         ' Offset and object size do not match'
269         assert offset == size, msg
270
271     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272         """upload missing blocks asynchronously"""
273
274         self._init_thread_limit()
275
276         flying = []
277         failures = []
278         for hash in missing:
279             offset, bytes = hmap[hash]
280             fileobj.seek(offset)
281             data = fileobj.read(bytes)
282             r = self._put_block_async(data, hash)
283             flying.append(r)
284             unfinished = self._watch_thread_limit(flying)
285             for thread in set(flying).difference(unfinished):
286                 if thread.exception:
287                     failures.append(thread)
288                     if isinstance(
289                             thread.exception,
290                             ClientError) and thread.exception.status == 502:
291                         self.POOLSIZE = self._thread_limit
292                 elif thread.isAlive():
293                     flying.append(thread)
294                 elif upload_gen:
295                     try:
296                         upload_gen.next()
297                     except:
298                         pass
299             flying = unfinished
300
301         for thread in flying:
302             thread.join()
303             if thread.exception:
304                 failures.append(thread)
305             elif upload_gen:
306                 try:
307                     upload_gen.next()
308                 except:
309                     pass
310
311         return [failure.kwargs['hash'] for failure in failures]
312
313     def upload_object(
314             self, obj, f,
315             size=None,
316             hash_cb=None,
317             upload_cb=None,
318             etag=None,
319             if_etag_match=None,
320             if_not_exist=None,
321             content_encoding=None,
322             content_disposition=None,
323             content_type=None,
324             sharing=None,
325             public=None,
326             container_info_cache=None):
327         """Upload an object using multiple connections (threads)
328
329         :param obj: (str) remote object path
330
331         :param f: open file descriptor (rb)
332
333         :param hash_cb: optional progress.bar object for calculating hashes
334
335         :param upload_cb: optional progress.bar object for uploading
336
337         :param etag: (str)
338
339         :param if_etag_match: (str) Push that value to if-match header at file
340             creation
341
342         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343             it does not exist remotely, otherwise the operation will fail.
344             Involves the case of an object with the same path is created while
345             the object is being uploaded.
346
347         :param content_encoding: (str)
348
349         :param content_disposition: (str)
350
351         :param content_type: (str)
352
353         :param sharing: {'read':[user and/or grp names],
354             'write':[usr and/or grp names]}
355
356         :param public: (bool)
357
358         :param container_info_cache: (dict) if given, avoid redundant calls to
359         server for container info (block size and hash information)
360         """
361         self._assert_container()
362
363         block_info = (
364             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
365                 f, size, container_info_cache)
366         (hashes, hmap, offset) = ([], {}, 0)
367         if not content_type:
368             content_type = 'application/octet-stream'
369
370         self._calculate_blocks_for_upload(
371             *block_info,
372             hashes=hashes,
373             hmap=hmap,
374             fileobj=f,
375             hash_cb=hash_cb)
376
377         hashmap = dict(bytes=size, hashes=hashes)
378         missing, obj_headers = self._create_object_or_get_missing_hashes(
379             obj, hashmap,
380             content_type=content_type,
381             size=size,
382             if_etag_match=if_etag_match,
383             if_etag_not_match='*' if if_not_exist else None,
384             content_encoding=content_encoding,
385             content_disposition=content_disposition,
386             permissions=sharing,
387             public=public)
388
389         if missing is None:
390             return obj_headers
391
392         if upload_cb:
393             upload_gen = upload_cb(len(missing))
394             for i in range(len(missing), len(hashmap['hashes']) + 1):
395                 try:
396                     upload_gen.next()
397                 except:
398                     upload_gen = None
399         else:
400             upload_gen = None
401
402         retries = 7
403         try:
404             while retries:
405                 sendlog.info('%s blocks missing' % len(missing))
406                 num_of_blocks = len(missing)
407                 missing = self._upload_missing_blocks(
408                     missing,
409                     hmap,
410                     f,
411                     upload_gen)
412                 if missing:
413                     if num_of_blocks == len(missing):
414                         retries -= 1
415                     else:
416                         num_of_blocks = len(missing)
417                 else:
418                     break
419             if missing:
420                 raise ClientError(
421                     '%s blocks failed to upload' % len(missing),
422                     details=['%s' % thread.exception for thread in missing])
423         except KeyboardInterrupt:
424             sendlog.info('- - - wait for threads to finish')
425             for thread in activethreads():
426                 thread.join()
427             raise
428
429         r = self.object_put(
430             obj,
431             format='json',
432             hashmap=True,
433             content_type=content_type,
434             if_etag_match=if_etag_match,
435             if_etag_not_match='*' if if_not_exist else None,
436             etag=etag,
437             json=hashmap,
438             permissions=sharing,
439             public=public,
440             success=201)
441         return r.headers
442
443     def upload_from_string(
444             self, obj, input_str,
445             hash_cb=None,
446             upload_cb=None,
447             etag=None,
448             if_etag_match=None,
449             if_not_exist=None,
450             content_encoding=None,
451             content_disposition=None,
452             content_type=None,
453             sharing=None,
454             public=None,
455             container_info_cache=None):
456         """Upload an object using multiple connections (threads)
457
458         :param obj: (str) remote object path
459
460         :param input_str: (str) upload content
461
462         :param hash_cb: optional progress.bar object for calculating hashes
463
464         :param upload_cb: optional progress.bar object for uploading
465
466         :param etag: (str)
467
468         :param if_etag_match: (str) Push that value to if-match header at file
469             creation
470
471         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
472             it does not exist remotely, otherwise the operation will fail.
473             Involves the case of an object with the same path is created while
474             the object is being uploaded.
475
476         :param content_encoding: (str)
477
478         :param content_disposition: (str)
479
480         :param content_type: (str)
481
482         :param sharing: {'read':[user and/or grp names],
483             'write':[usr and/or grp names]}
484
485         :param public: (bool)
486
487         :param container_info_cache: (dict) if given, avoid redundant calls to
488         server for container info (block size and hash information)
489         """
490         self._assert_container()
491
492         blocksize, blockhash, size, nblocks = self._get_file_block_info(
493                 fileobj=None, size=len(input_str), cache=container_info_cache)
494         (hashes, hmap, offset) = ([], {}, 0)
495         if not content_type:
496             content_type = 'application/octet-stream'
497
498         hashes = []
499         hmap = {}
500         for blockid in range(nblocks):
501             start = blockid * blocksize
502             block = input_str[start: (start + blocksize)]
503             hashes.append(_pithos_hash(block, blockhash))
504             hmap[hashes[blockid]] = (start, block)
505
506         hashmap = dict(bytes=size, hashes=hashes)
507         missing, obj_headers = self._create_object_or_get_missing_hashes(
508             obj, hashmap,
509             content_type=content_type,
510             size=size,
511             if_etag_match=if_etag_match,
512             if_etag_not_match='*' if if_not_exist else None,
513             content_encoding=content_encoding,
514             content_disposition=content_disposition,
515             permissions=sharing,
516             public=public)
517         if missing is None:
518             return obj_headers
519         num_of_missing = len(missing)
520
521         if upload_cb:
522             self.progress_bar_gen = upload_cb(nblocks)
523             for i in range(nblocks + 1 - num_of_missing):
524                 self._cb_next()
525
526         tries = 7
527         old_failures = 0
528         try:
529             while tries and missing:
530                 flying = []
531                 failures = []
532                 for hash in missing:
533                     offset, block = hmap[hash]
534                     bird = self._put_block_async(block, hash)
535                     flying.append(bird)
536                     unfinished = self._watch_thread_limit(flying)
537                     for thread in set(flying).difference(unfinished):
538                         if thread.exception:
539                             failures.append(thread.kwargs['hash'])
540                         if thread.isAlive():
541                             flying.append(thread)
542                         else:
543                             self._cb_next()
544                     flying = unfinished
545                 for thread in flying:
546                     thread.join()
547                     if thread.exception:
548                         failures.append(thread.kwargs['hash'])
549                     self._cb_next()
550                 missing = failures
551                 if missing and len(missing) == old_failures:
552                     tries -= 1
553                 old_failures = len(missing)
554             if missing:
555                 raise ClientError(
556                     '%s blocks failed to upload' % len(missing),
557                     details=['%s' % thread.exception for thread in missing])
558         except KeyboardInterrupt:
559             sendlog.info('- - - wait for threads to finish')
560             for thread in activethreads():
561                 thread.join()
562             raise
563
564         r = self.object_put(
565             obj,
566             format='json',
567             hashmap=True,
568             content_type=content_type,
569             if_etag_match=if_etag_match,
570             if_etag_not_match='*' if if_not_exist else None,
571             etag=etag,
572             json=hashmap,
573             permissions=sharing,
574             public=public,
575             success=201)
576         return r.headers
577
578     # download_* auxiliary methods
579     def _get_remote_blocks_info(self, obj, **restargs):
580         #retrieve object hashmap
581         myrange = restargs.pop('data_range', None)
582         hashmap = self.get_object_hashmap(obj, **restargs)
583         restargs['data_range'] = myrange
584         blocksize = int(hashmap['block_size'])
585         blockhash = hashmap['block_hash']
586         total_size = hashmap['bytes']
587         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
588         map_dict = {}
589         for i, h in enumerate(hashmap['hashes']):
590             #  map_dict[h] = i   CHAGE
591             if h in map_dict:
592                 map_dict[h].append(i)
593             else:
594                 map_dict[h] = [i]
595         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
596
597     def _dump_blocks_sync(
598             self, obj, remote_hashes, blocksize, total_size, dst, range,
599             **args):
600         for blockid, blockhash in enumerate(remote_hashes):
601             if blockhash:
602                 start = blocksize * blockid
603                 is_last = start + blocksize > total_size
604                 end = (total_size - 1) if is_last else (start + blocksize - 1)
605                 (start, end) = _range_up(start, end, range)
606                 args['data_range'] = 'bytes=%s-%s' % (start, end)
607                 r = self.object_get(obj, success=(200, 206), **args)
608                 self._cb_next()
609                 dst.write(r.content)
610                 dst.flush()
611
612     def _get_block_async(self, obj, **args):
613         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
614         event.start()
615         return event
616
617     def _hash_from_file(self, fp, start, size, blockhash):
618         fp.seek(start)
619         block = fp.read(size)
620         h = newhashlib(blockhash)
621         h.update(block.strip('\x00'))
622         return hexlify(h.digest())
623
624     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
625         """write the results of a greenleted rest call to a file
626
627         :param offset: the offset of the file up to blocksize
628         - e.g. if the range is 10-100, all blocks will be written to
629         normal_position - 10
630         """
631         for key, g in flying.items():
632             if g.isAlive():
633                 continue
634             if g.exception:
635                 raise g.exception
636             block = g.value.content
637             for block_start in blockids[key]:
638                 local_file.seek(block_start + offset)
639                 local_file.write(block)
640                 self._cb_next()
641             flying.pop(key)
642             blockids.pop(key)
643         local_file.flush()
644
645     def _dump_blocks_async(
646             self, obj, remote_hashes, blocksize, total_size, local_file,
647             blockhash=None, resume=False, filerange=None, **restargs):
648         file_size = fstat(local_file.fileno()).st_size if resume else 0
649         flying = dict()
650         blockid_dict = dict()
651         offset = 0
652         if filerange is not None:
653             rstart = int(filerange.split('-')[0])
654             offset = rstart if blocksize > rstart else rstart % blocksize
655
656         self._init_thread_limit()
657         for block_hash, blockids in remote_hashes.items():
658             blockids = [blk * blocksize for blk in blockids]
659             unsaved = [blk for blk in blockids if not (
660                 blk < file_size and block_hash == self._hash_from_file(
661                         local_file, blk, blocksize, blockhash))]
662             self._cb_next(len(blockids) - len(unsaved))
663             if unsaved:
664                 key = unsaved[0]
665                 self._watch_thread_limit(flying.values())
666                 self._thread2file(
667                     flying, blockid_dict, local_file, offset,
668                     **restargs)
669                 end = total_size - 1 if (
670                     key + blocksize > total_size) else key + blocksize - 1
671                 start, end = _range_up(key, end, filerange)
672                 if start == end:
673                     self._cb_next()
674                     continue
675                 restargs['async_headers'] = {
676                     'Range': 'bytes=%s-%s' % (start, end)}
677                 flying[key] = self._get_block_async(obj, **restargs)
678                 blockid_dict[key] = unsaved
679
680         for thread in flying.values():
681             thread.join()
682         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
683
684     def download_object(
685             self, obj, dst,
686             download_cb=None,
687             version=None,
688             resume=False,
689             range_str=None,
690             if_match=None,
691             if_none_match=None,
692             if_modified_since=None,
693             if_unmodified_since=None):
694         """Download an object (multiple connections, random blocks)
695
696         :param obj: (str) remote object path
697
698         :param dst: open file descriptor (wb+)
699
700         :param download_cb: optional progress.bar object for downloading
701
702         :param version: (str) file version
703
704         :param resume: (bool) if set, preserve already downloaded file parts
705
706         :param range_str: (str) from, to are file positions (int) in bytes
707
708         :param if_match: (str)
709
710         :param if_none_match: (str)
711
712         :param if_modified_since: (str) formated date
713
714         :param if_unmodified_since: (str) formated date"""
715         restargs = dict(
716             version=version,
717             data_range=None if range_str is None else 'bytes=%s' % range_str,
718             if_match=if_match,
719             if_none_match=if_none_match,
720             if_modified_since=if_modified_since,
721             if_unmodified_since=if_unmodified_since)
722
723         (
724             blocksize,
725             blockhash,
726             total_size,
727             hash_list,
728             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
729         assert total_size >= 0
730
731         if download_cb:
732             self.progress_bar_gen = download_cb(len(hash_list))
733             self._cb_next()
734
735         if dst.isatty():
736             self._dump_blocks_sync(
737                 obj,
738                 hash_list,
739                 blocksize,
740                 total_size,
741                 dst,
742                 range_str,
743                 **restargs)
744         else:
745             self._dump_blocks_async(
746                 obj,
747                 remote_hashes,
748                 blocksize,
749                 total_size,
750                 dst,
751                 blockhash,
752                 resume,
753                 range_str,
754                 **restargs)
755             if not range_str:
756                 dst.truncate(total_size)
757
758         self._complete_cb()
759
760     def download_to_string(
761             self, obj,
762             download_cb=None,
763             version=None,
764             range_str=None,
765             if_match=None,
766             if_none_match=None,
767             if_modified_since=None,
768             if_unmodified_since=None):
769         """Download an object to a string (multiple connections). This method
770         uses threads for http requests, but stores all content in memory.
771
772         :param obj: (str) remote object path
773
774         :param download_cb: optional progress.bar object for downloading
775
776         :param version: (str) file version
777
778         :param range_str: (str) from, to are file positions (int) in bytes
779
780         :param if_match: (str)
781
782         :param if_none_match: (str)
783
784         :param if_modified_since: (str) formated date
785
786         :param if_unmodified_since: (str) formated date
787
788         :returns: (str) the whole object contents
789         """
790         restargs = dict(
791             version=version,
792             data_range=None if range_str is None else 'bytes=%s' % range_str,
793             if_match=if_match,
794             if_none_match=if_none_match,
795             if_modified_since=if_modified_since,
796             if_unmodified_since=if_unmodified_since)
797
798         (
799             blocksize,
800             blockhash,
801             total_size,
802             hash_list,
803             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
804         assert total_size >= 0
805
806         if download_cb:
807             self.progress_bar_gen = download_cb(len(hash_list))
808             self._cb_next()
809
810         num_of_blocks = len(remote_hashes)
811         ret = [''] * num_of_blocks
812         self._init_thread_limit()
813         flying = dict()
814         for blockid, blockhash in enumerate(remote_hashes):
815             start = blocksize * blockid
816             is_last = start + blocksize > total_size
817             end = (total_size - 1) if is_last else (start + blocksize - 1)
818             (start, end) = _range_up(start, end, range_str)
819             if start < end:
820                 self._watch_thread_limit(flying.values())
821                 flying[blockid] = self._get_block_async(obj, **restargs)
822             for runid, thread in flying.items():
823                 if (blockid + 1) == num_of_blocks:
824                     thread.join()
825                 elif thread.isAlive():
826                     continue
827                 if thread.exception:
828                     raise thread.exception
829                 ret[runid] = thread.value.content
830                 self._cb_next()
831                 flying.pop(runid)
832         return ''.join(ret)
833
834     #Command Progress Bar method
835     def _cb_next(self, step=1):
836         if hasattr(self, 'progress_bar_gen'):
837             try:
838                 for i in xrange(step):
839                     self.progress_bar_gen.next()
840             except:
841                 pass
842
843     def _complete_cb(self):
844         while True:
845             try:
846                 self.progress_bar_gen.next()
847             except:
848                 break
849
850     def get_object_hashmap(
851             self, obj,
852             version=None,
853             if_match=None,
854             if_none_match=None,
855             if_modified_since=None,
856             if_unmodified_since=None,
857             data_range=None):
858         """
859         :param obj: (str) remote object path
860
861         :param if_match: (str)
862
863         :param if_none_match: (str)
864
865         :param if_modified_since: (str) formated date
866
867         :param if_unmodified_since: (str) formated date
868
869         :param data_range: (str) from-to where from and to are integers
870             denoting file positions in bytes
871
872         :returns: (list)
873         """
874         try:
875             r = self.object_get(
876                 obj,
877                 hashmap=True,
878                 version=version,
879                 if_etag_match=if_match,
880                 if_etag_not_match=if_none_match,
881                 if_modified_since=if_modified_since,
882                 if_unmodified_since=if_unmodified_since,
883                 data_range=data_range)
884         except ClientError as err:
885             if err.status == 304 or err.status == 412:
886                 return {}
887             raise
888         return r.json
889
890     def set_account_group(self, group, usernames):
891         """
892         :param group: (str)
893
894         :param usernames: (list)
895         """
896         self.account_post(update=True, groups={group: usernames})
897
898     def del_account_group(self, group):
899         """
900         :param group: (str)
901         """
902         self.account_post(update=True, groups={group: []})
903
904     def get_account_info(self, until=None):
905         """
906         :param until: (str) formated date
907
908         :returns: (dict)
909         """
910         r = self.account_head(until=until)
911         if r.status_code == 401:
912             raise ClientError("No authorization", status=401)
913         return r.headers
914
915     def get_account_quota(self):
916         """
917         :returns: (dict)
918         """
919         return filter_in(
920             self.get_account_info(),
921             'X-Account-Policy-Quota',
922             exactMatch=True)
923
924     def get_account_versioning(self):
925         """
926         :returns: (dict)
927         """
928         return filter_in(
929             self.get_account_info(),
930             'X-Account-Policy-Versioning',
931             exactMatch=True)
932
933     def get_account_meta(self, until=None):
934         """
935         :meta until: (str) formated date
936
937         :returns: (dict)
938         """
939         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
940
941     def get_account_group(self):
942         """
943         :returns: (dict)
944         """
945         return filter_in(self.get_account_info(), 'X-Account-Group-')
946
947     def set_account_meta(self, metapairs):
948         """
949         :param metapairs: (dict) {key1:val1, key2:val2, ...}
950         """
951         assert(type(metapairs) is dict)
952         self.account_post(update=True, metadata=metapairs)
953
954     def del_account_meta(self, metakey):
955         """
956         :param metakey: (str) metadatum key
957         """
958         self.account_post(update=True, metadata={metakey: ''})
959
960     """
961     def set_account_quota(self, quota):
962         ""
963         :param quota: (int)
964         ""
965         self.account_post(update=True, quota=quota)
966     """
967
968     def set_account_versioning(self, versioning):
969         """
970         "param versioning: (str)
971         """
972         self.account_post(update=True, versioning=versioning)
973
974     def list_containers(self):
975         """
976         :returns: (dict)
977         """
978         r = self.account_get()
979         return r.json
980
981     def del_container(self, until=None, delimiter=None):
982         """
983         :param until: (str) formated date
984
985         :param delimiter: (str) with / empty container
986
987         :raises ClientError: 404 Container does not exist
988
989         :raises ClientError: 409 Container is not empty
990         """
991         self._assert_container()
992         r = self.container_delete(
993             until=until,
994             delimiter=delimiter,
995             success=(204, 404, 409))
996         if r.status_code == 404:
997             raise ClientError(
998                 'Container "%s" does not exist' % self.container,
999                 r.status_code)
1000         elif r.status_code == 409:
1001             raise ClientError(
1002                 'Container "%s" is not empty' % self.container,
1003                 r.status_code)
1004
1005     def get_container_versioning(self, container=None):
1006         """
1007         :param container: (str)
1008
1009         :returns: (dict)
1010         """
1011         cnt_back_up = self.container
1012         try:
1013             self.container = container or cnt_back_up
1014             return filter_in(
1015                 self.get_container_info(),
1016                 'X-Container-Policy-Versioning')
1017         finally:
1018             self.container = cnt_back_up
1019
1020     def get_container_limit(self, container=None):
1021         """
1022         :param container: (str)
1023
1024         :returns: (dict)
1025         """
1026         cnt_back_up = self.container
1027         try:
1028             self.container = container or cnt_back_up
1029             return filter_in(
1030                 self.get_container_info(),
1031                 'X-Container-Policy-Quota')
1032         finally:
1033             self.container = cnt_back_up
1034
1035     def get_container_info(self, until=None):
1036         """
1037         :param until: (str) formated date
1038
1039         :returns: (dict)
1040
1041         :raises ClientError: 404 Container not found
1042         """
1043         try:
1044             r = self.container_head(until=until)
1045         except ClientError as err:
1046             err.details.append('for container %s' % self.container)
1047             raise err
1048         return r.headers
1049
1050     def get_container_meta(self, until=None):
1051         """
1052         :param until: (str) formated date
1053
1054         :returns: (dict)
1055         """
1056         return filter_in(
1057             self.get_container_info(until=until),
1058             'X-Container-Meta')
1059
1060     def get_container_object_meta(self, until=None):
1061         """
1062         :param until: (str) formated date
1063
1064         :returns: (dict)
1065         """
1066         return filter_in(
1067             self.get_container_info(until=until),
1068             'X-Container-Object-Meta')
1069
1070     def set_container_meta(self, metapairs):
1071         """
1072         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1073         """
1074         assert(type(metapairs) is dict)
1075         self.container_post(update=True, metadata=metapairs)
1076
1077     def del_container_meta(self, metakey):
1078         """
1079         :param metakey: (str) metadatum key
1080         """
1081         self.container_post(update=True, metadata={metakey: ''})
1082
1083     def set_container_limit(self, limit):
1084         """
1085         :param limit: (int)
1086         """
1087         self.container_post(update=True, quota=limit)
1088
1089     def set_container_versioning(self, versioning):
1090         """
1091         :param versioning: (str)
1092         """
1093         self.container_post(update=True, versioning=versioning)
1094
1095     def del_object(self, obj, until=None, delimiter=None):
1096         """
1097         :param obj: (str) remote object path
1098
1099         :param until: (str) formated date
1100
1101         :param delimiter: (str)
1102         """
1103         self._assert_container()
1104         self.object_delete(obj, until=until, delimiter=delimiter)
1105
1106     def set_object_meta(self, obj, metapairs):
1107         """
1108         :param obj: (str) remote object path
1109
1110         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1111         """
1112         assert(type(metapairs) is dict)
1113         self.object_post(obj, update=True, metadata=metapairs)
1114
1115     def del_object_meta(self, obj, metakey):
1116         """
1117         :param obj: (str) remote object path
1118
1119         :param metakey: (str) metadatum key
1120         """
1121         self.object_post(obj, update=True, metadata={metakey: ''})
1122
1123     def publish_object(self, obj):
1124         """
1125         :param obj: (str) remote object path
1126
1127         :returns: (str) access url
1128         """
1129         self.object_post(obj, update=True, public=True)
1130         info = self.get_object_info(obj)
1131         pref, sep, rest = self.base_url.partition('//')
1132         base = rest.split('/')[0]
1133         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1134
1135     def unpublish_object(self, obj):
1136         """
1137         :param obj: (str) remote object path
1138         """
1139         self.object_post(obj, update=True, public=False)
1140
1141     def get_object_info(self, obj, version=None):
1142         """
1143         :param obj: (str) remote object path
1144
1145         :param version: (str)
1146
1147         :returns: (dict)
1148         """
1149         try:
1150             r = self.object_head(obj, version=version)
1151             return r.headers
1152         except ClientError as ce:
1153             if ce.status == 404:
1154                 raise ClientError('Object %s not found' % obj, status=404)
1155             raise
1156
1157     def get_object_meta(self, obj, version=None):
1158         """
1159         :param obj: (str) remote object path
1160
1161         :param version: (str)
1162
1163         :returns: (dict)
1164         """
1165         return filter_in(
1166             self.get_object_info(obj, version=version),
1167             'X-Object-Meta')
1168
1169     def get_object_sharing(self, obj):
1170         """
1171         :param obj: (str) remote object path
1172
1173         :returns: (dict)
1174         """
1175         r = filter_in(
1176             self.get_object_info(obj),
1177             'X-Object-Sharing',
1178             exactMatch=True)
1179         reply = {}
1180         if len(r) > 0:
1181             perms = r['x-object-sharing'].split(';')
1182             for perm in perms:
1183                 try:
1184                     perm.index('=')
1185                 except ValueError:
1186                     raise ClientError('Incorrect reply format')
1187                 (key, val) = perm.strip().split('=')
1188                 reply[key] = val
1189         return reply
1190
1191     def set_object_sharing(
1192             self, obj,
1193             read_permition=False, write_permition=False):
1194         """Give read/write permisions to an object.
1195
1196         :param obj: (str) remote object path
1197
1198         :param read_permition: (list - bool) users and user groups that get
1199             read permition for this object - False means all previous read
1200             permissions will be removed
1201
1202         :param write_perimition: (list - bool) of users and user groups to get
1203            write permition for this object - False means all previous write
1204            permissions will be removed
1205         """
1206
1207         perms = dict(read=read_permition or '', write=write_permition or '')
1208         self.object_post(obj, update=True, permissions=perms)
1209
1210     def del_object_sharing(self, obj):
1211         """
1212         :param obj: (str) remote object path
1213         """
1214         self.set_object_sharing(obj)
1215
1216     def append_object(self, obj, source_file, upload_cb=None):
1217         """
1218         :param obj: (str) remote object path
1219
1220         :param source_file: open file descriptor
1221
1222         :param upload_db: progress.bar for uploading
1223         """
1224
1225         self._assert_container()
1226         meta = self.get_container_info()
1227         blocksize = int(meta['x-container-block-size'])
1228         filesize = fstat(source_file.fileno()).st_size
1229         nblocks = 1 + (filesize - 1) // blocksize
1230         offset = 0
1231         if upload_cb:
1232             upload_gen = upload_cb(nblocks)
1233             upload_gen.next()
1234         for i in range(nblocks):
1235             block = source_file.read(min(blocksize, filesize - offset))
1236             offset += len(block)
1237             self.object_post(
1238                 obj,
1239                 update=True,
1240                 content_range='bytes */*',
1241                 content_type='application/octet-stream',
1242                 content_length=len(block),
1243                 data=block)
1244
1245             if upload_cb:
1246                 upload_gen.next()
1247
1248     def truncate_object(self, obj, upto_bytes):
1249         """
1250         :param obj: (str) remote object path
1251
1252         :param upto_bytes: max number of bytes to leave on file
1253         """
1254         self.object_post(
1255             obj,
1256             update=True,
1257             content_range='bytes 0-%s/*' % upto_bytes,
1258             content_type='application/octet-stream',
1259             object_bytes=upto_bytes,
1260             source_object=path4url(self.container, obj))
1261
1262     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1263         """Overwrite a part of an object from local source file
1264
1265         :param obj: (str) remote object path
1266
1267         :param start: (int) position in bytes to start overwriting from
1268
1269         :param end: (int) position in bytes to stop overwriting at
1270
1271         :param source_file: open file descriptor
1272
1273         :param upload_db: progress.bar for uploading
1274         """
1275
1276         r = self.get_object_info(obj)
1277         rf_size = int(r['content-length'])
1278         if rf_size < int(start):
1279             raise ClientError(
1280                 'Range start exceeds file size',
1281                 status=416)
1282         elif rf_size < int(end):
1283             raise ClientError(
1284                 'Range end exceeds file size',
1285                 status=416)
1286         self._assert_container()
1287         meta = self.get_container_info()
1288         blocksize = int(meta['x-container-block-size'])
1289         filesize = fstat(source_file.fileno()).st_size
1290         datasize = int(end) - int(start) + 1
1291         nblocks = 1 + (datasize - 1) // blocksize
1292         offset = 0
1293         if upload_cb:
1294             upload_gen = upload_cb(nblocks)
1295             upload_gen.next()
1296         for i in range(nblocks):
1297             read_size = min(blocksize, filesize - offset, datasize - offset)
1298             block = source_file.read(read_size)
1299             self.object_post(
1300                 obj,
1301                 update=True,
1302                 content_type='application/octet-stream',
1303                 content_length=len(block),
1304                 content_range='bytes %s-%s/*' % (
1305                     start + offset,
1306                     start + offset + len(block) - 1),
1307                 data=block)
1308             offset += len(block)
1309
1310             if upload_cb:
1311                 upload_gen.next()
1312
1313     def copy_object(
1314             self, src_container, src_object, dst_container,
1315             dst_object=None,
1316             source_version=None,
1317             source_account=None,
1318             public=False,
1319             content_type=None,
1320             delimiter=None):
1321         """
1322         :param src_container: (str) source container
1323
1324         :param src_object: (str) source object path
1325
1326         :param dst_container: (str) destination container
1327
1328         :param dst_object: (str) destination object path
1329
1330         :param source_version: (str) source object version
1331
1332         :param source_account: (str) account to copy from
1333
1334         :param public: (bool)
1335
1336         :param content_type: (str)
1337
1338         :param delimiter: (str)
1339         """
1340         self._assert_account()
1341         self.container = dst_container
1342         src_path = path4url(src_container, src_object)
1343         self.object_put(
1344             dst_object or src_object,
1345             success=201,
1346             copy_from=src_path,
1347             content_length=0,
1348             source_version=source_version,
1349             source_account=source_account,
1350             public=public,
1351             content_type=content_type,
1352             delimiter=delimiter)
1353
1354     def move_object(
1355             self, src_container, src_object, dst_container,
1356             dst_object=False,
1357             source_account=None,
1358             source_version=None,
1359             public=False,
1360             content_type=None,
1361             delimiter=None):
1362         """
1363         :param src_container: (str) source container
1364
1365         :param src_object: (str) source object path
1366
1367         :param dst_container: (str) destination container
1368
1369         :param dst_object: (str) destination object path
1370
1371         :param source_account: (str) account to move from
1372
1373         :param source_version: (str) source object version
1374
1375         :param public: (bool)
1376
1377         :param content_type: (str)
1378
1379         :param delimiter: (str)
1380         """
1381         self._assert_account()
1382         self.container = dst_container
1383         dst_object = dst_object or src_object
1384         src_path = path4url(src_container, src_object)
1385         self.object_put(
1386             dst_object,
1387             success=201,
1388             move_from=src_path,
1389             content_length=0,
1390             source_account=source_account,
1391             source_version=source_version,
1392             public=public,
1393             content_type=content_type,
1394             delimiter=delimiter)
1395
1396     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1397         """Get accounts that share with self.account
1398
1399         :param limit: (str)
1400
1401         :param marker: (str)
1402
1403         :returns: (dict)
1404         """
1405         self._assert_account()
1406
1407         self.set_param('format', 'json')
1408         self.set_param('limit', limit, iff=limit is not None)
1409         self.set_param('marker', marker, iff=marker is not None)
1410
1411         path = ''
1412         success = kwargs.pop('success', (200, 204))
1413         r = self.get(path, *args, success=success, **kwargs)
1414         return r.json
1415
1416     def get_object_versionlist(self, obj):
1417         """
1418         :param obj: (str) remote object path
1419
1420         :returns: (list)
1421         """
1422         self._assert_container()
1423         r = self.object_get(obj, format='json', version='list')
1424         return r.json['versions']