Fix bug w. 0 file syncing
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks
136         """
137         cnt_back_up = self.container
138         try:
139             self.container = container or cnt_back_up
140             r = self.container_delete(until=unicode(time()))
141         finally:
142             self.container = cnt_back_up
143         return r.headers
144
145     def upload_object_unchunked(
146             self, obj, f,
147             withHashFile=False,
148             size=None,
149             etag=None,
150             content_encoding=None,
151             content_disposition=None,
152             content_type=None,
153             sharing=None,
154             public=None):
155         """
156         :param obj: (str) remote object path
157
158         :param f: open file descriptor
159
160         :param withHashFile: (bool)
161
162         :param size: (int) size of data to upload
163
164         :param etag: (str)
165
166         :param content_encoding: (str)
167
168         :param content_disposition: (str)
169
170         :param content_type: (str)
171
172         :param sharing: {'read':[user and/or grp names],
173             'write':[usr and/or grp names]}
174
175         :param public: (bool)
176
177         :returns: (dict) created object metadata
178         """
179         self._assert_container()
180
181         if withHashFile:
182             data = f.read()
183             try:
184                 import json
185                 data = json.dumps(json.loads(data))
186             except ValueError:
187                 raise ClientError('"%s" is not json-formated' % f.name, 1)
188             except SyntaxError:
189                 msg = '"%s" is not a valid hashmap file' % f.name
190                 raise ClientError(msg, 1)
191             f = StringIO(data)
192         else:
193             data = readall(f, size) if size else f.read()
194         r = self.object_put(
195             obj,
196             data=data,
197             etag=etag,
198             content_encoding=content_encoding,
199             content_disposition=content_disposition,
200             content_type=content_type,
201             permissions=sharing,
202             public=public,
203             success=201)
204         return r.headers
205
206     def create_object_by_manifestation(
207             self, obj,
208             etag=None,
209             content_encoding=None,
210             content_disposition=None,
211             content_type=None,
212             sharing=None,
213             public=None):
214         """
215         :param obj: (str) remote object path
216
217         :param etag: (str)
218
219         :param content_encoding: (str)
220
221         :param content_disposition: (str)
222
223         :param content_type: (str)
224
225         :param sharing: {'read':[user and/or grp names],
226             'write':[usr and/or grp names]}
227
228         :param public: (bool)
229
230         :returns: (dict) created object metadata
231         """
232         self._assert_container()
233         r = self.object_put(
234             obj,
235             content_length=0,
236             etag=etag,
237             content_encoding=content_encoding,
238             content_disposition=content_disposition,
239             content_type=content_type,
240             permissions=sharing,
241             public=public,
242             manifest='%s/%s' % (self.container, obj))
243         return r.headers
244
245     # upload_* auxiliary methods
246     def _put_block_async(self, data, hash):
247         event = SilentEvent(method=self._put_block, data=data, hash=hash)
248         event.start()
249         return event
250
251     def _put_block(self, data, hash):
252         r = self.container_post(
253             update=True,
254             content_type='application/octet-stream',
255             content_length=len(data),
256             data=data,
257             format='json')
258         assert r.json[0] == hash, 'Local hash does not match server'
259
260     def _get_file_block_info(self, fileobj, size=None, cache=None):
261         """
262         :param fileobj: (file descriptor) source
263
264         :param size: (int) size of data to upload from source
265
266         :param cache: (dict) if provided, cache container info response to
267         avoid redundant calls
268         """
269         if isinstance(cache, dict):
270             try:
271                 meta = cache[self.container]
272             except KeyError:
273                 meta = self.get_container_info()
274                 cache[self.container] = meta
275         else:
276             meta = self.get_container_info()
277         blocksize = int(meta['x-container-block-size'])
278         blockhash = meta['x-container-block-hash']
279         size = size if size is not None else fstat(fileobj.fileno()).st_size
280         nblocks = 1 + (size - 1) // blocksize
281         return (blocksize, blockhash, size, nblocks)
282
283     def _create_object_or_get_missing_hashes(
284             self, obj, json,
285             size=None,
286             format='json',
287             hashmap=True,
288             content_type=None,
289             if_etag_match=None,
290             if_etag_not_match=None,
291             content_encoding=None,
292             content_disposition=None,
293             permissions=None,
294             public=None,
295             success=(201, 409)):
296         r = self.object_put(
297             obj,
298             format='json',
299             hashmap=True,
300             content_type=content_type,
301             json=json,
302             if_etag_match=if_etag_match,
303             if_etag_not_match=if_etag_not_match,
304             content_encoding=content_encoding,
305             content_disposition=content_disposition,
306             permissions=permissions,
307             public=public,
308             success=success)
309         return (None if r.status_code == 201 else r.json), r.headers
310
311     def _calculate_blocks_for_upload(
312             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
313             hash_cb=None):
314         offset = 0
315         if hash_cb:
316             hash_gen = hash_cb(nblocks)
317             hash_gen.next()
318
319         for i in range(nblocks):
320             block = readall(fileobj, min(blocksize, size - offset))
321             bytes = len(block)
322             hash = _pithos_hash(block, blockhash)
323             hashes.append(hash)
324             hmap[hash] = (offset, bytes)
325             offset += bytes
326             if hash_cb:
327                 hash_gen.next()
328         msg = ('Failed to calculate uploaded blocks:'
329                ' Offset and object size do not match')
330         assert offset == size, msg
331
332     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
333         """upload missing blocks asynchronously"""
334
335         self._init_thread_limit()
336
337         flying = []
338         failures = []
339         for hash in missing:
340             offset, bytes = hmap[hash]
341             fileobj.seek(offset)
342             data = readall(fileobj, bytes)
343             r = self._put_block_async(data, hash)
344             flying.append(r)
345             unfinished = self._watch_thread_limit(flying)
346             for thread in set(flying).difference(unfinished):
347                 if thread.exception:
348                     failures.append(thread)
349                     if isinstance(
350                             thread.exception,
351                             ClientError) and thread.exception.status == 502:
352                         self.POOLSIZE = self._thread_limit
353                 elif thread.isAlive():
354                     flying.append(thread)
355                 elif upload_gen:
356                     try:
357                         upload_gen.next()
358                     except:
359                         pass
360             flying = unfinished
361
362         for thread in flying:
363             thread.join()
364             if thread.exception:
365                 failures.append(thread)
366             elif upload_gen:
367                 try:
368                     upload_gen.next()
369                 except:
370                     pass
371
372         return [failure.kwargs['hash'] for failure in failures]
373
374     def upload_object(
375             self, obj, f,
376             size=None,
377             hash_cb=None,
378             upload_cb=None,
379             etag=None,
380             if_etag_match=None,
381             if_not_exist=None,
382             content_encoding=None,
383             content_disposition=None,
384             content_type=None,
385             sharing=None,
386             public=None,
387             container_info_cache=None):
388         """Upload an object using multiple connections (threads)
389
390         :param obj: (str) remote object path
391
392         :param f: open file descriptor (rb)
393
394         :param hash_cb: optional progress.bar object for calculating hashes
395
396         :param upload_cb: optional progress.bar object for uploading
397
398         :param etag: (str)
399
400         :param if_etag_match: (str) Push that value to if-match header at file
401             creation
402
403         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
404             it does not exist remotely, otherwise the operation will fail.
405             Involves the case of an object with the same path is created while
406             the object is being uploaded.
407
408         :param content_encoding: (str)
409
410         :param content_disposition: (str)
411
412         :param content_type: (str)
413
414         :param sharing: {'read':[user and/or grp names],
415             'write':[usr and/or grp names]}
416
417         :param public: (bool)
418
419         :param container_info_cache: (dict) if given, avoid redundant calls to
420             server for container info (block size and hash information)
421         """
422         self._assert_container()
423
424         block_info = (
425             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
426                 f, size, container_info_cache)
427         (hashes, hmap, offset) = ([], {}, 0)
428         if not content_type:
429             content_type = 'application/octet-stream'
430
431         self._calculate_blocks_for_upload(
432             *block_info,
433             hashes=hashes,
434             hmap=hmap,
435             fileobj=f,
436             hash_cb=hash_cb)
437
438         hashmap = dict(bytes=size, hashes=hashes)
439         missing, obj_headers = self._create_object_or_get_missing_hashes(
440             obj, hashmap,
441             content_type=content_type,
442             size=size,
443             if_etag_match=if_etag_match,
444             if_etag_not_match='*' if if_not_exist else None,
445             content_encoding=content_encoding,
446             content_disposition=content_disposition,
447             permissions=sharing,
448             public=public)
449
450         if missing is None:
451             return obj_headers
452
453         if upload_cb:
454             upload_gen = upload_cb(len(missing))
455             for i in range(len(missing), len(hashmap['hashes']) + 1):
456                 try:
457                     upload_gen.next()
458                 except:
459                     upload_gen = None
460         else:
461             upload_gen = None
462
463         retries = 7
464         try:
465             while retries:
466                 sendlog.info('%s blocks missing' % len(missing))
467                 num_of_blocks = len(missing)
468                 missing = self._upload_missing_blocks(
469                     missing,
470                     hmap,
471                     f,
472                     upload_gen)
473                 if missing:
474                     if num_of_blocks == len(missing):
475                         retries -= 1
476                     else:
477                         num_of_blocks = len(missing)
478                 else:
479                     break
480             if missing:
481                 try:
482                     details = ['%s' % thread.exception for thread in missing]
483                 except Exception:
484                     details = ['Also, failed to read thread exceptions']
485                 raise ClientError(
486                     '%s blocks failed to upload' % len(missing),
487                     details=details)
488         except KeyboardInterrupt:
489             sendlog.info('- - - wait for threads to finish')
490             for thread in activethreads():
491                 thread.join()
492             raise
493
494         r = self.object_put(
495             obj,
496             format='json',
497             hashmap=True,
498             content_type=content_type,
499             content_encoding=content_encoding,
500             if_etag_match=if_etag_match,
501             if_etag_not_match='*' if if_not_exist else None,
502             etag=etag,
503             json=hashmap,
504             permissions=sharing,
505             public=public,
506             success=201)
507         return r.headers
508
509     def upload_from_string(
510             self, obj, input_str,
511             hash_cb=None,
512             upload_cb=None,
513             etag=None,
514             if_etag_match=None,
515             if_not_exist=None,
516             content_encoding=None,
517             content_disposition=None,
518             content_type=None,
519             sharing=None,
520             public=None,
521             container_info_cache=None):
522         """Upload an object using multiple connections (threads)
523
524         :param obj: (str) remote object path
525
526         :param input_str: (str) upload content
527
528         :param hash_cb: optional progress.bar object for calculating hashes
529
530         :param upload_cb: optional progress.bar object for uploading
531
532         :param etag: (str)
533
534         :param if_etag_match: (str) Push that value to if-match header at file
535             creation
536
537         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
538             it does not exist remotely, otherwise the operation will fail.
539             Involves the case of an object with the same path is created while
540             the object is being uploaded.
541
542         :param content_encoding: (str)
543
544         :param content_disposition: (str)
545
546         :param content_type: (str)
547
548         :param sharing: {'read':[user and/or grp names],
549             'write':[usr and/or grp names]}
550
551         :param public: (bool)
552
553         :param container_info_cache: (dict) if given, avoid redundant calls to
554             server for container info (block size and hash information)
555         """
556         self._assert_container()
557
558         blocksize, blockhash, size, nblocks = self._get_file_block_info(
559                 fileobj=None, size=len(input_str), cache=container_info_cache)
560         (hashes, hmap, offset) = ([], {}, 0)
561         if not content_type:
562             content_type = 'application/octet-stream'
563
564         hashes = []
565         hmap = {}
566         for blockid in range(nblocks):
567             start = blockid * blocksize
568             block = input_str[start: (start + blocksize)]
569             hashes.append(_pithos_hash(block, blockhash))
570             hmap[hashes[blockid]] = (start, block)
571
572         hashmap = dict(bytes=size, hashes=hashes)
573         missing, obj_headers = self._create_object_or_get_missing_hashes(
574             obj, hashmap,
575             content_type=content_type,
576             size=size,
577             if_etag_match=if_etag_match,
578             if_etag_not_match='*' if if_not_exist else None,
579             content_encoding=content_encoding,
580             content_disposition=content_disposition,
581             permissions=sharing,
582             public=public)
583         if missing is None:
584             return obj_headers
585         num_of_missing = len(missing)
586
587         if upload_cb:
588             self.progress_bar_gen = upload_cb(nblocks)
589             for i in range(nblocks + 1 - num_of_missing):
590                 self._cb_next()
591
592         tries = 7
593         old_failures = 0
594         try:
595             while tries and missing:
596                 flying = []
597                 failures = []
598                 for hash in missing:
599                     offset, block = hmap[hash]
600                     bird = self._put_block_async(block, hash)
601                     flying.append(bird)
602                     unfinished = self._watch_thread_limit(flying)
603                     for thread in set(flying).difference(unfinished):
604                         if thread.exception:
605                             failures.append(thread.kwargs['hash'])
606                         if thread.isAlive():
607                             flying.append(thread)
608                         else:
609                             self._cb_next()
610                     flying = unfinished
611                 for thread in flying:
612                     thread.join()
613                     if thread.exception:
614                         failures.append(thread.kwargs['hash'])
615                     self._cb_next()
616                 missing = failures
617                 if missing and len(missing) == old_failures:
618                     tries -= 1
619                 old_failures = len(missing)
620             if missing:
621                 raise ClientError(
622                     '%s blocks failed to upload' % len(missing),
623                     details=['%s' % thread.exception for thread in missing])
624         except KeyboardInterrupt:
625             sendlog.info('- - - wait for threads to finish')
626             for thread in activethreads():
627                 thread.join()
628             raise
629
630         r = self.object_put(
631             obj,
632             format='json',
633             hashmap=True,
634             content_type=content_type,
635             content_encoding=content_encoding,
636             if_etag_match=if_etag_match,
637             if_etag_not_match='*' if if_not_exist else None,
638             etag=etag,
639             json=hashmap,
640             permissions=sharing,
641             public=public,
642             success=201)
643         return r.headers
644
645     # download_* auxiliary methods
646     def _get_remote_blocks_info(self, obj, **restargs):
647         #retrieve object hashmap
648         myrange = restargs.pop('data_range', None)
649         hashmap = self.get_object_hashmap(obj, **restargs)
650         restargs['data_range'] = myrange
651         blocksize = int(hashmap['block_size'])
652         blockhash = hashmap['block_hash']
653         total_size = hashmap['bytes']
654         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
655         map_dict = {}
656         for i, h in enumerate(hashmap['hashes']):
657             #  map_dict[h] = i   CHAGE
658             if h in map_dict:
659                 map_dict[h].append(i)
660             else:
661                 map_dict[h] = [i]
662         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
663
664     def _dump_blocks_sync(
665             self, obj, remote_hashes, blocksize, total_size, dst, crange,
666             **args):
667         if not total_size:
668             return
669         for blockid, blockhash in enumerate(remote_hashes):
670             if blockhash:
671                 start = blocksize * blockid
672                 is_last = start + blocksize > total_size
673                 end = (total_size - 1) if is_last else (start + blocksize - 1)
674                 data_range = _range_up(start, end, total_size, crange)
675                 if not data_range:
676                     self._cb_next()
677                     continue
678                 args['data_range'] = 'bytes=%s' % data_range
679                 r = self.object_get(obj, success=(200, 206), **args)
680                 self._cb_next()
681                 dst.write(r.content)
682                 dst.flush()
683
684     def _get_block_async(self, obj, **args):
685         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
686         event.start()
687         return event
688
689     def _hash_from_file(self, fp, start, size, blockhash):
690         fp.seek(start)
691         block = readall(fp, size)
692         h = newhashlib(blockhash)
693         h.update(block.strip('\x00'))
694         return hexlify(h.digest())
695
696     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
697         """write the results of a greenleted rest call to a file
698
699         :param offset: the offset of the file up to blocksize
700         - e.g. if the range is 10-100, all blocks will be written to
701         normal_position - 10
702         """
703         for key, g in flying.items():
704             if g.isAlive():
705                 continue
706             if g.exception:
707                 raise g.exception
708             block = g.value.content
709             for block_start in blockids[key]:
710                 local_file.seek(block_start + offset)
711                 local_file.write(block)
712                 self._cb_next()
713             flying.pop(key)
714             blockids.pop(key)
715         local_file.flush()
716
717     def _dump_blocks_async(
718             self, obj, remote_hashes, blocksize, total_size, local_file,
719             blockhash=None, resume=False, filerange=None, **restargs):
720         file_size = fstat(local_file.fileno()).st_size if resume else 0
721         flying = dict()
722         blockid_dict = dict()
723         offset = 0
724
725         self._init_thread_limit()
726         for block_hash, blockids in remote_hashes.items():
727             blockids = [blk * blocksize for blk in blockids]
728             unsaved = [blk for blk in blockids if not (
729                 blk < file_size and block_hash == self._hash_from_file(
730                         local_file, blk, blocksize, blockhash))]
731             self._cb_next(len(blockids) - len(unsaved))
732             if unsaved:
733                 key = unsaved[0]
734                 if key:
735                     self._watch_thread_limit(flying.values())
736                     self._thread2file(
737                         flying, blockid_dict, local_file, offset,
738                         **restargs)
739                     end = total_size - 1 if (
740                         key + blocksize > total_size) else key + blocksize - 1
741                     data_range = _range_up(key, end, total_size, filerange)
742                     if not data_range:
743                         self._cb_next()
744                         continue
745                     restargs[
746                         'async_headers'] = {'Range': 'bytes=%s' % data_range}
747                     flying[key] = self._get_block_async(obj, **restargs)
748                     blockid_dict[key] = unsaved
749
750         for thread in flying.values():
751             thread.join()
752         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
753
754     def download_object(
755             self, obj, dst,
756             download_cb=None,
757             version=None,
758             resume=False,
759             range_str=None,
760             if_match=None,
761             if_none_match=None,
762             if_modified_since=None,
763             if_unmodified_since=None):
764         """Download an object (multiple connections, random blocks)
765
766         :param obj: (str) remote object path
767
768         :param dst: open file descriptor (wb+)
769
770         :param download_cb: optional progress.bar object for downloading
771
772         :param version: (str) file version
773
774         :param resume: (bool) if set, preserve already downloaded file parts
775
776         :param range_str: (str) from, to are file positions (int) in bytes
777
778         :param if_match: (str)
779
780         :param if_none_match: (str)
781
782         :param if_modified_since: (str) formated date
783
784         :param if_unmodified_since: (str) formated date"""
785         restargs = dict(
786             version=version,
787             data_range=None if range_str is None else 'bytes=%s' % range_str,
788             if_match=if_match,
789             if_none_match=if_none_match,
790             if_modified_since=if_modified_since,
791             if_unmodified_since=if_unmodified_since)
792
793         (
794             blocksize,
795             blockhash,
796             total_size,
797             hash_list,
798             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
799         assert total_size >= 0
800
801         if download_cb:
802             self.progress_bar_gen = download_cb(len(hash_list))
803             self._cb_next()
804
805         if dst.isatty():
806             self._dump_blocks_sync(
807                 obj,
808                 hash_list,
809                 blocksize,
810                 total_size,
811                 dst,
812                 range_str,
813                 **restargs)
814         else:
815             self._dump_blocks_async(
816                 obj,
817                 remote_hashes,
818                 blocksize,
819                 total_size,
820                 dst,
821                 blockhash,
822                 resume,
823                 range_str,
824                 **restargs)
825             if not range_str:
826                 dst.truncate(total_size)
827
828         self._complete_cb()
829
830     def download_to_string(
831             self, obj,
832             download_cb=None,
833             version=None,
834             range_str=None,
835             if_match=None,
836             if_none_match=None,
837             if_modified_since=None,
838             if_unmodified_since=None):
839         """Download an object to a string (multiple connections). This method
840         uses threads for http requests, but stores all content in memory.
841
842         :param obj: (str) remote object path
843
844         :param download_cb: optional progress.bar object for downloading
845
846         :param version: (str) file version
847
848         :param range_str: (str) from, to are file positions (int) in bytes
849
850         :param if_match: (str)
851
852         :param if_none_match: (str)
853
854         :param if_modified_since: (str) formated date
855
856         :param if_unmodified_since: (str) formated date
857
858         :returns: (str) the whole object contents
859         """
860         restargs = dict(
861             version=version,
862             data_range=None if range_str is None else 'bytes=%s' % range_str,
863             if_match=if_match,
864             if_none_match=if_none_match,
865             if_modified_since=if_modified_since,
866             if_unmodified_since=if_unmodified_since)
867
868         (
869             blocksize,
870             blockhash,
871             total_size,
872             hash_list,
873             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
874         assert total_size >= 0
875
876         if download_cb:
877             self.progress_bar_gen = download_cb(len(hash_list))
878             self._cb_next()
879
880         num_of_blocks = len(remote_hashes)
881         ret = [''] * num_of_blocks
882         self._init_thread_limit()
883         flying = dict()
884         try:
885             for blockid, blockhash in enumerate(remote_hashes):
886                 start = blocksize * blockid
887                 is_last = start + blocksize > total_size
888                 end = (total_size - 1) if is_last else (start + blocksize - 1)
889                 data_range_str = _range_up(start, end, end, range_str)
890                 if data_range_str:
891                     self._watch_thread_limit(flying.values())
892                     restargs['data_range'] = 'bytes=%s' % data_range_str
893                     flying[blockid] = self._get_block_async(obj, **restargs)
894                 for runid, thread in flying.items():
895                     if (blockid + 1) == num_of_blocks:
896                         thread.join()
897                     elif thread.isAlive():
898                         continue
899                     if thread.exception:
900                         raise thread.exception
901                     ret[runid] = thread.value.content
902                     self._cb_next()
903                     flying.pop(runid)
904             return ''.join(ret)
905         except KeyboardInterrupt:
906             sendlog.info('- - - wait for threads to finish')
907             for thread in activethreads():
908                 thread.join()
909
910     #Command Progress Bar method
911     def _cb_next(self, step=1):
912         if hasattr(self, 'progress_bar_gen'):
913             try:
914                 for i in xrange(step):
915                     self.progress_bar_gen.next()
916             except:
917                 pass
918
919     def _complete_cb(self):
920         while True:
921             try:
922                 self.progress_bar_gen.next()
923             except:
924                 break
925
926     def get_object_hashmap(
927             self, obj,
928             version=None,
929             if_match=None,
930             if_none_match=None,
931             if_modified_since=None,
932             if_unmodified_since=None):
933         """
934         :param obj: (str) remote object path
935
936         :param if_match: (str)
937
938         :param if_none_match: (str)
939
940         :param if_modified_since: (str) formated date
941
942         :param if_unmodified_since: (str) formated date
943
944         :returns: (list)
945         """
946         try:
947             r = self.object_get(
948                 obj,
949                 hashmap=True,
950                 version=version,
951                 if_etag_match=if_match,
952                 if_etag_not_match=if_none_match,
953                 if_modified_since=if_modified_since,
954                 if_unmodified_since=if_unmodified_since)
955         except ClientError as err:
956             if err.status == 304 or err.status == 412:
957                 return {}
958             raise
959         return r.json
960
961     def set_account_group(self, group, usernames):
962         """
963         :param group: (str)
964
965         :param usernames: (list)
966         """
967         r = self.account_post(update=True, groups={group: usernames})
968         return r
969
970     def del_account_group(self, group):
971         """
972         :param group: (str)
973         """
974         self.account_post(update=True, groups={group: []})
975
976     def get_account_info(self, until=None):
977         """
978         :param until: (str) formated date
979
980         :returns: (dict)
981         """
982         r = self.account_head(until=until)
983         if r.status_code == 401:
984             raise ClientError("No authorization", status=401)
985         return r.headers
986
987     def get_account_quota(self):
988         """
989         :returns: (dict)
990         """
991         return filter_in(
992             self.get_account_info(),
993             'X-Account-Policy-Quota',
994             exactMatch=True)
995
996     #def get_account_versioning(self):
997     #    """
998     #    :returns: (dict)
999     #    """
1000     #    return filter_in(
1001     #        self.get_account_info(),
1002     #        'X-Account-Policy-Versioning',
1003     #        exactMatch=True)
1004
1005     def get_account_meta(self, until=None):
1006         """
1007         :param until: (str) formated date
1008
1009         :returns: (dict)
1010         """
1011         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1012
1013     def get_account_group(self):
1014         """
1015         :returns: (dict)
1016         """
1017         return filter_in(self.get_account_info(), 'X-Account-Group-')
1018
1019     def set_account_meta(self, metapairs):
1020         """
1021         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1022         """
1023         assert(type(metapairs) is dict)
1024         r = self.account_post(update=True, metadata=metapairs)
1025         return r.headers
1026
1027     def del_account_meta(self, metakey):
1028         """
1029         :param metakey: (str) metadatum key
1030         """
1031         r = self.account_post(update=True, metadata={metakey: ''})
1032         return r.headers
1033
1034     #def set_account_quota(self, quota):
1035     #    """
1036     #    :param quota: (int)
1037     #    """
1038     #    self.account_post(update=True, quota=quota)
1039
1040     #def set_account_versioning(self, versioning):
1041     #    """
1042     #    :param versioning: (str)
1043     #    """
1044     #    r = self.account_post(update=True, versioning=versioning)
1045     #    return r.headers
1046
1047     def list_containers(self):
1048         """
1049         :returns: (dict)
1050         """
1051         r = self.account_get()
1052         return r.json
1053
1054     def del_container(self, until=None, delimiter=None):
1055         """
1056         :param until: (str) formated date
1057
1058         :param delimiter: (str) with / empty container
1059
1060         :raises ClientError: 404 Container does not exist
1061
1062         :raises ClientError: 409 Container is not empty
1063         """
1064         self._assert_container()
1065         r = self.container_delete(
1066             until=until,
1067             delimiter=delimiter,
1068             success=(204, 404, 409))
1069         if r.status_code == 404:
1070             raise ClientError(
1071                 'Container "%s" does not exist' % self.container,
1072                 r.status_code)
1073         elif r.status_code == 409:
1074             raise ClientError(
1075                 'Container "%s" is not empty' % self.container,
1076                 r.status_code)
1077         return r.headers
1078
1079     def get_container_versioning(self, container=None):
1080         """
1081         :param container: (str)
1082
1083         :returns: (dict)
1084         """
1085         cnt_back_up = self.container
1086         try:
1087             self.container = container or cnt_back_up
1088             return filter_in(
1089                 self.get_container_info(),
1090                 'X-Container-Policy-Versioning')
1091         finally:
1092             self.container = cnt_back_up
1093
1094     def get_container_limit(self, container=None):
1095         """
1096         :param container: (str)
1097
1098         :returns: (dict)
1099         """
1100         cnt_back_up = self.container
1101         try:
1102             self.container = container or cnt_back_up
1103             return filter_in(
1104                 self.get_container_info(),
1105                 'X-Container-Policy-Quota')
1106         finally:
1107             self.container = cnt_back_up
1108
1109     def get_container_info(self, until=None):
1110         """
1111         :param until: (str) formated date
1112
1113         :returns: (dict)
1114
1115         :raises ClientError: 404 Container not found
1116         """
1117         try:
1118             r = self.container_head(until=until)
1119         except ClientError as err:
1120             err.details.append('for container %s' % self.container)
1121             raise err
1122         return r.headers
1123
1124     def get_container_meta(self, until=None):
1125         """
1126         :param until: (str) formated date
1127
1128         :returns: (dict)
1129         """
1130         return filter_in(
1131             self.get_container_info(until=until),
1132             'X-Container-Meta')
1133
1134     def get_container_object_meta(self, until=None):
1135         """
1136         :param until: (str) formated date
1137
1138         :returns: (dict)
1139         """
1140         return filter_in(
1141             self.get_container_info(until=until),
1142             'X-Container-Object-Meta')
1143
1144     def set_container_meta(self, metapairs):
1145         """
1146         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1147         """
1148         assert(type(metapairs) is dict)
1149         r = self.container_post(update=True, metadata=metapairs)
1150         return r.headers
1151
1152     def del_container_meta(self, metakey):
1153         """
1154         :param metakey: (str) metadatum key
1155
1156         :returns: (dict) response headers
1157         """
1158         r = self.container_post(update=True, metadata={metakey: ''})
1159         return r.headers
1160
1161     def set_container_limit(self, limit):
1162         """
1163         :param limit: (int)
1164         """
1165         r = self.container_post(update=True, quota=limit)
1166         return r.headers
1167
1168     def set_container_versioning(self, versioning):
1169         """
1170         :param versioning: (str)
1171         """
1172         r = self.container_post(update=True, versioning=versioning)
1173         return r.headers
1174
1175     def del_object(self, obj, until=None, delimiter=None):
1176         """
1177         :param obj: (str) remote object path
1178
1179         :param until: (str) formated date
1180
1181         :param delimiter: (str)
1182         """
1183         self._assert_container()
1184         r = self.object_delete(obj, until=until, delimiter=delimiter)
1185         return r.headers
1186
1187     def set_object_meta(self, obj, metapairs):
1188         """
1189         :param obj: (str) remote object path
1190
1191         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1192         """
1193         assert(type(metapairs) is dict)
1194         r = self.object_post(obj, update=True, metadata=metapairs)
1195         return r.headers
1196
1197     def del_object_meta(self, obj, metakey):
1198         """
1199         :param obj: (str) remote object path
1200
1201         :param metakey: (str) metadatum key
1202         """
1203         r = self.object_post(obj, update=True, metadata={metakey: ''})
1204         return r.headers
1205
1206     def publish_object(self, obj):
1207         """
1208         :param obj: (str) remote object path
1209
1210         :returns: (str) access url
1211         """
1212         self.object_post(obj, update=True, public=True)
1213         info = self.get_object_info(obj)
1214         return info['x-object-public']
1215         pref, sep, rest = self.base_url.partition('//')
1216         base = rest.split('/')[0]
1217         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1218
1219     def unpublish_object(self, obj):
1220         """
1221         :param obj: (str) remote object path
1222         """
1223         r = self.object_post(obj, update=True, public=False)
1224         return r.headers
1225
1226     def get_object_info(self, obj, version=None):
1227         """
1228         :param obj: (str) remote object path
1229
1230         :param version: (str)
1231
1232         :returns: (dict)
1233         """
1234         try:
1235             r = self.object_head(obj, version=version)
1236             return r.headers
1237         except ClientError as ce:
1238             if ce.status == 404:
1239                 raise ClientError('Object %s not found' % obj, status=404)
1240             raise
1241
1242     def get_object_meta(self, obj, version=None):
1243         """
1244         :param obj: (str) remote object path
1245
1246         :param version: (str)
1247
1248         :returns: (dict)
1249         """
1250         return filter_in(
1251             self.get_object_info(obj, version=version),
1252             'X-Object-Meta')
1253
1254     def get_object_sharing(self, obj):
1255         """
1256         :param obj: (str) remote object path
1257
1258         :returns: (dict)
1259         """
1260         r = filter_in(
1261             self.get_object_info(obj),
1262             'X-Object-Sharing',
1263             exactMatch=True)
1264         reply = {}
1265         if len(r) > 0:
1266             perms = r['x-object-sharing'].split(';')
1267             for perm in perms:
1268                 try:
1269                     perm.index('=')
1270                 except ValueError:
1271                     raise ClientError('Incorrect reply format')
1272                 (key, val) = perm.strip().split('=')
1273                 reply[key] = val
1274         return reply
1275
1276     def set_object_sharing(
1277             self, obj,
1278             read_permission=False, write_permission=False):
1279         """Give read/write permisions to an object.
1280
1281         :param obj: (str) remote object path
1282
1283         :param read_permission: (list - bool) users and user groups that get
1284             read permission for this object - False means all previous read
1285             permissions will be removed
1286
1287         :param write_permission: (list - bool) of users and user groups to get
1288            write permission for this object - False means all previous write
1289            permissions will be removed
1290
1291         :returns: (dict) response headers
1292         """
1293
1294         perms = dict(read=read_permission or '', write=write_permission or '')
1295         r = self.object_post(obj, update=True, permissions=perms)
1296         return r.headers
1297
1298     def del_object_sharing(self, obj):
1299         """
1300         :param obj: (str) remote object path
1301         """
1302         return self.set_object_sharing(obj)
1303
1304     def append_object(self, obj, source_file, upload_cb=None):
1305         """
1306         :param obj: (str) remote object path
1307
1308         :param source_file: open file descriptor
1309
1310         :param upload_db: progress.bar for uploading
1311         """
1312         self._assert_container()
1313         meta = self.get_container_info()
1314         blocksize = int(meta['x-container-block-size'])
1315         filesize = fstat(source_file.fileno()).st_size
1316         nblocks = 1 + (filesize - 1) // blocksize
1317         offset = 0
1318         headers = {}
1319         if upload_cb:
1320             self.progress_bar_gen = upload_cb(nblocks)
1321             self._cb_next()
1322         flying = {}
1323         self._init_thread_limit()
1324         try:
1325             for i in range(nblocks):
1326                 block = source_file.read(min(blocksize, filesize - offset))
1327                 offset += len(block)
1328
1329                 self._watch_thread_limit(flying.values())
1330                 unfinished = {}
1331                 flying[i] = SilentEvent(
1332                     method=self.object_post,
1333                     obj=obj,
1334                     update=True,
1335                     content_range='bytes */*',
1336                     content_type='application/octet-stream',
1337                     content_length=len(block),
1338                     data=block)
1339                 flying[i].start()
1340
1341                 for key, thread in flying.items():
1342                     if thread.isAlive():
1343                         if i < nblocks:
1344                             unfinished[key] = thread
1345                             continue
1346                         thread.join()
1347                     if thread.exception:
1348                         raise thread.exception
1349                     headers[key] = thread.value.headers
1350                     self._cb_next()
1351                 flying = unfinished
1352         except KeyboardInterrupt:
1353             sendlog.info('- - - wait for threads to finish')
1354             for thread in activethreads():
1355                 thread.join()
1356         finally:
1357             from time import sleep
1358             sleep(2 * len(activethreads()))
1359         return headers.values()
1360
1361     def truncate_object(self, obj, upto_bytes):
1362         """
1363         :param obj: (str) remote object path
1364
1365         :param upto_bytes: max number of bytes to leave on file
1366
1367         :returns: (dict) response headers
1368         """
1369         r = self.object_post(
1370             obj,
1371             update=True,
1372             content_range='bytes 0-%s/*' % upto_bytes,
1373             content_type='application/octet-stream',
1374             object_bytes=upto_bytes,
1375             source_object=path4url(self.container, obj))
1376         return r.headers
1377
1378     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1379         """Overwrite a part of an object from local source file
1380
1381         :param obj: (str) remote object path
1382
1383         :param start: (int) position in bytes to start overwriting from
1384
1385         :param end: (int) position in bytes to stop overwriting at
1386
1387         :param source_file: open file descriptor
1388
1389         :param upload_db: progress.bar for uploading
1390         """
1391
1392         r = self.get_object_info(obj)
1393         rf_size = int(r['content-length'])
1394         if rf_size < int(start):
1395             raise ClientError(
1396                 'Range start exceeds file size',
1397                 status=416)
1398         elif rf_size < int(end):
1399             raise ClientError(
1400                 'Range end exceeds file size',
1401                 status=416)
1402         self._assert_container()
1403         meta = self.get_container_info()
1404         blocksize = int(meta['x-container-block-size'])
1405         filesize = fstat(source_file.fileno()).st_size
1406         datasize = int(end) - int(start) + 1
1407         nblocks = 1 + (datasize - 1) // blocksize
1408         offset = 0
1409         if upload_cb:
1410             self.progress_bar_gen = upload_cb(nblocks)
1411             self._cb_next()
1412         headers = []
1413         for i in range(nblocks):
1414             read_size = min(blocksize, filesize - offset, datasize - offset)
1415             block = source_file.read(read_size)
1416             r = self.object_post(
1417                 obj,
1418                 update=True,
1419                 content_type='application/octet-stream',
1420                 content_length=len(block),
1421                 content_range='bytes %s-%s/*' % (
1422                     start + offset,
1423                     start + offset + len(block) - 1),
1424                 data=block)
1425             headers.append(dict(r.headers))
1426             offset += len(block)
1427
1428             self._cb_next
1429         return headers
1430
1431     def copy_object(
1432             self, src_container, src_object, dst_container,
1433             dst_object=None,
1434             source_version=None,
1435             source_account=None,
1436             public=False,
1437             content_type=None,
1438             delimiter=None):
1439         """
1440         :param src_container: (str) source container
1441
1442         :param src_object: (str) source object path
1443
1444         :param dst_container: (str) destination container
1445
1446         :param dst_object: (str) destination object path
1447
1448         :param source_version: (str) source object version
1449
1450         :param source_account: (str) account to copy from
1451
1452         :param public: (bool)
1453
1454         :param content_type: (str)
1455
1456         :param delimiter: (str)
1457
1458         :returns: (dict) response headers
1459         """
1460         self._assert_account()
1461         self.container = dst_container
1462         src_path = path4url(src_container, src_object)
1463         r = self.object_put(
1464             dst_object or src_object,
1465             success=201,
1466             copy_from=src_path,
1467             content_length=0,
1468             source_version=source_version,
1469             source_account=source_account,
1470             public=public,
1471             content_type=content_type,
1472             delimiter=delimiter)
1473         return r.headers
1474
1475     def move_object(
1476             self, src_container, src_object, dst_container,
1477             dst_object=False,
1478             source_account=None,
1479             source_version=None,
1480             public=False,
1481             content_type=None,
1482             delimiter=None):
1483         """
1484         :param src_container: (str) source container
1485
1486         :param src_object: (str) source object path
1487
1488         :param dst_container: (str) destination container
1489
1490         :param dst_object: (str) destination object path
1491
1492         :param source_account: (str) account to move from
1493
1494         :param source_version: (str) source object version
1495
1496         :param public: (bool)
1497
1498         :param content_type: (str)
1499
1500         :param delimiter: (str)
1501
1502         :returns: (dict) response headers
1503         """
1504         self._assert_account()
1505         self.container = dst_container
1506         dst_object = dst_object or src_object
1507         src_path = path4url(src_container, src_object)
1508         r = self.object_put(
1509             dst_object,
1510             success=201,
1511             move_from=src_path,
1512             content_length=0,
1513             source_account=source_account,
1514             source_version=source_version,
1515             public=public,
1516             content_type=content_type,
1517             delimiter=delimiter)
1518         return r.headers
1519
1520     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1521         """Get accounts that share with self.account
1522
1523         :param limit: (str)
1524
1525         :param marker: (str)
1526
1527         :returns: (dict)
1528         """
1529         self._assert_account()
1530
1531         self.set_param('format', 'json')
1532         self.set_param('limit', limit, iff=limit is not None)
1533         self.set_param('marker', marker, iff=marker is not None)
1534
1535         path = ''
1536         success = kwargs.pop('success', (200, 204))
1537         r = self.get(path, *args, success=success, **kwargs)
1538         return r.json
1539
1540     def get_object_versionlist(self, obj):
1541         """
1542         :param obj: (str) remote object path
1543
1544         :returns: (list)
1545         """
1546         self._assert_container()
1547         r = self.object_get(obj, format='json', version='list')
1548         return r.json['versions']