Restore thread-wait for append
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384         server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 raise ClientError(
446                     '%s blocks failed to upload' % len(missing),
447                     details=['%s' % thread.exception for thread in missing])
448         except KeyboardInterrupt:
449             sendlog.info('- - - wait for threads to finish')
450             for thread in activethreads():
451                 thread.join()
452             raise
453
454         r = self.object_put(
455             obj,
456             format='json',
457             hashmap=True,
458             content_type=content_type,
459             if_etag_match=if_etag_match,
460             if_etag_not_match='*' if if_not_exist else None,
461             etag=etag,
462             json=hashmap,
463             permissions=sharing,
464             public=public,
465             success=201)
466         return r.headers
467
468     def upload_from_string(
469             self, obj, input_str,
470             hash_cb=None,
471             upload_cb=None,
472             etag=None,
473             if_etag_match=None,
474             if_not_exist=None,
475             content_encoding=None,
476             content_disposition=None,
477             content_type=None,
478             sharing=None,
479             public=None,
480             container_info_cache=None):
481         """Upload an object using multiple connections (threads)
482
483         :param obj: (str) remote object path
484
485         :param input_str: (str) upload content
486
487         :param hash_cb: optional progress.bar object for calculating hashes
488
489         :param upload_cb: optional progress.bar object for uploading
490
491         :param etag: (str)
492
493         :param if_etag_match: (str) Push that value to if-match header at file
494             creation
495
496         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497             it does not exist remotely, otherwise the operation will fail.
498             Involves the case of an object with the same path is created while
499             the object is being uploaded.
500
501         :param content_encoding: (str)
502
503         :param content_disposition: (str)
504
505         :param content_type: (str)
506
507         :param sharing: {'read':[user and/or grp names],
508             'write':[usr and/or grp names]}
509
510         :param public: (bool)
511
512         :param container_info_cache: (dict) if given, avoid redundant calls to
513         server for container info (block size and hash information)
514         """
515         self._assert_container()
516
517         blocksize, blockhash, size, nblocks = self._get_file_block_info(
518                 fileobj=None, size=len(input_str), cache=container_info_cache)
519         (hashes, hmap, offset) = ([], {}, 0)
520         if not content_type:
521             content_type = 'application/octet-stream'
522
523         hashes = []
524         hmap = {}
525         for blockid in range(nblocks):
526             start = blockid * blocksize
527             block = input_str[start: (start + blocksize)]
528             hashes.append(_pithos_hash(block, blockhash))
529             hmap[hashes[blockid]] = (start, block)
530
531         hashmap = dict(bytes=size, hashes=hashes)
532         missing, obj_headers = self._create_object_or_get_missing_hashes(
533             obj, hashmap,
534             content_type=content_type,
535             size=size,
536             if_etag_match=if_etag_match,
537             if_etag_not_match='*' if if_not_exist else None,
538             content_encoding=content_encoding,
539             content_disposition=content_disposition,
540             permissions=sharing,
541             public=public)
542         if missing is None:
543             return obj_headers
544         num_of_missing = len(missing)
545
546         if upload_cb:
547             self.progress_bar_gen = upload_cb(nblocks)
548             for i in range(nblocks + 1 - num_of_missing):
549                 self._cb_next()
550
551         tries = 7
552         old_failures = 0
553         try:
554             while tries and missing:
555                 flying = []
556                 failures = []
557                 for hash in missing:
558                     offset, block = hmap[hash]
559                     bird = self._put_block_async(block, hash)
560                     flying.append(bird)
561                     unfinished = self._watch_thread_limit(flying)
562                     for thread in set(flying).difference(unfinished):
563                         if thread.exception:
564                             failures.append(thread.kwargs['hash'])
565                         if thread.isAlive():
566                             flying.append(thread)
567                         else:
568                             self._cb_next()
569                     flying = unfinished
570                 for thread in flying:
571                     thread.join()
572                     if thread.exception:
573                         failures.append(thread.kwargs['hash'])
574                     self._cb_next()
575                 missing = failures
576                 if missing and len(missing) == old_failures:
577                     tries -= 1
578                 old_failures = len(missing)
579             if missing:
580                 raise ClientError(
581                     '%s blocks failed to upload' % len(missing),
582                     details=['%s' % thread.exception for thread in missing])
583         except KeyboardInterrupt:
584             sendlog.info('- - - wait for threads to finish')
585             for thread in activethreads():
586                 thread.join()
587             raise
588
589         r = self.object_put(
590             obj,
591             format='json',
592             hashmap=True,
593             content_type=content_type,
594             if_etag_match=if_etag_match,
595             if_etag_not_match='*' if if_not_exist else None,
596             etag=etag,
597             json=hashmap,
598             permissions=sharing,
599             public=public,
600             success=201)
601         return r.headers
602
603     # download_* auxiliary methods
604     def _get_remote_blocks_info(self, obj, **restargs):
605         #retrieve object hashmap
606         myrange = restargs.pop('data_range', None)
607         hashmap = self.get_object_hashmap(obj, **restargs)
608         restargs['data_range'] = myrange
609         blocksize = int(hashmap['block_size'])
610         blockhash = hashmap['block_hash']
611         total_size = hashmap['bytes']
612         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
613         map_dict = {}
614         for i, h in enumerate(hashmap['hashes']):
615             #  map_dict[h] = i   CHAGE
616             if h in map_dict:
617                 map_dict[h].append(i)
618             else:
619                 map_dict[h] = [i]
620         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
621
622     def _dump_blocks_sync(
623             self, obj, remote_hashes, blocksize, total_size, dst, range,
624             **args):
625         for blockid, blockhash in enumerate(remote_hashes):
626             if blockhash:
627                 start = blocksize * blockid
628                 is_last = start + blocksize > total_size
629                 end = (total_size - 1) if is_last else (start + blocksize - 1)
630                 (start, end) = _range_up(start, end, range)
631                 args['data_range'] = 'bytes=%s-%s' % (start, end)
632                 r = self.object_get(obj, success=(200, 206), **args)
633                 self._cb_next()
634                 dst.write(r.content)
635                 dst.flush()
636
637     def _get_block_async(self, obj, **args):
638         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
639         event.start()
640         return event
641
642     def _hash_from_file(self, fp, start, size, blockhash):
643         fp.seek(start)
644         block = fp.read(size)
645         h = newhashlib(blockhash)
646         h.update(block.strip('\x00'))
647         return hexlify(h.digest())
648
649     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
650         """write the results of a greenleted rest call to a file
651
652         :param offset: the offset of the file up to blocksize
653         - e.g. if the range is 10-100, all blocks will be written to
654         normal_position - 10
655         """
656         for key, g in flying.items():
657             if g.isAlive():
658                 continue
659             if g.exception:
660                 raise g.exception
661             block = g.value.content
662             for block_start in blockids[key]:
663                 local_file.seek(block_start + offset)
664                 local_file.write(block)
665                 self._cb_next()
666             flying.pop(key)
667             blockids.pop(key)
668         local_file.flush()
669
670     def _dump_blocks_async(
671             self, obj, remote_hashes, blocksize, total_size, local_file,
672             blockhash=None, resume=False, filerange=None, **restargs):
673         file_size = fstat(local_file.fileno()).st_size if resume else 0
674         flying = dict()
675         blockid_dict = dict()
676         offset = 0
677         if filerange is not None:
678             rstart = int(filerange.split('-')[0])
679             offset = rstart if blocksize > rstart else rstart % blocksize
680
681         self._init_thread_limit()
682         for block_hash, blockids in remote_hashes.items():
683             blockids = [blk * blocksize for blk in blockids]
684             unsaved = [blk for blk in blockids if not (
685                 blk < file_size and block_hash == self._hash_from_file(
686                         local_file, blk, blocksize, blockhash))]
687             self._cb_next(len(blockids) - len(unsaved))
688             if unsaved:
689                 key = unsaved[0]
690                 self._watch_thread_limit(flying.values())
691                 self._thread2file(
692                     flying, blockid_dict, local_file, offset,
693                     **restargs)
694                 end = total_size - 1 if (
695                     key + blocksize > total_size) else key + blocksize - 1
696                 start, end = _range_up(key, end, filerange)
697                 if start == end:
698                     self._cb_next()
699                     continue
700                 restargs['async_headers'] = {
701                     'Range': 'bytes=%s-%s' % (start, end)}
702                 flying[key] = self._get_block_async(obj, **restargs)
703                 blockid_dict[key] = unsaved
704
705         for thread in flying.values():
706             thread.join()
707         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
708
709     def download_object(
710             self, obj, dst,
711             download_cb=None,
712             version=None,
713             resume=False,
714             range_str=None,
715             if_match=None,
716             if_none_match=None,
717             if_modified_since=None,
718             if_unmodified_since=None):
719         """Download an object (multiple connections, random blocks)
720
721         :param obj: (str) remote object path
722
723         :param dst: open file descriptor (wb+)
724
725         :param download_cb: optional progress.bar object for downloading
726
727         :param version: (str) file version
728
729         :param resume: (bool) if set, preserve already downloaded file parts
730
731         :param range_str: (str) from, to are file positions (int) in bytes
732
733         :param if_match: (str)
734
735         :param if_none_match: (str)
736
737         :param if_modified_since: (str) formated date
738
739         :param if_unmodified_since: (str) formated date"""
740         restargs = dict(
741             version=version,
742             data_range=None if range_str is None else 'bytes=%s' % range_str,
743             if_match=if_match,
744             if_none_match=if_none_match,
745             if_modified_since=if_modified_since,
746             if_unmodified_since=if_unmodified_since)
747
748         (
749             blocksize,
750             blockhash,
751             total_size,
752             hash_list,
753             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
754         assert total_size >= 0
755
756         if download_cb:
757             self.progress_bar_gen = download_cb(len(hash_list))
758             self._cb_next()
759
760         if dst.isatty():
761             self._dump_blocks_sync(
762                 obj,
763                 hash_list,
764                 blocksize,
765                 total_size,
766                 dst,
767                 range_str,
768                 **restargs)
769         else:
770             self._dump_blocks_async(
771                 obj,
772                 remote_hashes,
773                 blocksize,
774                 total_size,
775                 dst,
776                 blockhash,
777                 resume,
778                 range_str,
779                 **restargs)
780             if not range_str:
781                 dst.truncate(total_size)
782
783         self._complete_cb()
784
785     def download_to_string(
786             self, obj,
787             download_cb=None,
788             version=None,
789             range_str=None,
790             if_match=None,
791             if_none_match=None,
792             if_modified_since=None,
793             if_unmodified_since=None):
794         """Download an object to a string (multiple connections). This method
795         uses threads for http requests, but stores all content in memory.
796
797         :param obj: (str) remote object path
798
799         :param download_cb: optional progress.bar object for downloading
800
801         :param version: (str) file version
802
803         :param range_str: (str) from, to are file positions (int) in bytes
804
805         :param if_match: (str)
806
807         :param if_none_match: (str)
808
809         :param if_modified_since: (str) formated date
810
811         :param if_unmodified_since: (str) formated date
812
813         :returns: (str) the whole object contents
814         """
815         restargs = dict(
816             version=version,
817             data_range=None if range_str is None else 'bytes=%s' % range_str,
818             if_match=if_match,
819             if_none_match=if_none_match,
820             if_modified_since=if_modified_since,
821             if_unmodified_since=if_unmodified_since)
822
823         (
824             blocksize,
825             blockhash,
826             total_size,
827             hash_list,
828             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
829         assert total_size >= 0
830
831         if download_cb:
832             self.progress_bar_gen = download_cb(len(hash_list))
833             self._cb_next()
834
835         num_of_blocks = len(remote_hashes)
836         ret = [''] * num_of_blocks
837         self._init_thread_limit()
838         flying = dict()
839         try:
840             for blockid, blockhash in enumerate(remote_hashes):
841                 start = blocksize * blockid
842                 is_last = start + blocksize > total_size
843                 end = (total_size - 1) if is_last else (start + blocksize - 1)
844                 (start, end) = _range_up(start, end, range_str)
845                 if start < end:
846                     self._watch_thread_limit(flying.values())
847                     flying[blockid] = self._get_block_async(obj, **restargs)
848                 for runid, thread in flying.items():
849                     if (blockid + 1) == num_of_blocks:
850                         thread.join()
851                     elif thread.isAlive():
852                         continue
853                     if thread.exception:
854                         raise thread.exception
855                     ret[runid] = thread.value.content
856                     self._cb_next()
857                     flying.pop(runid)
858             return ''.join(ret)
859         except KeyboardInterrupt:
860             sendlog.info('- - - wait for threads to finish')
861             for thread in activethreads():
862                 thread.join()
863
864     #Command Progress Bar method
865     def _cb_next(self, step=1):
866         if hasattr(self, 'progress_bar_gen'):
867             try:
868                 for i in xrange(step):
869                     self.progress_bar_gen.next()
870             except:
871                 pass
872
873     def _complete_cb(self):
874         while True:
875             try:
876                 self.progress_bar_gen.next()
877             except:
878                 break
879
880     def get_object_hashmap(
881             self, obj,
882             version=None,
883             if_match=None,
884             if_none_match=None,
885             if_modified_since=None,
886             if_unmodified_since=None,
887             data_range=None):
888         """
889         :param obj: (str) remote object path
890
891         :param if_match: (str)
892
893         :param if_none_match: (str)
894
895         :param if_modified_since: (str) formated date
896
897         :param if_unmodified_since: (str) formated date
898
899         :param data_range: (str) from-to where from and to are integers
900             denoting file positions in bytes
901
902         :returns: (list)
903         """
904         try:
905             r = self.object_get(
906                 obj,
907                 hashmap=True,
908                 version=version,
909                 if_etag_match=if_match,
910                 if_etag_not_match=if_none_match,
911                 if_modified_since=if_modified_since,
912                 if_unmodified_since=if_unmodified_since,
913                 data_range=data_range)
914         except ClientError as err:
915             if err.status == 304 or err.status == 412:
916                 return {}
917             raise
918         return r.json
919
920     def set_account_group(self, group, usernames):
921         """
922         :param group: (str)
923
924         :param usernames: (list)
925         """
926         r = self.account_post(update=True, groups={group: usernames})
927         return r
928
929     def del_account_group(self, group):
930         """
931         :param group: (str)
932         """
933         self.account_post(update=True, groups={group: []})
934
935     def get_account_info(self, until=None):
936         """
937         :param until: (str) formated date
938
939         :returns: (dict)
940         """
941         r = self.account_head(until=until)
942         if r.status_code == 401:
943             raise ClientError("No authorization", status=401)
944         return r.headers
945
946     def get_account_quota(self):
947         """
948         :returns: (dict)
949         """
950         return filter_in(
951             self.get_account_info(),
952             'X-Account-Policy-Quota',
953             exactMatch=True)
954
955     def get_account_versioning(self):
956         """
957         :returns: (dict)
958         """
959         return filter_in(
960             self.get_account_info(),
961             'X-Account-Policy-Versioning',
962             exactMatch=True)
963
964     def get_account_meta(self, until=None):
965         """
966         :meta until: (str) formated date
967
968         :returns: (dict)
969         """
970         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
971
972     def get_account_group(self):
973         """
974         :returns: (dict)
975         """
976         return filter_in(self.get_account_info(), 'X-Account-Group-')
977
978     def set_account_meta(self, metapairs):
979         """
980         :param metapairs: (dict) {key1:val1, key2:val2, ...}
981         """
982         assert(type(metapairs) is dict)
983         r = self.account_post(update=True, metadata=metapairs)
984         return r.headers
985
986     def del_account_meta(self, metakey):
987         """
988         :param metakey: (str) metadatum key
989         """
990         r = self.account_post(update=True, metadata={metakey: ''})
991         return r.headers
992
993     """
994     def set_account_quota(self, quota):
995         ""
996         :param quota: (int)
997         ""
998         self.account_post(update=True, quota=quota)
999     """
1000
1001     def set_account_versioning(self, versioning):
1002         """
1003         "param versioning: (str)
1004         """
1005         r = self.account_post(update=True, versioning=versioning)
1006         return r.headers
1007
1008     def list_containers(self):
1009         """
1010         :returns: (dict)
1011         """
1012         r = self.account_get()
1013         return r.json
1014
1015     def del_container(self, until=None, delimiter=None):
1016         """
1017         :param until: (str) formated date
1018
1019         :param delimiter: (str) with / empty container
1020
1021         :raises ClientError: 404 Container does not exist
1022
1023         :raises ClientError: 409 Container is not empty
1024         """
1025         self._assert_container()
1026         r = self.container_delete(
1027             until=until,
1028             delimiter=delimiter,
1029             success=(204, 404, 409))
1030         if r.status_code == 404:
1031             raise ClientError(
1032                 'Container "%s" does not exist' % self.container,
1033                 r.status_code)
1034         elif r.status_code == 409:
1035             raise ClientError(
1036                 'Container "%s" is not empty' % self.container,
1037                 r.status_code)
1038         return r.headers
1039
1040     def get_container_versioning(self, container=None):
1041         """
1042         :param container: (str)
1043
1044         :returns: (dict)
1045         """
1046         cnt_back_up = self.container
1047         try:
1048             self.container = container or cnt_back_up
1049             return filter_in(
1050                 self.get_container_info(),
1051                 'X-Container-Policy-Versioning')
1052         finally:
1053             self.container = cnt_back_up
1054
1055     def get_container_limit(self, container=None):
1056         """
1057         :param container: (str)
1058
1059         :returns: (dict)
1060         """
1061         cnt_back_up = self.container
1062         try:
1063             self.container = container or cnt_back_up
1064             return filter_in(
1065                 self.get_container_info(),
1066                 'X-Container-Policy-Quota')
1067         finally:
1068             self.container = cnt_back_up
1069
1070     def get_container_info(self, until=None):
1071         """
1072         :param until: (str) formated date
1073
1074         :returns: (dict)
1075
1076         :raises ClientError: 404 Container not found
1077         """
1078         try:
1079             r = self.container_head(until=until)
1080         except ClientError as err:
1081             err.details.append('for container %s' % self.container)
1082             raise err
1083         return r.headers
1084
1085     def get_container_meta(self, until=None):
1086         """
1087         :param until: (str) formated date
1088
1089         :returns: (dict)
1090         """
1091         return filter_in(
1092             self.get_container_info(until=until),
1093             'X-Container-Meta')
1094
1095     def get_container_object_meta(self, until=None):
1096         """
1097         :param until: (str) formated date
1098
1099         :returns: (dict)
1100         """
1101         return filter_in(
1102             self.get_container_info(until=until),
1103             'X-Container-Object-Meta')
1104
1105     def set_container_meta(self, metapairs):
1106         """
1107         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1108         """
1109         assert(type(metapairs) is dict)
1110         r = self.container_post(update=True, metadata=metapairs)
1111         return r.headers
1112
1113     def del_container_meta(self, metakey):
1114         """
1115         :param metakey: (str) metadatum key
1116
1117         :returns: (dict) response headers
1118         """
1119         r = self.container_post(update=True, metadata={metakey: ''})
1120         return r.headers
1121
1122     def set_container_limit(self, limit):
1123         """
1124         :param limit: (int)
1125         """
1126         r = self.container_post(update=True, quota=limit)
1127         return r.headers
1128
1129     def set_container_versioning(self, versioning):
1130         """
1131         :param versioning: (str)
1132         """
1133         r = self.container_post(update=True, versioning=versioning)
1134         return r.headers
1135
1136     def del_object(self, obj, until=None, delimiter=None):
1137         """
1138         :param obj: (str) remote object path
1139
1140         :param until: (str) formated date
1141
1142         :param delimiter: (str)
1143         """
1144         self._assert_container()
1145         r = self.object_delete(obj, until=until, delimiter=delimiter)
1146         return r.headers
1147
1148     def set_object_meta(self, obj, metapairs):
1149         """
1150         :param obj: (str) remote object path
1151
1152         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1153         """
1154         assert(type(metapairs) is dict)
1155         r = self.object_post(obj, update=True, metadata=metapairs)
1156         return r.headers
1157
1158     def del_object_meta(self, obj, metakey):
1159         """
1160         :param obj: (str) remote object path
1161
1162         :param metakey: (str) metadatum key
1163         """
1164         r = self.object_post(obj, update=True, metadata={metakey: ''})
1165         return r.headers
1166
1167     def publish_object(self, obj):
1168         """
1169         :param obj: (str) remote object path
1170
1171         :returns: (str) access url
1172         """
1173         self.object_post(obj, update=True, public=True)
1174         info = self.get_object_info(obj)
1175         pref, sep, rest = self.base_url.partition('//')
1176         base = rest.split('/')[0]
1177         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1178
1179     def unpublish_object(self, obj):
1180         """
1181         :param obj: (str) remote object path
1182         """
1183         r = self.object_post(obj, update=True, public=False)
1184         return r.headers
1185
1186     def get_object_info(self, obj, version=None):
1187         """
1188         :param obj: (str) remote object path
1189
1190         :param version: (str)
1191
1192         :returns: (dict)
1193         """
1194         try:
1195             r = self.object_head(obj, version=version)
1196             return r.headers
1197         except ClientError as ce:
1198             if ce.status == 404:
1199                 raise ClientError('Object %s not found' % obj, status=404)
1200             raise
1201
1202     def get_object_meta(self, obj, version=None):
1203         """
1204         :param obj: (str) remote object path
1205
1206         :param version: (str)
1207
1208         :returns: (dict)
1209         """
1210         return filter_in(
1211             self.get_object_info(obj, version=version),
1212             'X-Object-Meta')
1213
1214     def get_object_sharing(self, obj):
1215         """
1216         :param obj: (str) remote object path
1217
1218         :returns: (dict)
1219         """
1220         r = filter_in(
1221             self.get_object_info(obj),
1222             'X-Object-Sharing',
1223             exactMatch=True)
1224         reply = {}
1225         if len(r) > 0:
1226             perms = r['x-object-sharing'].split(';')
1227             for perm in perms:
1228                 try:
1229                     perm.index('=')
1230                 except ValueError:
1231                     raise ClientError('Incorrect reply format')
1232                 (key, val) = perm.strip().split('=')
1233                 reply[key] = val
1234         return reply
1235
1236     def set_object_sharing(
1237             self, obj,
1238             read_permission=False, write_permission=False):
1239         """Give read/write permisions to an object.
1240
1241         :param obj: (str) remote object path
1242
1243         :param read_permission: (list - bool) users and user groups that get
1244             read permission for this object - False means all previous read
1245             permissions will be removed
1246
1247         :param write_permission: (list - bool) of users and user groups to get
1248            write permission for this object - False means all previous write
1249            permissions will be removed
1250
1251         :returns: (dict) response headers
1252         """
1253
1254         perms = dict(read=read_permission or '', write=write_permission or '')
1255         r = self.object_post(obj, update=True, permissions=perms)
1256         return r.headers
1257
1258     def del_object_sharing(self, obj):
1259         """
1260         :param obj: (str) remote object path
1261         """
1262         return self.set_object_sharing(obj)
1263
1264     def append_object(self, obj, source_file, upload_cb=None):
1265         """
1266         :param obj: (str) remote object path
1267
1268         :param source_file: open file descriptor
1269
1270         :param upload_db: progress.bar for uploading
1271         """
1272         self._assert_container()
1273         meta = self.get_container_info()
1274         blocksize = int(meta['x-container-block-size'])
1275         filesize = fstat(source_file.fileno()).st_size
1276         nblocks = 1 + (filesize - 1) // blocksize
1277         offset = 0
1278         headers = {}
1279         if upload_cb:
1280             self.progress_bar_gen = upload_cb(nblocks)
1281             self._cb_next()
1282         flying = {}
1283         self._init_thread_limit()
1284         try:
1285             for i in range(nblocks):
1286                 block = source_file.read(min(blocksize, filesize - offset))
1287                 offset += len(block)
1288
1289                 self._watch_thread_limit(flying.values())
1290                 unfinished = {}
1291                 flying[i] = SilentEvent(
1292                     method=self.object_post,
1293                     obj=obj,
1294                     update=True,
1295                     content_range='bytes */*',
1296                     content_type='application/octet-stream',
1297                     content_length=len(block),
1298                     data=block)
1299                 flying[i].start()
1300
1301                 for key, thread in flying.items():
1302                     if thread.isAlive():
1303                         if i < nblocks:
1304                             unfinished[key] = thread
1305                             continue
1306                         thread.join()
1307                     if thread.exception:
1308                         raise thread.exception
1309                     headers[key] = thread.value.headers
1310                     self._cb_next()
1311                 flying = unfinished
1312         except KeyboardInterrupt:
1313             sendlog.info('- - - wait for threads to finish')
1314             for thread in activethreads():
1315                 thread.join()
1316         finally:
1317             from time import sleep
1318             sleep(2 * len(activethreads()))
1319         return headers.values()
1320
1321     def truncate_object(self, obj, upto_bytes):
1322         """
1323         :param obj: (str) remote object path
1324
1325         :param upto_bytes: max number of bytes to leave on file
1326
1327         :returns: (dict) response headers
1328         """
1329         r = self.object_post(
1330             obj,
1331             update=True,
1332             content_range='bytes 0-%s/*' % upto_bytes,
1333             content_type='application/octet-stream',
1334             object_bytes=upto_bytes,
1335             source_object=path4url(self.container, obj))
1336         return r.headers
1337
1338     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1339         """Overwrite a part of an object from local source file
1340
1341         :param obj: (str) remote object path
1342
1343         :param start: (int) position in bytes to start overwriting from
1344
1345         :param end: (int) position in bytes to stop overwriting at
1346
1347         :param source_file: open file descriptor
1348
1349         :param upload_db: progress.bar for uploading
1350         """
1351
1352         r = self.get_object_info(obj)
1353         rf_size = int(r['content-length'])
1354         if rf_size < int(start):
1355             raise ClientError(
1356                 'Range start exceeds file size',
1357                 status=416)
1358         elif rf_size < int(end):
1359             raise ClientError(
1360                 'Range end exceeds file size',
1361                 status=416)
1362         self._assert_container()
1363         meta = self.get_container_info()
1364         blocksize = int(meta['x-container-block-size'])
1365         filesize = fstat(source_file.fileno()).st_size
1366         datasize = int(end) - int(start) + 1
1367         nblocks = 1 + (datasize - 1) // blocksize
1368         offset = 0
1369         if upload_cb:
1370             self.progress_bar_gen = upload_cb(nblocks)
1371             self._cb_next()
1372         headers = []
1373         for i in range(nblocks):
1374             read_size = min(blocksize, filesize - offset, datasize - offset)
1375             block = source_file.read(read_size)
1376             r = self.object_post(
1377                 obj,
1378                 update=True,
1379                 content_type='application/octet-stream',
1380                 content_length=len(block),
1381                 content_range='bytes %s-%s/*' % (
1382                     start + offset,
1383                     start + offset + len(block) - 1),
1384                 data=block)
1385             headers.append(dict(r.headers))
1386             offset += len(block)
1387
1388             self._cb_next
1389         return headers
1390
1391     def copy_object(
1392             self, src_container, src_object, dst_container,
1393             dst_object=None,
1394             source_version=None,
1395             source_account=None,
1396             public=False,
1397             content_type=None,
1398             delimiter=None):
1399         """
1400         :param src_container: (str) source container
1401
1402         :param src_object: (str) source object path
1403
1404         :param dst_container: (str) destination container
1405
1406         :param dst_object: (str) destination object path
1407
1408         :param source_version: (str) source object version
1409
1410         :param source_account: (str) account to copy from
1411
1412         :param public: (bool)
1413
1414         :param content_type: (str)
1415
1416         :param delimiter: (str)
1417
1418         :returns: (dict) response headers
1419         """
1420         self._assert_account()
1421         self.container = dst_container
1422         src_path = path4url(src_container, src_object)
1423         r = self.object_put(
1424             dst_object or src_object,
1425             success=201,
1426             copy_from=src_path,
1427             content_length=0,
1428             source_version=source_version,
1429             source_account=source_account,
1430             public=public,
1431             content_type=content_type,
1432             delimiter=delimiter)
1433         return r.headers
1434
1435     def move_object(
1436             self, src_container, src_object, dst_container,
1437             dst_object=False,
1438             source_account=None,
1439             source_version=None,
1440             public=False,
1441             content_type=None,
1442             delimiter=None):
1443         """
1444         :param src_container: (str) source container
1445
1446         :param src_object: (str) source object path
1447
1448         :param dst_container: (str) destination container
1449
1450         :param dst_object: (str) destination object path
1451
1452         :param source_account: (str) account to move from
1453
1454         :param source_version: (str) source object version
1455
1456         :param public: (bool)
1457
1458         :param content_type: (str)
1459
1460         :param delimiter: (str)
1461
1462         :returns: (dict) response headers
1463         """
1464         self._assert_account()
1465         self.container = dst_container
1466         dst_object = dst_object or src_object
1467         src_path = path4url(src_container, src_object)
1468         r = self.object_put(
1469             dst_object,
1470             success=201,
1471             move_from=src_path,
1472             content_length=0,
1473             source_account=source_account,
1474             source_version=source_version,
1475             public=public,
1476             content_type=content_type,
1477             delimiter=delimiter)
1478         return r.headers
1479
1480     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1481         """Get accounts that share with self.account
1482
1483         :param limit: (str)
1484
1485         :param marker: (str)
1486
1487         :returns: (dict)
1488         """
1489         self._assert_account()
1490
1491         self.set_param('format', 'json')
1492         self.set_param('limit', limit, iff=limit is not None)
1493         self.set_param('marker', marker, iff=marker is not None)
1494
1495         path = ''
1496         success = kwargs.pop('success', (200, 204))
1497         r = self.get(path, *args, success=success, **kwargs)
1498         return r.json
1499
1500     def get_object_versionlist(self, obj):
1501         """
1502         :param obj: (str) remote object path
1503
1504         :returns: (list)
1505         """
1506         self._assert_container()
1507         r = self.object_get(obj, format='json', version='list')
1508         return r.json['versions']