Add source version in file overwrite
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks"""
136         cnt_back_up = self.container
137         try:
138             self.container = container or cnt_back_up
139             r = self.container_delete(until=unicode(time()))
140         finally:
141             self.container = cnt_back_up
142         return r.headers
143
144     def upload_object_unchunked(
145             self, obj, f,
146             withHashFile=False,
147             size=None,
148             etag=None,
149             content_encoding=None,
150             content_disposition=None,
151             content_type=None,
152             sharing=None,
153             public=None):
154         """
155         :param obj: (str) remote object path
156
157         :param f: open file descriptor
158
159         :param withHashFile: (bool)
160
161         :param size: (int) size of data to upload
162
163         :param etag: (str)
164
165         :param content_encoding: (str)
166
167         :param content_disposition: (str)
168
169         :param content_type: (str)
170
171         :param sharing: {'read':[user and/or grp names],
172             'write':[usr and/or grp names]}
173
174         :param public: (bool)
175
176         :returns: (dict) created object metadata
177         """
178         self._assert_container()
179
180         if withHashFile:
181             data = f.read()
182             try:
183                 import json
184                 data = json.dumps(json.loads(data))
185             except ValueError:
186                 raise ClientError('"%s" is not json-formated' % f.name, 1)
187             except SyntaxError:
188                 msg = '"%s" is not a valid hashmap file' % f.name
189                 raise ClientError(msg, 1)
190             f = StringIO(data)
191         else:
192             data = readall(f, size) if size else f.read()
193         r = self.object_put(
194             obj,
195             data=data,
196             etag=etag,
197             content_encoding=content_encoding,
198             content_disposition=content_disposition,
199             content_type=content_type,
200             permissions=sharing,
201             public=public,
202             success=201)
203         return r.headers
204
205     def create_object_by_manifestation(
206             self, obj,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             content_type=None,
211             sharing=None,
212             public=None):
213         """
214         :param obj: (str) remote object path
215
216         :param etag: (str)
217
218         :param content_encoding: (str)
219
220         :param content_disposition: (str)
221
222         :param content_type: (str)
223
224         :param sharing: {'read':[user and/or grp names],
225             'write':[usr and/or grp names]}
226
227         :param public: (bool)
228
229         :returns: (dict) created object metadata
230         """
231         self._assert_container()
232         r = self.object_put(
233             obj,
234             content_length=0,
235             etag=etag,
236             content_encoding=content_encoding,
237             content_disposition=content_disposition,
238             content_type=content_type,
239             permissions=sharing,
240             public=public,
241             manifest='%s/%s' % (self.container, obj))
242         return r.headers
243
244     # upload_* auxiliary methods
245     def _put_block_async(self, data, hash):
246         event = SilentEvent(method=self._put_block, data=data, hash=hash)
247         event.start()
248         return event
249
250     def _put_block(self, data, hash):
251         r = self.container_post(
252             update=True,
253             content_type='application/octet-stream',
254             content_length=len(data),
255             data=data,
256             format='json')
257         assert r.json[0] == hash, 'Local hash does not match server'
258
259     def _get_file_block_info(self, fileobj, size=None, cache=None):
260         """
261         :param fileobj: (file descriptor) source
262
263         :param size: (int) size of data to upload from source
264
265         :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
267         """
268         if isinstance(cache, dict):
269             try:
270                 meta = cache[self.container]
271             except KeyError:
272                 meta = self.get_container_info()
273                 cache[self.container] = meta
274         else:
275             meta = self.get_container_info()
276         blocksize = int(meta['x-container-block-size'])
277         blockhash = meta['x-container-block-hash']
278         size = size if size is not None else fstat(fileobj.fileno()).st_size
279         nblocks = 1 + (size - 1) // blocksize
280         return (blocksize, blockhash, size, nblocks)
281
282     def _create_object_or_get_missing_hashes(
283             self, obj, json,
284             size=None,
285             format='json',
286             hashmap=True,
287             content_type=None,
288             if_etag_match=None,
289             if_etag_not_match=None,
290             content_encoding=None,
291             content_disposition=None,
292             permissions=None,
293             public=None,
294             success=(201, 409)):
295         r = self.object_put(
296             obj,
297             format='json',
298             hashmap=True,
299             content_type=content_type,
300             json=json,
301             if_etag_match=if_etag_match,
302             if_etag_not_match=if_etag_not_match,
303             content_encoding=content_encoding,
304             content_disposition=content_disposition,
305             permissions=permissions,
306             public=public,
307             success=success)
308         return (None if r.status_code == 201 else r.json), r.headers
309
310     def _calculate_blocks_for_upload(
311             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
312             hash_cb=None):
313         offset = 0
314         if hash_cb:
315             hash_gen = hash_cb(nblocks)
316             hash_gen.next()
317
318         for i in xrange(nblocks):
319             block = readall(fileobj, min(blocksize, size - offset))
320             bytes = len(block)
321             if bytes <= 0:
322                 break
323             hash = _pithos_hash(block, blockhash)
324             hashes.append(hash)
325             hmap[hash] = (offset, bytes)
326             offset += bytes
327             if hash_cb:
328                 hash_gen.next()
329         msg = ('Failed to calculate uploading blocks: '
330                'read bytes(%s) != requested size (%s)' % (offset, size))
331         assert offset == size, msg
332
333     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334         """upload missing blocks asynchronously"""
335
336         self._init_thread_limit()
337
338         flying = []
339         failures = []
340         for hash in missing:
341             offset, bytes = hmap[hash]
342             fileobj.seek(offset)
343             data = readall(fileobj, bytes)
344             r = self._put_block_async(data, hash)
345             flying.append(r)
346             unfinished = self._watch_thread_limit(flying)
347             for thread in set(flying).difference(unfinished):
348                 if thread.exception:
349                     failures.append(thread)
350                     if isinstance(
351                             thread.exception,
352                             ClientError) and thread.exception.status == 502:
353                         self.POOLSIZE = self._thread_limit
354                 elif thread.isAlive():
355                     flying.append(thread)
356                 elif upload_gen:
357                     try:
358                         upload_gen.next()
359                     except:
360                         pass
361             flying = unfinished
362
363         for thread in flying:
364             thread.join()
365             if thread.exception:
366                 failures.append(thread)
367             elif upload_gen:
368                 try:
369                     upload_gen.next()
370                 except:
371                     pass
372
373         return [failure.kwargs['hash'] for failure in failures]
374
375     def upload_object(
376             self, obj, f,
377             size=None,
378             hash_cb=None,
379             upload_cb=None,
380             etag=None,
381             if_etag_match=None,
382             if_not_exist=None,
383             content_encoding=None,
384             content_disposition=None,
385             content_type=None,
386             sharing=None,
387             public=None,
388             container_info_cache=None):
389         """Upload an object using multiple connections (threads)
390
391         :param obj: (str) remote object path
392
393         :param f: open file descriptor (rb)
394
395         :param hash_cb: optional progress.bar object for calculating hashes
396
397         :param upload_cb: optional progress.bar object for uploading
398
399         :param etag: (str)
400
401         :param if_etag_match: (str) Push that value to if-match header at file
402             creation
403
404         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405             it does not exist remotely, otherwise the operation will fail.
406             Involves the case of an object with the same path is created while
407             the object is being uploaded.
408
409         :param content_encoding: (str)
410
411         :param content_disposition: (str)
412
413         :param content_type: (str)
414
415         :param sharing: {'read':[user and/or grp names],
416             'write':[usr and/or grp names]}
417
418         :param public: (bool)
419
420         :param container_info_cache: (dict) if given, avoid redundant calls to
421             server for container info (block size and hash information)
422         """
423         self._assert_container()
424
425         block_info = (
426             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427                 f, size, container_info_cache)
428         (hashes, hmap, offset) = ([], {}, 0)
429         if not content_type:
430             content_type = 'application/octet-stream'
431
432         self._calculate_blocks_for_upload(
433             *block_info,
434             hashes=hashes,
435             hmap=hmap,
436             fileobj=f,
437             hash_cb=hash_cb)
438
439         hashmap = dict(bytes=size, hashes=hashes)
440         missing, obj_headers = self._create_object_or_get_missing_hashes(
441             obj, hashmap,
442             content_type=content_type,
443             size=size,
444             if_etag_match=if_etag_match,
445             if_etag_not_match='*' if if_not_exist else None,
446             content_encoding=content_encoding,
447             content_disposition=content_disposition,
448             permissions=sharing,
449             public=public)
450
451         if missing is None:
452             return obj_headers
453
454         if upload_cb:
455             upload_gen = upload_cb(len(missing))
456             for i in range(len(missing), len(hashmap['hashes']) + 1):
457                 try:
458                     upload_gen.next()
459                 except:
460                     upload_gen = None
461         else:
462             upload_gen = None
463
464         retries = 7
465         try:
466             while retries:
467                 sendlog.info('%s blocks missing' % len(missing))
468                 num_of_blocks = len(missing)
469                 missing = self._upload_missing_blocks(
470                     missing,
471                     hmap,
472                     f,
473                     upload_gen)
474                 if missing:
475                     if num_of_blocks == len(missing):
476                         retries -= 1
477                     else:
478                         num_of_blocks = len(missing)
479                 else:
480                     break
481             if missing:
482                 try:
483                     details = ['%s' % thread.exception for thread in missing]
484                 except Exception:
485                     details = ['Also, failed to read thread exceptions']
486                 raise ClientError(
487                     '%s blocks failed to upload' % len(missing),
488                     details=details)
489         except KeyboardInterrupt:
490             sendlog.info('- - - wait for threads to finish')
491             for thread in activethreads():
492                 thread.join()
493             raise
494
495         r = self.object_put(
496             obj,
497             format='json',
498             hashmap=True,
499             content_type=content_type,
500             content_encoding=content_encoding,
501             if_etag_match=if_etag_match,
502             if_etag_not_match='*' if if_not_exist else None,
503             etag=etag,
504             json=hashmap,
505             permissions=sharing,
506             public=public,
507             success=201)
508         return r.headers
509
510     def upload_from_string(
511             self, obj, input_str,
512             hash_cb=None,
513             upload_cb=None,
514             etag=None,
515             if_etag_match=None,
516             if_not_exist=None,
517             content_encoding=None,
518             content_disposition=None,
519             content_type=None,
520             sharing=None,
521             public=None,
522             container_info_cache=None):
523         """Upload an object using multiple connections (threads)
524
525         :param obj: (str) remote object path
526
527         :param input_str: (str) upload content
528
529         :param hash_cb: optional progress.bar object for calculating hashes
530
531         :param upload_cb: optional progress.bar object for uploading
532
533         :param etag: (str)
534
535         :param if_etag_match: (str) Push that value to if-match header at file
536             creation
537
538         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
539             it does not exist remotely, otherwise the operation will fail.
540             Involves the case of an object with the same path is created while
541             the object is being uploaded.
542
543         :param content_encoding: (str)
544
545         :param content_disposition: (str)
546
547         :param content_type: (str)
548
549         :param sharing: {'read':[user and/or grp names],
550             'write':[usr and/or grp names]}
551
552         :param public: (bool)
553
554         :param container_info_cache: (dict) if given, avoid redundant calls to
555             server for container info (block size and hash information)
556         """
557         self._assert_container()
558
559         blocksize, blockhash, size, nblocks = self._get_file_block_info(
560                 fileobj=None, size=len(input_str), cache=container_info_cache)
561         (hashes, hmap, offset) = ([], {}, 0)
562         if not content_type:
563             content_type = 'application/octet-stream'
564
565         hashes = []
566         hmap = {}
567         for blockid in range(nblocks):
568             start = blockid * blocksize
569             block = input_str[start: (start + blocksize)]
570             hashes.append(_pithos_hash(block, blockhash))
571             hmap[hashes[blockid]] = (start, block)
572
573         hashmap = dict(bytes=size, hashes=hashes)
574         missing, obj_headers = self._create_object_or_get_missing_hashes(
575             obj, hashmap,
576             content_type=content_type,
577             size=size,
578             if_etag_match=if_etag_match,
579             if_etag_not_match='*' if if_not_exist else None,
580             content_encoding=content_encoding,
581             content_disposition=content_disposition,
582             permissions=sharing,
583             public=public)
584         if missing is None:
585             return obj_headers
586         num_of_missing = len(missing)
587
588         if upload_cb:
589             self.progress_bar_gen = upload_cb(nblocks)
590             for i in range(nblocks + 1 - num_of_missing):
591                 self._cb_next()
592
593         tries = 7
594         old_failures = 0
595         try:
596             while tries and missing:
597                 flying = []
598                 failures = []
599                 for hash in missing:
600                     offset, block = hmap[hash]
601                     bird = self._put_block_async(block, hash)
602                     flying.append(bird)
603                     unfinished = self._watch_thread_limit(flying)
604                     for thread in set(flying).difference(unfinished):
605                         if thread.exception:
606                             failures.append(thread.kwargs['hash'])
607                         if thread.isAlive():
608                             flying.append(thread)
609                         else:
610                             self._cb_next()
611                     flying = unfinished
612                 for thread in flying:
613                     thread.join()
614                     if thread.exception:
615                         failures.append(thread.kwargs['hash'])
616                     self._cb_next()
617                 missing = failures
618                 if missing and len(missing) == old_failures:
619                     tries -= 1
620                 old_failures = len(missing)
621             if missing:
622                 raise ClientError('%s blocks failed to upload' % len(missing))
623         except KeyboardInterrupt:
624             sendlog.info('- - - wait for threads to finish')
625             for thread in activethreads():
626                 thread.join()
627             raise
628
629         r = self.object_put(
630             obj,
631             format='json',
632             hashmap=True,
633             content_type=content_type,
634             content_encoding=content_encoding,
635             if_etag_match=if_etag_match,
636             if_etag_not_match='*' if if_not_exist else None,
637             etag=etag,
638             json=hashmap,
639             permissions=sharing,
640             public=public,
641             success=201)
642         return r.headers
643
644     # download_* auxiliary methods
645     def _get_remote_blocks_info(self, obj, **restargs):
646         #retrieve object hashmap
647         myrange = restargs.pop('data_range', None)
648         hashmap = self.get_object_hashmap(obj, **restargs)
649         restargs['data_range'] = myrange
650         blocksize = int(hashmap['block_size'])
651         blockhash = hashmap['block_hash']
652         total_size = hashmap['bytes']
653         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
654         map_dict = {}
655         for i, h in enumerate(hashmap['hashes']):
656             #  map_dict[h] = i   CHAGE
657             if h in map_dict:
658                 map_dict[h].append(i)
659             else:
660                 map_dict[h] = [i]
661         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
662
663     def _dump_blocks_sync(
664             self, obj, remote_hashes, blocksize, total_size, dst, crange,
665             **args):
666         if not total_size:
667             return
668         for blockid, blockhash in enumerate(remote_hashes):
669             if blockhash:
670                 start = blocksize * blockid
671                 is_last = start + blocksize > total_size
672                 end = (total_size - 1) if is_last else (start + blocksize - 1)
673                 data_range = _range_up(start, end, total_size, crange)
674                 if not data_range:
675                     self._cb_next()
676                     continue
677                 args['data_range'] = 'bytes=%s' % data_range
678                 r = self.object_get(obj, success=(200, 206), **args)
679                 self._cb_next()
680                 dst.write(r.content)
681                 dst.flush()
682
683     def _get_block_async(self, obj, **args):
684         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
685         event.start()
686         return event
687
688     def _hash_from_file(self, fp, start, size, blockhash):
689         fp.seek(start)
690         block = readall(fp, size)
691         h = newhashlib(blockhash)
692         h.update(block.strip('\x00'))
693         return hexlify(h.digest())
694
695     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
696         """write the results of a greenleted rest call to a file
697
698         :param offset: the offset of the file up to blocksize
699         - e.g. if the range is 10-100, all blocks will be written to
700         normal_position - 10
701         """
702         for key, g in flying.items():
703             if g.isAlive():
704                 continue
705             if g.exception:
706                 raise g.exception
707             block = g.value.content
708             for block_start in blockids[key]:
709                 local_file.seek(block_start + offset)
710                 local_file.write(block)
711                 self._cb_next()
712             flying.pop(key)
713             blockids.pop(key)
714         local_file.flush()
715
716     def _dump_blocks_async(
717             self, obj, remote_hashes, blocksize, total_size, local_file,
718             blockhash=None, resume=False, filerange=None, **restargs):
719         file_size = fstat(local_file.fileno()).st_size if resume else 0
720         flying = dict()
721         blockid_dict = dict()
722         offset = 0
723
724         self._init_thread_limit()
725         for block_hash, blockids in remote_hashes.items():
726             blockids = [blk * blocksize for blk in blockids]
727             unsaved = [blk for blk in blockids if not (
728                 blk < file_size and block_hash == self._hash_from_file(
729                         local_file, blk, blocksize, blockhash))]
730             self._cb_next(len(blockids) - len(unsaved))
731             if unsaved:
732                 key = unsaved[0]
733                 self._watch_thread_limit(flying.values())
734                 self._thread2file(
735                     flying, blockid_dict, local_file, offset,
736                     **restargs)
737                 end = total_size - 1 if (
738                     key + blocksize > total_size) else key + blocksize - 1
739                 if end < key:
740                     self._cb_next()
741                     continue
742                 data_range = _range_up(key, end, total_size, filerange)
743                 if not data_range:
744                     self._cb_next()
745                     continue
746                 restargs[
747                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
748                 flying[key] = self._get_block_async(obj, **restargs)
749                 blockid_dict[key] = unsaved
750
751         for thread in flying.values():
752             thread.join()
753         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
754
755     def download_object(
756             self, obj, dst,
757             download_cb=None,
758             version=None,
759             resume=False,
760             range_str=None,
761             if_match=None,
762             if_none_match=None,
763             if_modified_since=None,
764             if_unmodified_since=None):
765         """Download an object (multiple connections, random blocks)
766
767         :param obj: (str) remote object path
768
769         :param dst: open file descriptor (wb+)
770
771         :param download_cb: optional progress.bar object for downloading
772
773         :param version: (str) file version
774
775         :param resume: (bool) if set, preserve already downloaded file parts
776
777         :param range_str: (str) from, to are file positions (int) in bytes
778
779         :param if_match: (str)
780
781         :param if_none_match: (str)
782
783         :param if_modified_since: (str) formated date
784
785         :param if_unmodified_since: (str) formated date"""
786         restargs = dict(
787             version=version,
788             data_range=None if range_str is None else 'bytes=%s' % range_str,
789             if_match=if_match,
790             if_none_match=if_none_match,
791             if_modified_since=if_modified_since,
792             if_unmodified_since=if_unmodified_since)
793
794         (
795             blocksize,
796             blockhash,
797             total_size,
798             hash_list,
799             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
800         assert total_size >= 0
801
802         if download_cb:
803             self.progress_bar_gen = download_cb(len(hash_list))
804             self._cb_next()
805
806         if dst.isatty():
807             self._dump_blocks_sync(
808                 obj,
809                 hash_list,
810                 blocksize,
811                 total_size,
812                 dst,
813                 range_str,
814                 **restargs)
815         else:
816             self._dump_blocks_async(
817                 obj,
818                 remote_hashes,
819                 blocksize,
820                 total_size,
821                 dst,
822                 blockhash,
823                 resume,
824                 range_str,
825                 **restargs)
826             if not range_str:
827                 dst.truncate(total_size)
828
829         self._complete_cb()
830
831     def download_to_string(
832             self, obj,
833             download_cb=None,
834             version=None,
835             range_str=None,
836             if_match=None,
837             if_none_match=None,
838             if_modified_since=None,
839             if_unmodified_since=None):
840         """Download an object to a string (multiple connections). This method
841         uses threads for http requests, but stores all content in memory.
842
843         :param obj: (str) remote object path
844
845         :param download_cb: optional progress.bar object for downloading
846
847         :param version: (str) file version
848
849         :param range_str: (str) from, to are file positions (int) in bytes
850
851         :param if_match: (str)
852
853         :param if_none_match: (str)
854
855         :param if_modified_since: (str) formated date
856
857         :param if_unmodified_since: (str) formated date
858
859         :returns: (str) the whole object contents
860         """
861         restargs = dict(
862             version=version,
863             data_range=None if range_str is None else 'bytes=%s' % range_str,
864             if_match=if_match,
865             if_none_match=if_none_match,
866             if_modified_since=if_modified_since,
867             if_unmodified_since=if_unmodified_since)
868
869         (
870             blocksize,
871             blockhash,
872             total_size,
873             hash_list,
874             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
875         assert total_size >= 0
876
877         if download_cb:
878             self.progress_bar_gen = download_cb(len(hash_list))
879             self._cb_next()
880
881         num_of_blocks = len(remote_hashes)
882         ret = [''] * num_of_blocks
883         self._init_thread_limit()
884         flying = dict()
885         try:
886             for blockid, blockhash in enumerate(remote_hashes):
887                 start = blocksize * blockid
888                 is_last = start + blocksize > total_size
889                 end = (total_size - 1) if is_last else (start + blocksize - 1)
890                 data_range_str = _range_up(start, end, end, range_str)
891                 if data_range_str:
892                     self._watch_thread_limit(flying.values())
893                     restargs['data_range'] = 'bytes=%s' % data_range_str
894                     flying[blockid] = self._get_block_async(obj, **restargs)
895                 for runid, thread in flying.items():
896                     if (blockid + 1) == num_of_blocks:
897                         thread.join()
898                     elif thread.isAlive():
899                         continue
900                     if thread.exception:
901                         raise thread.exception
902                     ret[runid] = thread.value.content
903                     self._cb_next()
904                     flying.pop(runid)
905             return ''.join(ret)
906         except KeyboardInterrupt:
907             sendlog.info('- - - wait for threads to finish')
908             for thread in activethreads():
909                 thread.join()
910
911     #Command Progress Bar method
912     def _cb_next(self, step=1):
913         if hasattr(self, 'progress_bar_gen'):
914             try:
915                 for i in xrange(step):
916                     self.progress_bar_gen.next()
917             except:
918                 pass
919
920     def _complete_cb(self):
921         while True:
922             try:
923                 self.progress_bar_gen.next()
924             except:
925                 break
926
927     def get_object_hashmap(
928             self, obj,
929             version=None,
930             if_match=None,
931             if_none_match=None,
932             if_modified_since=None,
933             if_unmodified_since=None):
934         """
935         :param obj: (str) remote object path
936
937         :param if_match: (str)
938
939         :param if_none_match: (str)
940
941         :param if_modified_since: (str) formated date
942
943         :param if_unmodified_since: (str) formated date
944
945         :returns: (list)
946         """
947         try:
948             r = self.object_get(
949                 obj,
950                 hashmap=True,
951                 version=version,
952                 if_etag_match=if_match,
953                 if_etag_not_match=if_none_match,
954                 if_modified_since=if_modified_since,
955                 if_unmodified_since=if_unmodified_since)
956         except ClientError as err:
957             if err.status == 304 or err.status == 412:
958                 return {}
959             raise
960         return r.json
961
962     def set_account_group(self, group, usernames):
963         """
964         :param group: (str)
965
966         :param usernames: (list)
967         """
968         r = self.account_post(update=True, groups={group: usernames})
969         return r
970
971     def del_account_group(self, group):
972         """
973         :param group: (str)
974         """
975         self.account_post(update=True, groups={group: []})
976
977     def get_account_info(self, until=None):
978         """
979         :param until: (str) formated date
980
981         :returns: (dict)
982         """
983         r = self.account_head(until=until)
984         if r.status_code == 401:
985             raise ClientError("No authorization", status=401)
986         return r.headers
987
988     def get_account_quota(self):
989         """
990         :returns: (dict)
991         """
992         return filter_in(
993             self.get_account_info(),
994             'X-Account-Policy-Quota',
995             exactMatch=True)
996
997     #def get_account_versioning(self):
998     #    """
999     #    :returns: (dict)
1000     #    """
1001     #    return filter_in(
1002     #        self.get_account_info(),
1003     #        'X-Account-Policy-Versioning',
1004     #        exactMatch=True)
1005
1006     def get_account_meta(self, until=None):
1007         """
1008         :param until: (str) formated date
1009
1010         :returns: (dict)
1011         """
1012         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1013
1014     def get_account_group(self):
1015         """
1016         :returns: (dict)
1017         """
1018         return filter_in(self.get_account_info(), 'X-Account-Group-')
1019
1020     def set_account_meta(self, metapairs):
1021         """
1022         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1023         """
1024         assert(type(metapairs) is dict)
1025         r = self.account_post(update=True, metadata=metapairs)
1026         return r.headers
1027
1028     def del_account_meta(self, metakey):
1029         """
1030         :param metakey: (str) metadatum key
1031         """
1032         r = self.account_post(update=True, metadata={metakey: ''})
1033         return r.headers
1034
1035     #def set_account_quota(self, quota):
1036     #    """
1037     #    :param quota: (int)
1038     #    """
1039     #    self.account_post(update=True, quota=quota)
1040
1041     #def set_account_versioning(self, versioning):
1042     #    """
1043     #    :param versioning: (str)
1044     #    """
1045     #    r = self.account_post(update=True, versioning=versioning)
1046     #    return r.headers
1047
1048     def list_containers(self):
1049         """
1050         :returns: (dict)
1051         """
1052         r = self.account_get()
1053         return r.json
1054
1055     def del_container(self, until=None, delimiter=None):
1056         """
1057         :param until: (str) formated date
1058
1059         :param delimiter: (str) with / empty container
1060
1061         :raises ClientError: 404 Container does not exist
1062
1063         :raises ClientError: 409 Container is not empty
1064         """
1065         self._assert_container()
1066         r = self.container_delete(
1067             until=until,
1068             delimiter=delimiter,
1069             success=(204, 404, 409))
1070         if r.status_code == 404:
1071             raise ClientError(
1072                 'Container "%s" does not exist' % self.container,
1073                 r.status_code)
1074         elif r.status_code == 409:
1075             raise ClientError(
1076                 'Container "%s" is not empty' % self.container,
1077                 r.status_code)
1078         return r.headers
1079
1080     def get_container_versioning(self, container=None):
1081         """
1082         :param container: (str)
1083
1084         :returns: (dict)
1085         """
1086         cnt_back_up = self.container
1087         try:
1088             self.container = container or cnt_back_up
1089             return filter_in(
1090                 self.get_container_info(),
1091                 'X-Container-Policy-Versioning')
1092         finally:
1093             self.container = cnt_back_up
1094
1095     def get_container_limit(self, container=None):
1096         """
1097         :param container: (str)
1098
1099         :returns: (dict)
1100         """
1101         cnt_back_up = self.container
1102         try:
1103             self.container = container or cnt_back_up
1104             return filter_in(
1105                 self.get_container_info(),
1106                 'X-Container-Policy-Quota')
1107         finally:
1108             self.container = cnt_back_up
1109
1110     def get_container_info(self, container=None, until=None):
1111         """
1112         :param until: (str) formated date
1113
1114         :returns: (dict)
1115
1116         :raises ClientError: 404 Container not found
1117         """
1118         bck_cont = self.container
1119         try:
1120             self.container = container or bck_cont
1121             self._assert_container()
1122             r = self.container_head(until=until)
1123         except ClientError as err:
1124             err.details.append('for container %s' % self.container)
1125             raise err
1126         finally:
1127             self.container = bck_cont
1128         return r.headers
1129
1130     def get_container_meta(self, until=None):
1131         """
1132         :param until: (str) formated date
1133
1134         :returns: (dict)
1135         """
1136         return filter_in(
1137             self.get_container_info(until=until), 'X-Container-Meta')
1138
1139     def get_container_object_meta(self, until=None):
1140         """
1141         :param until: (str) formated date
1142
1143         :returns: (dict)
1144         """
1145         return filter_in(
1146             self.get_container_info(until=until), 'X-Container-Object-Meta')
1147
1148     def set_container_meta(self, metapairs):
1149         """
1150         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1151         """
1152         assert(type(metapairs) is dict)
1153         r = self.container_post(update=True, metadata=metapairs)
1154         return r.headers
1155
1156     def del_container_meta(self, metakey):
1157         """
1158         :param metakey: (str) metadatum key
1159
1160         :returns: (dict) response headers
1161         """
1162         r = self.container_post(update=True, metadata={metakey: ''})
1163         return r.headers
1164
1165     def set_container_limit(self, limit):
1166         """
1167         :param limit: (int)
1168         """
1169         r = self.container_post(update=True, quota=limit)
1170         return r.headers
1171
1172     def set_container_versioning(self, versioning):
1173         """
1174         :param versioning: (str)
1175         """
1176         r = self.container_post(update=True, versioning=versioning)
1177         return r.headers
1178
1179     def del_object(self, obj, until=None, delimiter=None):
1180         """
1181         :param obj: (str) remote object path
1182
1183         :param until: (str) formated date
1184
1185         :param delimiter: (str)
1186         """
1187         self._assert_container()
1188         r = self.object_delete(obj, until=until, delimiter=delimiter)
1189         return r.headers
1190
1191     def set_object_meta(self, obj, metapairs):
1192         """
1193         :param obj: (str) remote object path
1194
1195         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1196         """
1197         assert(type(metapairs) is dict)
1198         r = self.object_post(obj, update=True, metadata=metapairs)
1199         return r.headers
1200
1201     def del_object_meta(self, obj, metakey):
1202         """
1203         :param obj: (str) remote object path
1204
1205         :param metakey: (str) metadatum key
1206         """
1207         r = self.object_post(obj, update=True, metadata={metakey: ''})
1208         return r.headers
1209
1210     def publish_object(self, obj):
1211         """
1212         :param obj: (str) remote object path
1213
1214         :returns: (str) access url
1215         """
1216         self.object_post(obj, update=True, public=True)
1217         info = self.get_object_info(obj)
1218         return info['x-object-public']
1219         pref, sep, rest = self.base_url.partition('//')
1220         base = rest.split('/')[0]
1221         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1222
1223     def unpublish_object(self, obj):
1224         """
1225         :param obj: (str) remote object path
1226         """
1227         r = self.object_post(obj, update=True, public=False)
1228         return r.headers
1229
1230     def get_object_info(self, obj, version=None):
1231         """
1232         :param obj: (str) remote object path
1233
1234         :param version: (str)
1235
1236         :returns: (dict)
1237         """
1238         try:
1239             r = self.object_head(obj, version=version)
1240             return r.headers
1241         except ClientError as ce:
1242             if ce.status == 404:
1243                 raise ClientError('Object %s not found' % obj, status=404)
1244             raise
1245
1246     def get_object_meta(self, obj, version=None):
1247         """
1248         :param obj: (str) remote object path
1249
1250         :param version: (str)
1251
1252         :returns: (dict)
1253         """
1254         return filter_in(
1255             self.get_object_info(obj, version=version),
1256             'X-Object-Meta')
1257
1258     def get_object_sharing(self, obj):
1259         """
1260         :param obj: (str) remote object path
1261
1262         :returns: (dict)
1263         """
1264         r = filter_in(
1265             self.get_object_info(obj),
1266             'X-Object-Sharing',
1267             exactMatch=True)
1268         reply = {}
1269         if len(r) > 0:
1270             perms = r['x-object-sharing'].split(';')
1271             for perm in perms:
1272                 try:
1273                     perm.index('=')
1274                 except ValueError:
1275                     raise ClientError('Incorrect reply format')
1276                 (key, val) = perm.strip().split('=')
1277                 reply[key] = val
1278         return reply
1279
1280     def set_object_sharing(
1281             self, obj,
1282             read_permission=False, write_permission=False):
1283         """Give read/write permisions to an object.
1284
1285         :param obj: (str) remote object path
1286
1287         :param read_permission: (list - bool) users and user groups that get
1288             read permission for this object - False means all previous read
1289             permissions will be removed
1290
1291         :param write_permission: (list - bool) of users and user groups to get
1292            write permission for this object - False means all previous write
1293            permissions will be removed
1294
1295         :returns: (dict) response headers
1296         """
1297
1298         perms = dict(read=read_permission or '', write=write_permission or '')
1299         r = self.object_post(obj, update=True, permissions=perms)
1300         return r.headers
1301
1302     def del_object_sharing(self, obj):
1303         """
1304         :param obj: (str) remote object path
1305         """
1306         return self.set_object_sharing(obj)
1307
1308     def append_object(self, obj, source_file, upload_cb=None):
1309         """
1310         :param obj: (str) remote object path
1311
1312         :param source_file: open file descriptor
1313
1314         :param upload_db: progress.bar for uploading
1315         """
1316         self._assert_container()
1317         meta = self.get_container_info()
1318         blocksize = int(meta['x-container-block-size'])
1319         filesize = fstat(source_file.fileno()).st_size
1320         nblocks = 1 + (filesize - 1) // blocksize
1321         offset = 0
1322         headers = {}
1323         if upload_cb:
1324             self.progress_bar_gen = upload_cb(nblocks)
1325             self._cb_next()
1326         flying = {}
1327         self._init_thread_limit()
1328         try:
1329             for i in range(nblocks):
1330                 block = source_file.read(min(blocksize, filesize - offset))
1331                 offset += len(block)
1332
1333                 self._watch_thread_limit(flying.values())
1334                 unfinished = {}
1335                 flying[i] = SilentEvent(
1336                     method=self.object_post,
1337                     obj=obj,
1338                     update=True,
1339                     content_range='bytes */*',
1340                     content_type='application/octet-stream',
1341                     content_length=len(block),
1342                     data=block)
1343                 flying[i].start()
1344
1345                 for key, thread in flying.items():
1346                     if thread.isAlive():
1347                         if i < nblocks:
1348                             unfinished[key] = thread
1349                             continue
1350                         thread.join()
1351                     if thread.exception:
1352                         raise thread.exception
1353                     headers[key] = thread.value.headers
1354                     self._cb_next()
1355                 flying = unfinished
1356         except KeyboardInterrupt:
1357             sendlog.info('- - - wait for threads to finish')
1358             for thread in activethreads():
1359                 thread.join()
1360         finally:
1361             from time import sleep
1362             sleep(2 * len(activethreads()))
1363         return headers.values()
1364
1365     def truncate_object(self, obj, upto_bytes):
1366         """
1367         :param obj: (str) remote object path
1368
1369         :param upto_bytes: max number of bytes to leave on file
1370
1371         :returns: (dict) response headers
1372         """
1373         ctype = self.get_object_info(obj)['content-type']
1374         r = self.object_post(
1375             obj,
1376             update=True,
1377             content_range='bytes 0-%s/*' % upto_bytes,
1378             content_type=ctype,
1379             object_bytes=upto_bytes,
1380             source_object=path4url(self.container, obj))
1381         return r.headers
1382
1383     def overwrite_object(
1384             self, obj, start, end, source_file,
1385             source_version=None, upload_cb=None):
1386         """Overwrite a part of an object from local source file
1387         ATTENTION: content_type must always be application/octet-stream
1388
1389         :param obj: (str) remote object path
1390
1391         :param start: (int) position in bytes to start overwriting from
1392
1393         :param end: (int) position in bytes to stop overwriting at
1394
1395         :param source_file: open file descriptor
1396
1397         :param upload_db: progress.bar for uploading
1398         """
1399
1400         self._assert_container()
1401         r = self.get_object_info(obj, version=source_version)
1402         rf_size = int(r['content-length'])
1403         start, end = int(start), int(end)
1404         assert rf_size >= start, 'Range start %s exceeds file size %s' % (
1405             start, rf_size)
1406         meta = self.get_container_info()
1407         blocksize = int(meta['x-container-block-size'])
1408         filesize = fstat(source_file.fileno()).st_size
1409         datasize = end - start + 1
1410         nblocks = 1 + (datasize - 1) // blocksize
1411         offset = 0
1412         if upload_cb:
1413             self.progress_bar_gen = upload_cb(nblocks)
1414             self._cb_next()
1415         headers = []
1416         for i in range(nblocks):
1417             read_size = min(blocksize, filesize - offset, datasize - offset)
1418             block = source_file.read(read_size)
1419             r = self.object_post(
1420                 obj,
1421                 update=True,
1422                 content_type='application/octet-stream',
1423                 content_length=len(block),
1424                 content_range='bytes %s-%s/*' % (
1425                     start + offset,
1426                     start + offset + len(block) - 1),
1427                 source_version=source_version,
1428                 data=block)
1429             headers.append(dict(r.headers))
1430             offset += len(block)
1431
1432             self._cb_next
1433         return headers
1434
1435     def copy_object(
1436             self, src_container, src_object, dst_container,
1437             dst_object=None,
1438             source_version=None,
1439             source_account=None,
1440             public=False,
1441             content_type=None,
1442             delimiter=None):
1443         """
1444         :param src_container: (str) source container
1445
1446         :param src_object: (str) source object path
1447
1448         :param dst_container: (str) destination container
1449
1450         :param dst_object: (str) destination object path
1451
1452         :param source_version: (str) source object version
1453
1454         :param source_account: (str) account to copy from
1455
1456         :param public: (bool)
1457
1458         :param content_type: (str)
1459
1460         :param delimiter: (str)
1461
1462         :returns: (dict) response headers
1463         """
1464         self._assert_account()
1465         self.container = dst_container
1466         src_path = path4url(src_container, src_object)
1467         r = self.object_put(
1468             dst_object or src_object,
1469             success=201,
1470             copy_from=src_path,
1471             content_length=0,
1472             source_version=source_version,
1473             source_account=source_account,
1474             public=public,
1475             content_type=content_type,
1476             delimiter=delimiter)
1477         return r.headers
1478
1479     def move_object(
1480             self, src_container, src_object, dst_container,
1481             dst_object=False,
1482             source_account=None,
1483             source_version=None,
1484             public=False,
1485             content_type=None,
1486             delimiter=None):
1487         """
1488         :param src_container: (str) source container
1489
1490         :param src_object: (str) source object path
1491
1492         :param dst_container: (str) destination container
1493
1494         :param dst_object: (str) destination object path
1495
1496         :param source_account: (str) account to move from
1497
1498         :param source_version: (str) source object version
1499
1500         :param public: (bool)
1501
1502         :param content_type: (str)
1503
1504         :param delimiter: (str)
1505
1506         :returns: (dict) response headers
1507         """
1508         self._assert_account()
1509         self.container = dst_container
1510         dst_object = dst_object or src_object
1511         src_path = path4url(src_container, src_object)
1512         r = self.object_put(
1513             dst_object,
1514             success=201,
1515             move_from=src_path,
1516             content_length=0,
1517             source_account=source_account,
1518             source_version=source_version,
1519             public=public,
1520             content_type=content_type,
1521             delimiter=delimiter)
1522         return r.headers
1523
1524     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1525         """Get accounts that share with self.account
1526
1527         :param limit: (str)
1528
1529         :param marker: (str)
1530
1531         :returns: (dict)
1532         """
1533         self._assert_account()
1534
1535         self.set_param('format', 'json')
1536         self.set_param('limit', limit, iff=limit is not None)
1537         self.set_param('marker', marker, iff=marker is not None)
1538
1539         path = ''
1540         success = kwargs.pop('success', (200, 204))
1541         r = self.get(path, *args, success=success, **kwargs)
1542         return r.json
1543
1544     def get_object_versionlist(self, obj):
1545         """
1546         :param obj: (str) remote object path
1547
1548         :returns: (list)
1549         """
1550         self._assert_container()
1551         r = self.object_get(obj, format='json', version='list')
1552         return r.json['versions']