Fix typo in file upload
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks
136         """
137         cnt_back_up = self.container
138         try:
139             self.container = container or cnt_back_up
140             r = self.container_delete(until=unicode(time()))
141         finally:
142             self.container = cnt_back_up
143         return r.headers
144
145     def upload_object_unchunked(
146             self, obj, f,
147             withHashFile=False,
148             size=None,
149             etag=None,
150             content_encoding=None,
151             content_disposition=None,
152             content_type=None,
153             sharing=None,
154             public=None):
155         """
156         :param obj: (str) remote object path
157
158         :param f: open file descriptor
159
160         :param withHashFile: (bool)
161
162         :param size: (int) size of data to upload
163
164         :param etag: (str)
165
166         :param content_encoding: (str)
167
168         :param content_disposition: (str)
169
170         :param content_type: (str)
171
172         :param sharing: {'read':[user and/or grp names],
173             'write':[usr and/or grp names]}
174
175         :param public: (bool)
176
177         :returns: (dict) created object metadata
178         """
179         self._assert_container()
180
181         if withHashFile:
182             data = f.read()
183             try:
184                 import json
185                 data = json.dumps(json.loads(data))
186             except ValueError:
187                 raise ClientError('"%s" is not json-formated' % f.name, 1)
188             except SyntaxError:
189                 msg = '"%s" is not a valid hashmap file' % f.name
190                 raise ClientError(msg, 1)
191             f = StringIO(data)
192         else:
193             data = readall(f, size) if size else f.read()
194         r = self.object_put(
195             obj,
196             data=data,
197             etag=etag,
198             content_encoding=content_encoding,
199             content_disposition=content_disposition,
200             content_type=content_type,
201             permissions=sharing,
202             public=public,
203             success=201)
204         return r.headers
205
206     def create_object_by_manifestation(
207             self, obj,
208             etag=None,
209             content_encoding=None,
210             content_disposition=None,
211             content_type=None,
212             sharing=None,
213             public=None):
214         """
215         :param obj: (str) remote object path
216
217         :param etag: (str)
218
219         :param content_encoding: (str)
220
221         :param content_disposition: (str)
222
223         :param content_type: (str)
224
225         :param sharing: {'read':[user and/or grp names],
226             'write':[usr and/or grp names]}
227
228         :param public: (bool)
229
230         :returns: (dict) created object metadata
231         """
232         self._assert_container()
233         r = self.object_put(
234             obj,
235             content_length=0,
236             etag=etag,
237             content_encoding=content_encoding,
238             content_disposition=content_disposition,
239             content_type=content_type,
240             permissions=sharing,
241             public=public,
242             manifest='%s/%s' % (self.container, obj))
243         return r.headers
244
245     # upload_* auxiliary methods
246     def _put_block_async(self, data, hash):
247         event = SilentEvent(method=self._put_block, data=data, hash=hash)
248         event.start()
249         return event
250
251     def _put_block(self, data, hash):
252         r = self.container_post(
253             update=True,
254             content_type='application/octet-stream',
255             content_length=len(data),
256             data=data,
257             format='json')
258         assert r.json[0] == hash, 'Local hash does not match server'
259
260     def _get_file_block_info(self, fileobj, size=None, cache=None):
261         """
262         :param fileobj: (file descriptor) source
263
264         :param size: (int) size of data to upload from source
265
266         :param cache: (dict) if provided, cache container info response to
267         avoid redundant calls
268         """
269         if isinstance(cache, dict):
270             try:
271                 meta = cache[self.container]
272             except KeyError:
273                 meta = self.get_container_info()
274                 cache[self.container] = meta
275         else:
276             meta = self.get_container_info()
277         blocksize = int(meta['x-container-block-size'])
278         blockhash = meta['x-container-block-hash']
279         size = size if size is not None else fstat(fileobj.fileno()).st_size
280         nblocks = 1 + (size - 1) // blocksize
281         return (blocksize, blockhash, size, nblocks)
282
283     def _create_object_or_get_missing_hashes(
284             self, obj, json,
285             size=None,
286             format='json',
287             hashmap=True,
288             content_type=None,
289             if_etag_match=None,
290             if_etag_not_match=None,
291             content_encoding=None,
292             content_disposition=None,
293             permissions=None,
294             public=None,
295             success=(201, 409)):
296         r = self.object_put(
297             obj,
298             format='json',
299             hashmap=True,
300             content_type=content_type,
301             json=json,
302             if_etag_match=if_etag_match,
303             if_etag_not_match=if_etag_not_match,
304             content_encoding=content_encoding,
305             content_disposition=content_disposition,
306             permissions=permissions,
307             public=public,
308             success=success)
309         return (None if r.status_code == 201 else r.json), r.headers
310
311     def _calculate_blocks_for_upload(
312             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
313             hash_cb=None):
314         offset = 0
315         if hash_cb:
316             hash_gen = hash_cb(nblocks)
317             hash_gen.next()
318
319         for i in range(nblocks):
320             block = readall(fileobj, min(blocksize, size - offset))
321             bytes = len(block)
322             hash = _pithos_hash(block, blockhash)
323             hashes.append(hash)
324             hmap[hash] = (offset, bytes)
325             offset += bytes
326             if hash_cb:
327                 hash_gen.next()
328         msg = ('Failed to calculate uploaded blocks:'
329                ' Offset and object size do not match')
330         assert offset == size, msg
331
332     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
333         """upload missing blocks asynchronously"""
334
335         self._init_thread_limit()
336
337         flying = []
338         failures = []
339         for hash in missing:
340             offset, bytes = hmap[hash]
341             fileobj.seek(offset)
342             data = readall(fileobj, bytes)
343             r = self._put_block_async(data, hash)
344             flying.append(r)
345             unfinished = self._watch_thread_limit(flying)
346             for thread in set(flying).difference(unfinished):
347                 if thread.exception:
348                     failures.append(thread)
349                     if isinstance(
350                             thread.exception,
351                             ClientError) and thread.exception.status == 502:
352                         self.POOLSIZE = self._thread_limit
353                 elif thread.isAlive():
354                     flying.append(thread)
355                 elif upload_gen:
356                     try:
357                         upload_gen.next()
358                     except:
359                         pass
360             flying = unfinished
361
362         for thread in flying:
363             thread.join()
364             if thread.exception:
365                 failures.append(thread)
366             elif upload_gen:
367                 try:
368                     upload_gen.next()
369                 except:
370                     pass
371
372         return [failure.kwargs['hash'] for failure in failures]
373
374     def upload_object(
375             self, obj, f,
376             size=None,
377             hash_cb=None,
378             upload_cb=None,
379             etag=None,
380             if_etag_match=None,
381             if_not_exist=None,
382             content_encoding=None,
383             content_disposition=None,
384             content_type=None,
385             sharing=None,
386             public=None,
387             container_info_cache=None):
388         """Upload an object using multiple connections (threads)
389
390         :param obj: (str) remote object path
391
392         :param f: open file descriptor (rb)
393
394         :param hash_cb: optional progress.bar object for calculating hashes
395
396         :param upload_cb: optional progress.bar object for uploading
397
398         :param etag: (str)
399
400         :param if_etag_match: (str) Push that value to if-match header at file
401             creation
402
403         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
404             it does not exist remotely, otherwise the operation will fail.
405             Involves the case of an object with the same path is created while
406             the object is being uploaded.
407
408         :param content_encoding: (str)
409
410         :param content_disposition: (str)
411
412         :param content_type: (str)
413
414         :param sharing: {'read':[user and/or grp names],
415             'write':[usr and/or grp names]}
416
417         :param public: (bool)
418
419         :param container_info_cache: (dict) if given, avoid redundant calls to
420             server for container info (block size and hash information)
421         """
422         self._assert_container()
423
424         block_info = (
425             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
426                 f, size, container_info_cache)
427         (hashes, hmap, offset) = ([], {}, 0)
428         if not content_type:
429             content_type = 'application/octet-stream'
430
431         self._calculate_blocks_for_upload(
432             *block_info,
433             hashes=hashes,
434             hmap=hmap,
435             fileobj=f,
436             hash_cb=hash_cb)
437
438         hashmap = dict(bytes=size, hashes=hashes)
439         missing, obj_headers = self._create_object_or_get_missing_hashes(
440             obj, hashmap,
441             content_type=content_type,
442             size=size,
443             if_etag_match=if_etag_match,
444             if_etag_not_match='*' if if_not_exist else None,
445             content_encoding=content_encoding,
446             content_disposition=content_disposition,
447             permissions=sharing,
448             public=public)
449
450         if missing is None:
451             return obj_headers
452
453         if upload_cb:
454             upload_gen = upload_cb(len(missing))
455             for i in range(len(missing), len(hashmap['hashes']) + 1):
456                 try:
457                     upload_gen.next()
458                 except:
459                     upload_gen = None
460         else:
461             upload_gen = None
462
463         retries = 7
464         try:
465             while retries:
466                 sendlog.info('%s blocks missing' % len(missing))
467                 num_of_blocks = len(missing)
468                 missing = self._upload_missing_blocks(
469                     missing,
470                     hmap,
471                     f,
472                     upload_gen)
473                 if missing:
474                     if num_of_blocks == len(missing):
475                         retries -= 1
476                     else:
477                         num_of_blocks = len(missing)
478                 else:
479                     break
480             if missing:
481                 try:
482                     details = ['%s' % thread.exception for thread in missing]
483                 except Exception:
484                     details = ['Also, failed to read thread exceptions']
485                 raise ClientError(
486                     '%s blocks failed to upload' % len(missing),
487                     details=details)
488         except KeyboardInterrupt:
489             sendlog.info('- - - wait for threads to finish')
490             for thread in activethreads():
491                 thread.join()
492             raise
493
494         r = self.object_put(
495             obj,
496             format='json',
497             hashmap=True,
498             content_type=content_type,
499             content_encoding=content_encoding,
500             if_etag_match=if_etag_match,
501             if_etag_not_match='*' if if_not_exist else None,
502             etag=etag,
503             json=hashmap,
504             permissions=sharing,
505             public=public,
506             success=201)
507         return r.headers
508
509     def upload_from_string(
510             self, obj, input_str,
511             hash_cb=None,
512             upload_cb=None,
513             etag=None,
514             if_etag_match=None,
515             if_not_exist=None,
516             content_encoding=None,
517             content_disposition=None,
518             content_type=None,
519             sharing=None,
520             public=None,
521             container_info_cache=None):
522         """Upload an object using multiple connections (threads)
523
524         :param obj: (str) remote object path
525
526         :param input_str: (str) upload content
527
528         :param hash_cb: optional progress.bar object for calculating hashes
529
530         :param upload_cb: optional progress.bar object for uploading
531
532         :param etag: (str)
533
534         :param if_etag_match: (str) Push that value to if-match header at file
535             creation
536
537         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
538             it does not exist remotely, otherwise the operation will fail.
539             Involves the case of an object with the same path is created while
540             the object is being uploaded.
541
542         :param content_encoding: (str)
543
544         :param content_disposition: (str)
545
546         :param content_type: (str)
547
548         :param sharing: {'read':[user and/or grp names],
549             'write':[usr and/or grp names]}
550
551         :param public: (bool)
552
553         :param container_info_cache: (dict) if given, avoid redundant calls to
554             server for container info (block size and hash information)
555         """
556         self._assert_container()
557
558         blocksize, blockhash, size, nblocks = self._get_file_block_info(
559                 fileobj=None, size=len(input_str), cache=container_info_cache)
560         (hashes, hmap, offset) = ([], {}, 0)
561         if not content_type:
562             content_type = 'application/octet-stream'
563
564         hashes = []
565         hmap = {}
566         for blockid in range(nblocks):
567             start = blockid * blocksize
568             block = input_str[start: (start + blocksize)]
569             hashes.append(_pithos_hash(block, blockhash))
570             hmap[hashes[blockid]] = (start, block)
571
572         hashmap = dict(bytes=size, hashes=hashes)
573         missing, obj_headers = self._create_object_or_get_missing_hashes(
574             obj, hashmap,
575             content_type=content_type,
576             size=size,
577             if_etag_match=if_etag_match,
578             if_etag_not_match='*' if if_not_exist else None,
579             content_encoding=content_encoding,
580             content_disposition=content_disposition,
581             permissions=sharing,
582             public=public)
583         if missing is None:
584             return obj_headers
585         num_of_missing = len(missing)
586
587         if upload_cb:
588             self.progress_bar_gen = upload_cb(nblocks)
589             for i in range(nblocks + 1 - num_of_missing):
590                 self._cb_next()
591
592         tries = 7
593         old_failures = 0
594         try:
595             while tries and missing:
596                 flying = []
597                 failures = []
598                 for hash in missing:
599                     offset, block = hmap[hash]
600                     bird = self._put_block_async(block, hash)
601                     flying.append(bird)
602                     unfinished = self._watch_thread_limit(flying)
603                     for thread in set(flying).difference(unfinished):
604                         if thread.exception:
605                             failures.append(thread.kwargs['hash'])
606                         if thread.isAlive():
607                             flying.append(thread)
608                         else:
609                             self._cb_next()
610                     flying = unfinished
611                 for thread in flying:
612                     thread.join()
613                     if thread.exception:
614                         failures.append(thread.kwargs['hash'])
615                     self._cb_next()
616                 missing = failures
617                 if missing and len(missing) == old_failures:
618                     tries -= 1
619                 old_failures = len(missing)
620             if missing:
621                 raise ClientError(
622                     '%s blocks failed to upload' % len(missing),
623                     details=['%s' % thread.exception for thread in missing])
624         except KeyboardInterrupt:
625             sendlog.info('- - - wait for threads to finish')
626             for thread in activethreads():
627                 thread.join()
628             raise
629
630         r = self.object_put(
631             obj,
632             format='json',
633             hashmap=True,
634             content_type=content_type,
635             content_encoding=content_encoding,
636             if_etag_match=if_etag_match,
637             if_etag_not_match='*' if if_not_exist else None,
638             etag=etag,
639             json=hashmap,
640             permissions=sharing,
641             public=public,
642             success=201)
643         return r.headers
644
645     # download_* auxiliary methods
646     def _get_remote_blocks_info(self, obj, **restargs):
647         #retrieve object hashmap
648         myrange = restargs.pop('data_range', None)
649         hashmap = self.get_object_hashmap(obj, **restargs)
650         restargs['data_range'] = myrange
651         blocksize = int(hashmap['block_size'])
652         blockhash = hashmap['block_hash']
653         total_size = hashmap['bytes']
654         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
655         map_dict = {}
656         for i, h in enumerate(hashmap['hashes']):
657             #  map_dict[h] = i   CHAGE
658             if h in map_dict:
659                 map_dict[h].append(i)
660             else:
661                 map_dict[h] = [i]
662         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
663
664     def _dump_blocks_sync(
665             self, obj, remote_hashes, blocksize, total_size, dst, crange,
666             **args):
667         if not total_size:
668             return
669         for blockid, blockhash in enumerate(remote_hashes):
670             if blockhash:
671                 start = blocksize * blockid
672                 is_last = start + blocksize > total_size
673                 end = (total_size - 1) if is_last else (start + blocksize - 1)
674                 data_range = _range_up(start, end, total_size, crange)
675                 if not data_range:
676                     self._cb_next()
677                     continue
678                 args['data_range'] = 'bytes=%s' % data_range
679                 r = self.object_get(obj, success=(200, 206), **args)
680                 self._cb_next()
681                 dst.write(r.content)
682                 dst.flush()
683
684     def _get_block_async(self, obj, **args):
685         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
686         event.start()
687         return event
688
689     def _hash_from_file(self, fp, start, size, blockhash):
690         fp.seek(start)
691         block = readall(fp, size)
692         h = newhashlib(blockhash)
693         h.update(block.strip('\x00'))
694         return hexlify(h.digest())
695
696     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
697         """write the results of a greenleted rest call to a file
698
699         :param offset: the offset of the file up to blocksize
700         - e.g. if the range is 10-100, all blocks will be written to
701         normal_position - 10
702         """
703         for key, g in flying.items():
704             if g.isAlive():
705                 continue
706             if g.exception:
707                 raise g.exception
708             block = g.value.content
709             for block_start in blockids[key]:
710                 local_file.seek(block_start + offset)
711                 local_file.write(block)
712                 self._cb_next()
713             flying.pop(key)
714             blockids.pop(key)
715         local_file.flush()
716
717     def _dump_blocks_async(
718             self, obj, remote_hashes, blocksize, total_size, local_file,
719             blockhash=None, resume=False, filerange=None, **restargs):
720         file_size = fstat(local_file.fileno()).st_size if resume else 0
721         flying = dict()
722         blockid_dict = dict()
723         offset = 0
724
725         self._init_thread_limit()
726         for block_hash, blockids in remote_hashes.items():
727             blockids = [blk * blocksize for blk in blockids]
728             unsaved = [blk for blk in blockids if not (
729                 blk < file_size and block_hash == self._hash_from_file(
730                         local_file, blk, blocksize, blockhash))]
731             self._cb_next(len(blockids) - len(unsaved))
732             if unsaved:
733                 key = unsaved[0]
734                 self._watch_thread_limit(flying.values())
735                 self._thread2file(
736                     flying, blockid_dict, local_file, offset,
737                     **restargs)
738                 end = total_size - 1 if (
739                     key + blocksize > total_size) else key + blocksize - 1
740                 if end < key:
741                     self._cb_next()
742                     continue
743                 data_range = _range_up(key, end, total_size, filerange)
744                 if not data_range:
745                     self._cb_next()
746                     continue
747                 restargs[
748                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
749                 flying[key] = self._get_block_async(obj, **restargs)
750                 blockid_dict[key] = unsaved
751
752         for thread in flying.values():
753             thread.join()
754         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
755
756     def download_object(
757             self, obj, dst,
758             download_cb=None,
759             version=None,
760             resume=False,
761             range_str=None,
762             if_match=None,
763             if_none_match=None,
764             if_modified_since=None,
765             if_unmodified_since=None):
766         """Download an object (multiple connections, random blocks)
767
768         :param obj: (str) remote object path
769
770         :param dst: open file descriptor (wb+)
771
772         :param download_cb: optional progress.bar object for downloading
773
774         :param version: (str) file version
775
776         :param resume: (bool) if set, preserve already downloaded file parts
777
778         :param range_str: (str) from, to are file positions (int) in bytes
779
780         :param if_match: (str)
781
782         :param if_none_match: (str)
783
784         :param if_modified_since: (str) formated date
785
786         :param if_unmodified_since: (str) formated date"""
787         restargs = dict(
788             version=version,
789             data_range=None if range_str is None else 'bytes=%s' % range_str,
790             if_match=if_match,
791             if_none_match=if_none_match,
792             if_modified_since=if_modified_since,
793             if_unmodified_since=if_unmodified_since)
794
795         (
796             blocksize,
797             blockhash,
798             total_size,
799             hash_list,
800             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
801         assert total_size >= 0
802
803         if download_cb:
804             self.progress_bar_gen = download_cb(len(hash_list))
805             self._cb_next()
806
807         if dst.isatty():
808             self._dump_blocks_sync(
809                 obj,
810                 hash_list,
811                 blocksize,
812                 total_size,
813                 dst,
814                 range_str,
815                 **restargs)
816         else:
817             self._dump_blocks_async(
818                 obj,
819                 remote_hashes,
820                 blocksize,
821                 total_size,
822                 dst,
823                 blockhash,
824                 resume,
825                 range_str,
826                 **restargs)
827             if not range_str:
828                 dst.truncate(total_size)
829
830         self._complete_cb()
831
832     def download_to_string(
833             self, obj,
834             download_cb=None,
835             version=None,
836             range_str=None,
837             if_match=None,
838             if_none_match=None,
839             if_modified_since=None,
840             if_unmodified_since=None):
841         """Download an object to a string (multiple connections). This method
842         uses threads for http requests, but stores all content in memory.
843
844         :param obj: (str) remote object path
845
846         :param download_cb: optional progress.bar object for downloading
847
848         :param version: (str) file version
849
850         :param range_str: (str) from, to are file positions (int) in bytes
851
852         :param if_match: (str)
853
854         :param if_none_match: (str)
855
856         :param if_modified_since: (str) formated date
857
858         :param if_unmodified_since: (str) formated date
859
860         :returns: (str) the whole object contents
861         """
862         restargs = dict(
863             version=version,
864             data_range=None if range_str is None else 'bytes=%s' % range_str,
865             if_match=if_match,
866             if_none_match=if_none_match,
867             if_modified_since=if_modified_since,
868             if_unmodified_since=if_unmodified_since)
869
870         (
871             blocksize,
872             blockhash,
873             total_size,
874             hash_list,
875             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
876         assert total_size >= 0
877
878         if download_cb:
879             self.progress_bar_gen = download_cb(len(hash_list))
880             self._cb_next()
881
882         num_of_blocks = len(remote_hashes)
883         ret = [''] * num_of_blocks
884         self._init_thread_limit()
885         flying = dict()
886         try:
887             for blockid, blockhash in enumerate(remote_hashes):
888                 start = blocksize * blockid
889                 is_last = start + blocksize > total_size
890                 end = (total_size - 1) if is_last else (start + blocksize - 1)
891                 data_range_str = _range_up(start, end, end, range_str)
892                 if data_range_str:
893                     self._watch_thread_limit(flying.values())
894                     restargs['data_range'] = 'bytes=%s' % data_range_str
895                     flying[blockid] = self._get_block_async(obj, **restargs)
896                 for runid, thread in flying.items():
897                     if (blockid + 1) == num_of_blocks:
898                         thread.join()
899                     elif thread.isAlive():
900                         continue
901                     if thread.exception:
902                         raise thread.exception
903                     ret[runid] = thread.value.content
904                     self._cb_next()
905                     flying.pop(runid)
906             return ''.join(ret)
907         except KeyboardInterrupt:
908             sendlog.info('- - - wait for threads to finish')
909             for thread in activethreads():
910                 thread.join()
911
912     #Command Progress Bar method
913     def _cb_next(self, step=1):
914         if hasattr(self, 'progress_bar_gen'):
915             try:
916                 for i in xrange(step):
917                     self.progress_bar_gen.next()
918             except:
919                 pass
920
921     def _complete_cb(self):
922         while True:
923             try:
924                 self.progress_bar_gen.next()
925             except:
926                 break
927
928     def get_object_hashmap(
929             self, obj,
930             version=None,
931             if_match=None,
932             if_none_match=None,
933             if_modified_since=None,
934             if_unmodified_since=None):
935         """
936         :param obj: (str) remote object path
937
938         :param if_match: (str)
939
940         :param if_none_match: (str)
941
942         :param if_modified_since: (str) formated date
943
944         :param if_unmodified_since: (str) formated date
945
946         :returns: (list)
947         """
948         try:
949             r = self.object_get(
950                 obj,
951                 hashmap=True,
952                 version=version,
953                 if_etag_match=if_match,
954                 if_etag_not_match=if_none_match,
955                 if_modified_since=if_modified_since,
956                 if_unmodified_since=if_unmodified_since)
957         except ClientError as err:
958             if err.status == 304 or err.status == 412:
959                 return {}
960             raise
961         return r.json
962
963     def set_account_group(self, group, usernames):
964         """
965         :param group: (str)
966
967         :param usernames: (list)
968         """
969         r = self.account_post(update=True, groups={group: usernames})
970         return r
971
972     def del_account_group(self, group):
973         """
974         :param group: (str)
975         """
976         self.account_post(update=True, groups={group: []})
977
978     def get_account_info(self, until=None):
979         """
980         :param until: (str) formated date
981
982         :returns: (dict)
983         """
984         r = self.account_head(until=until)
985         if r.status_code == 401:
986             raise ClientError("No authorization", status=401)
987         return r.headers
988
989     def get_account_quota(self):
990         """
991         :returns: (dict)
992         """
993         return filter_in(
994             self.get_account_info(),
995             'X-Account-Policy-Quota',
996             exactMatch=True)
997
998     #def get_account_versioning(self):
999     #    """
1000     #    :returns: (dict)
1001     #    """
1002     #    return filter_in(
1003     #        self.get_account_info(),
1004     #        'X-Account-Policy-Versioning',
1005     #        exactMatch=True)
1006
1007     def get_account_meta(self, until=None):
1008         """
1009         :param until: (str) formated date
1010
1011         :returns: (dict)
1012         """
1013         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1014
1015     def get_account_group(self):
1016         """
1017         :returns: (dict)
1018         """
1019         return filter_in(self.get_account_info(), 'X-Account-Group-')
1020
1021     def set_account_meta(self, metapairs):
1022         """
1023         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1024         """
1025         assert(type(metapairs) is dict)
1026         r = self.account_post(update=True, metadata=metapairs)
1027         return r.headers
1028
1029     def del_account_meta(self, metakey):
1030         """
1031         :param metakey: (str) metadatum key
1032         """
1033         r = self.account_post(update=True, metadata={metakey: ''})
1034         return r.headers
1035
1036     #def set_account_quota(self, quota):
1037     #    """
1038     #    :param quota: (int)
1039     #    """
1040     #    self.account_post(update=True, quota=quota)
1041
1042     #def set_account_versioning(self, versioning):
1043     #    """
1044     #    :param versioning: (str)
1045     #    """
1046     #    r = self.account_post(update=True, versioning=versioning)
1047     #    return r.headers
1048
1049     def list_containers(self):
1050         """
1051         :returns: (dict)
1052         """
1053         r = self.account_get()
1054         return r.json
1055
1056     def del_container(self, until=None, delimiter=None):
1057         """
1058         :param until: (str) formated date
1059
1060         :param delimiter: (str) with / empty container
1061
1062         :raises ClientError: 404 Container does not exist
1063
1064         :raises ClientError: 409 Container is not empty
1065         """
1066         self._assert_container()
1067         r = self.container_delete(
1068             until=until,
1069             delimiter=delimiter,
1070             success=(204, 404, 409))
1071         if r.status_code == 404:
1072             raise ClientError(
1073                 'Container "%s" does not exist' % self.container,
1074                 r.status_code)
1075         elif r.status_code == 409:
1076             raise ClientError(
1077                 'Container "%s" is not empty' % self.container,
1078                 r.status_code)
1079         return r.headers
1080
1081     def get_container_versioning(self, container=None):
1082         """
1083         :param container: (str)
1084
1085         :returns: (dict)
1086         """
1087         cnt_back_up = self.container
1088         try:
1089             self.container = container or cnt_back_up
1090             return filter_in(
1091                 self.get_container_info(),
1092                 'X-Container-Policy-Versioning')
1093         finally:
1094             self.container = cnt_back_up
1095
1096     def get_container_limit(self, container=None):
1097         """
1098         :param container: (str)
1099
1100         :returns: (dict)
1101         """
1102         cnt_back_up = self.container
1103         try:
1104             self.container = container or cnt_back_up
1105             return filter_in(
1106                 self.get_container_info(),
1107                 'X-Container-Policy-Quota')
1108         finally:
1109             self.container = cnt_back_up
1110
1111     def get_container_info(self, until=None):
1112         """
1113         :param until: (str) formated date
1114
1115         :returns: (dict)
1116
1117         :raises ClientError: 404 Container not found
1118         """
1119         try:
1120             r = self.container_head(until=until)
1121         except ClientError as err:
1122             err.details.append('for container %s' % self.container)
1123             raise err
1124         return r.headers
1125
1126     def get_container_meta(self, until=None):
1127         """
1128         :param until: (str) formated date
1129
1130         :returns: (dict)
1131         """
1132         return filter_in(
1133             self.get_container_info(until=until),
1134             'X-Container-Meta')
1135
1136     def get_container_object_meta(self, until=None):
1137         """
1138         :param until: (str) formated date
1139
1140         :returns: (dict)
1141         """
1142         return filter_in(
1143             self.get_container_info(until=until),
1144             'X-Container-Object-Meta')
1145
1146     def set_container_meta(self, metapairs):
1147         """
1148         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1149         """
1150         assert(type(metapairs) is dict)
1151         r = self.container_post(update=True, metadata=metapairs)
1152         return r.headers
1153
1154     def del_container_meta(self, metakey):
1155         """
1156         :param metakey: (str) metadatum key
1157
1158         :returns: (dict) response headers
1159         """
1160         r = self.container_post(update=True, metadata={metakey: ''})
1161         return r.headers
1162
1163     def set_container_limit(self, limit):
1164         """
1165         :param limit: (int)
1166         """
1167         r = self.container_post(update=True, quota=limit)
1168         return r.headers
1169
1170     def set_container_versioning(self, versioning):
1171         """
1172         :param versioning: (str)
1173         """
1174         r = self.container_post(update=True, versioning=versioning)
1175         return r.headers
1176
1177     def del_object(self, obj, until=None, delimiter=None):
1178         """
1179         :param obj: (str) remote object path
1180
1181         :param until: (str) formated date
1182
1183         :param delimiter: (str)
1184         """
1185         self._assert_container()
1186         r = self.object_delete(obj, until=until, delimiter=delimiter)
1187         return r.headers
1188
1189     def set_object_meta(self, obj, metapairs):
1190         """
1191         :param obj: (str) remote object path
1192
1193         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1194         """
1195         assert(type(metapairs) is dict)
1196         r = self.object_post(obj, update=True, metadata=metapairs)
1197         return r.headers
1198
1199     def del_object_meta(self, obj, metakey):
1200         """
1201         :param obj: (str) remote object path
1202
1203         :param metakey: (str) metadatum key
1204         """
1205         r = self.object_post(obj, update=True, metadata={metakey: ''})
1206         return r.headers
1207
1208     def publish_object(self, obj):
1209         """
1210         :param obj: (str) remote object path
1211
1212         :returns: (str) access url
1213         """
1214         self.object_post(obj, update=True, public=True)
1215         info = self.get_object_info(obj)
1216         return info['x-object-public']
1217         pref, sep, rest = self.base_url.partition('//')
1218         base = rest.split('/')[0]
1219         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1220
1221     def unpublish_object(self, obj):
1222         """
1223         :param obj: (str) remote object path
1224         """
1225         r = self.object_post(obj, update=True, public=False)
1226         return r.headers
1227
1228     def get_object_info(self, obj, version=None):
1229         """
1230         :param obj: (str) remote object path
1231
1232         :param version: (str)
1233
1234         :returns: (dict)
1235         """
1236         try:
1237             r = self.object_head(obj, version=version)
1238             return r.headers
1239         except ClientError as ce:
1240             if ce.status == 404:
1241                 raise ClientError('Object %s not found' % obj, status=404)
1242             raise
1243
1244     def get_object_meta(self, obj, version=None):
1245         """
1246         :param obj: (str) remote object path
1247
1248         :param version: (str)
1249
1250         :returns: (dict)
1251         """
1252         return filter_in(
1253             self.get_object_info(obj, version=version),
1254             'X-Object-Meta')
1255
1256     def get_object_sharing(self, obj):
1257         """
1258         :param obj: (str) remote object path
1259
1260         :returns: (dict)
1261         """
1262         r = filter_in(
1263             self.get_object_info(obj),
1264             'X-Object-Sharing',
1265             exactMatch=True)
1266         reply = {}
1267         if len(r) > 0:
1268             perms = r['x-object-sharing'].split(';')
1269             for perm in perms:
1270                 try:
1271                     perm.index('=')
1272                 except ValueError:
1273                     raise ClientError('Incorrect reply format')
1274                 (key, val) = perm.strip().split('=')
1275                 reply[key] = val
1276         return reply
1277
1278     def set_object_sharing(
1279             self, obj,
1280             read_permission=False, write_permission=False):
1281         """Give read/write permisions to an object.
1282
1283         :param obj: (str) remote object path
1284
1285         :param read_permission: (list - bool) users and user groups that get
1286             read permission for this object - False means all previous read
1287             permissions will be removed
1288
1289         :param write_permission: (list - bool) of users and user groups to get
1290            write permission for this object - False means all previous write
1291            permissions will be removed
1292
1293         :returns: (dict) response headers
1294         """
1295
1296         perms = dict(read=read_permission or '', write=write_permission or '')
1297         r = self.object_post(obj, update=True, permissions=perms)
1298         return r.headers
1299
1300     def del_object_sharing(self, obj):
1301         """
1302         :param obj: (str) remote object path
1303         """
1304         return self.set_object_sharing(obj)
1305
1306     def append_object(self, obj, source_file, upload_cb=None):
1307         """
1308         :param obj: (str) remote object path
1309
1310         :param source_file: open file descriptor
1311
1312         :param upload_db: progress.bar for uploading
1313         """
1314         self._assert_container()
1315         meta = self.get_container_info()
1316         blocksize = int(meta['x-container-block-size'])
1317         filesize = fstat(source_file.fileno()).st_size
1318         nblocks = 1 + (filesize - 1) // blocksize
1319         offset = 0
1320         headers = {}
1321         if upload_cb:
1322             self.progress_bar_gen = upload_cb(nblocks)
1323             self._cb_next()
1324         flying = {}
1325         self._init_thread_limit()
1326         try:
1327             for i in range(nblocks):
1328                 block = source_file.read(min(blocksize, filesize - offset))
1329                 offset += len(block)
1330
1331                 self._watch_thread_limit(flying.values())
1332                 unfinished = {}
1333                 flying[i] = SilentEvent(
1334                     method=self.object_post,
1335                     obj=obj,
1336                     update=True,
1337                     content_range='bytes */*',
1338                     content_type='application/octet-stream',
1339                     content_length=len(block),
1340                     data=block)
1341                 flying[i].start()
1342
1343                 for key, thread in flying.items():
1344                     if thread.isAlive():
1345                         if i < nblocks:
1346                             unfinished[key] = thread
1347                             continue
1348                         thread.join()
1349                     if thread.exception:
1350                         raise thread.exception
1351                     headers[key] = thread.value.headers
1352                     self._cb_next()
1353                 flying = unfinished
1354         except KeyboardInterrupt:
1355             sendlog.info('- - - wait for threads to finish')
1356             for thread in activethreads():
1357                 thread.join()
1358         finally:
1359             from time import sleep
1360             sleep(2 * len(activethreads()))
1361         return headers.values()
1362
1363     def truncate_object(self, obj, upto_bytes):
1364         """
1365         :param obj: (str) remote object path
1366
1367         :param upto_bytes: max number of bytes to leave on file
1368
1369         :returns: (dict) response headers
1370         """
1371         r = self.object_post(
1372             obj,
1373             update=True,
1374             content_range='bytes 0-%s/*' % upto_bytes,
1375             content_type='application/octet-stream',
1376             object_bytes=upto_bytes,
1377             source_object=path4url(self.container, obj))
1378         return r.headers
1379
1380     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1381         """Overwrite a part of an object from local source file
1382
1383         :param obj: (str) remote object path
1384
1385         :param start: (int) position in bytes to start overwriting from
1386
1387         :param end: (int) position in bytes to stop overwriting at
1388
1389         :param source_file: open file descriptor
1390
1391         :param upload_db: progress.bar for uploading
1392         """
1393
1394         r = self.get_object_info(obj)
1395         rf_size = int(r['content-length'])
1396         if rf_size < int(start):
1397             raise ClientError(
1398                 'Range start exceeds file size',
1399                 status=416)
1400         elif rf_size < int(end):
1401             raise ClientError(
1402                 'Range end exceeds file size',
1403                 status=416)
1404         self._assert_container()
1405         meta = self.get_container_info()
1406         blocksize = int(meta['x-container-block-size'])
1407         filesize = fstat(source_file.fileno()).st_size
1408         datasize = int(end) - int(start) + 1
1409         nblocks = 1 + (datasize - 1) // blocksize
1410         offset = 0
1411         if upload_cb:
1412             self.progress_bar_gen = upload_cb(nblocks)
1413             self._cb_next()
1414         headers = []
1415         for i in range(nblocks):
1416             read_size = min(blocksize, filesize - offset, datasize - offset)
1417             block = source_file.read(read_size)
1418             r = self.object_post(
1419                 obj,
1420                 update=True,
1421                 content_type='application/octet-stream',
1422                 content_length=len(block),
1423                 content_range='bytes %s-%s/*' % (
1424                     start + offset,
1425                     start + offset + len(block) - 1),
1426                 data=block)
1427             headers.append(dict(r.headers))
1428             offset += len(block)
1429
1430             self._cb_next
1431         return headers
1432
1433     def copy_object(
1434             self, src_container, src_object, dst_container,
1435             dst_object=None,
1436             source_version=None,
1437             source_account=None,
1438             public=False,
1439             content_type=None,
1440             delimiter=None):
1441         """
1442         :param src_container: (str) source container
1443
1444         :param src_object: (str) source object path
1445
1446         :param dst_container: (str) destination container
1447
1448         :param dst_object: (str) destination object path
1449
1450         :param source_version: (str) source object version
1451
1452         :param source_account: (str) account to copy from
1453
1454         :param public: (bool)
1455
1456         :param content_type: (str)
1457
1458         :param delimiter: (str)
1459
1460         :returns: (dict) response headers
1461         """
1462         self._assert_account()
1463         self.container = dst_container
1464         src_path = path4url(src_container, src_object)
1465         r = self.object_put(
1466             dst_object or src_object,
1467             success=201,
1468             copy_from=src_path,
1469             content_length=0,
1470             source_version=source_version,
1471             source_account=source_account,
1472             public=public,
1473             content_type=content_type,
1474             delimiter=delimiter)
1475         return r.headers
1476
1477     def move_object(
1478             self, src_container, src_object, dst_container,
1479             dst_object=False,
1480             source_account=None,
1481             source_version=None,
1482             public=False,
1483             content_type=None,
1484             delimiter=None):
1485         """
1486         :param src_container: (str) source container
1487
1488         :param src_object: (str) source object path
1489
1490         :param dst_container: (str) destination container
1491
1492         :param dst_object: (str) destination object path
1493
1494         :param source_account: (str) account to move from
1495
1496         :param source_version: (str) source object version
1497
1498         :param public: (bool)
1499
1500         :param content_type: (str)
1501
1502         :param delimiter: (str)
1503
1504         :returns: (dict) response headers
1505         """
1506         self._assert_account()
1507         self.container = dst_container
1508         dst_object = dst_object or src_object
1509         src_path = path4url(src_container, src_object)
1510         r = self.object_put(
1511             dst_object,
1512             success=201,
1513             move_from=src_path,
1514             content_length=0,
1515             source_account=source_account,
1516             source_version=source_version,
1517             public=public,
1518             content_type=content_type,
1519             delimiter=delimiter)
1520         return r.headers
1521
1522     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1523         """Get accounts that share with self.account
1524
1525         :param limit: (str)
1526
1527         :param marker: (str)
1528
1529         :returns: (dict)
1530         """
1531         self._assert_account()
1532
1533         self.set_param('format', 'json')
1534         self.set_param('limit', limit, iff=limit is not None)
1535         self.set_param('marker', marker, iff=marker is not None)
1536
1537         path = ''
1538         success = kwargs.pop('success', (200, 204))
1539         r = self.get(path, *args, success=success, **kwargs)
1540         return r.json
1541
1542     def get_object_versionlist(self, obj):
1543         """
1544         :param obj: (str) remote object path
1545
1546         :returns: (list)
1547         """
1548         self._assert_container()
1549         r = self.object_get(obj, format='json', version='list')
1550         return r.json['versions']