Minor typos
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks"""
136         cnt_back_up = self.container
137         try:
138             self.container = container or cnt_back_up
139             r = self.container_delete(until=unicode(time()))
140         finally:
141             self.container = cnt_back_up
142         return r.headers
143
144     def upload_object_unchunked(
145             self, obj, f,
146             withHashFile=False,
147             size=None,
148             etag=None,
149             content_encoding=None,
150             content_disposition=None,
151             content_type=None,
152             sharing=None,
153             public=None):
154         """
155         :param obj: (str) remote object path
156
157         :param f: open file descriptor
158
159         :param withHashFile: (bool)
160
161         :param size: (int) size of data to upload
162
163         :param etag: (str)
164
165         :param content_encoding: (str)
166
167         :param content_disposition: (str)
168
169         :param content_type: (str)
170
171         :param sharing: {'read':[user and/or grp names],
172             'write':[usr and/or grp names]}
173
174         :param public: (bool)
175
176         :returns: (dict) created object metadata
177         """
178         self._assert_container()
179
180         if withHashFile:
181             data = f.read()
182             try:
183                 import json
184                 data = json.dumps(json.loads(data))
185             except ValueError:
186                 raise ClientError('"%s" is not json-formated' % f.name, 1)
187             except SyntaxError:
188                 msg = '"%s" is not a valid hashmap file' % f.name
189                 raise ClientError(msg, 1)
190             f = StringIO(data)
191         else:
192             data = readall(f, size) if size else f.read()
193         r = self.object_put(
194             obj,
195             data=data,
196             etag=etag,
197             content_encoding=content_encoding,
198             content_disposition=content_disposition,
199             content_type=content_type,
200             permissions=sharing,
201             public=public,
202             success=201)
203         return r.headers
204
205     def create_object_by_manifestation(
206             self, obj,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             content_type=None,
211             sharing=None,
212             public=None):
213         """
214         :param obj: (str) remote object path
215
216         :param etag: (str)
217
218         :param content_encoding: (str)
219
220         :param content_disposition: (str)
221
222         :param content_type: (str)
223
224         :param sharing: {'read':[user and/or grp names],
225             'write':[usr and/or grp names]}
226
227         :param public: (bool)
228
229         :returns: (dict) created object metadata
230         """
231         self._assert_container()
232         r = self.object_put(
233             obj,
234             content_length=0,
235             etag=etag,
236             content_encoding=content_encoding,
237             content_disposition=content_disposition,
238             content_type=content_type,
239             permissions=sharing,
240             public=public,
241             manifest='%s/%s' % (self.container, obj))
242         return r.headers
243
244     # upload_* auxiliary methods
245     def _put_block_async(self, data, hash):
246         event = SilentEvent(method=self._put_block, data=data, hash=hash)
247         event.start()
248         return event
249
250     def _put_block(self, data, hash):
251         r = self.container_post(
252             update=True,
253             content_type='application/octet-stream',
254             content_length=len(data),
255             data=data,
256             format='json')
257         assert r.json[0] == hash, 'Local hash does not match server'
258
259     def _get_file_block_info(self, fileobj, size=None, cache=None):
260         """
261         :param fileobj: (file descriptor) source
262
263         :param size: (int) size of data to upload from source
264
265         :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
267         """
268         if isinstance(cache, dict):
269             try:
270                 meta = cache[self.container]
271             except KeyError:
272                 meta = self.get_container_info()
273                 cache[self.container] = meta
274         else:
275             meta = self.get_container_info()
276         blocksize = int(meta['x-container-block-size'])
277         blockhash = meta['x-container-block-hash']
278         size = size if size is not None else fstat(fileobj.fileno()).st_size
279         nblocks = 1 + (size - 1) // blocksize
280         return (blocksize, blockhash, size, nblocks)
281
282     def _create_object_or_get_missing_hashes(
283             self, obj, json,
284             size=None,
285             format='json',
286             hashmap=True,
287             content_type=None,
288             if_etag_match=None,
289             if_etag_not_match=None,
290             content_encoding=None,
291             content_disposition=None,
292             permissions=None,
293             public=None,
294             success=(201, 409)):
295         r = self.object_put(
296             obj,
297             format='json',
298             hashmap=True,
299             content_type=content_type,
300             json=json,
301             if_etag_match=if_etag_match,
302             if_etag_not_match=if_etag_not_match,
303             content_encoding=content_encoding,
304             content_disposition=content_disposition,
305             permissions=permissions,
306             public=public,
307             success=success)
308         return (None if r.status_code == 201 else r.json), r.headers
309
310     def _calculate_blocks_for_upload(
311             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
312             hash_cb=None):
313         offset = 0
314         if hash_cb:
315             hash_gen = hash_cb(nblocks)
316             hash_gen.next()
317
318         for i in range(nblocks):
319             block = readall(fileobj, min(blocksize, size - offset))
320             bytes = len(block)
321             hash = _pithos_hash(block, blockhash)
322             hashes.append(hash)
323             hmap[hash] = (offset, bytes)
324             offset += bytes
325             if hash_cb:
326                 hash_gen.next()
327         msg = ('Failed to calculate uploaded blocks:'
328                ' Offset and object size do not match')
329         assert offset == size, msg
330
331     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
332         """upload missing blocks asynchronously"""
333
334         self._init_thread_limit()
335
336         flying = []
337         failures = []
338         for hash in missing:
339             offset, bytes = hmap[hash]
340             fileobj.seek(offset)
341             data = readall(fileobj, bytes)
342             r = self._put_block_async(data, hash)
343             flying.append(r)
344             unfinished = self._watch_thread_limit(flying)
345             for thread in set(flying).difference(unfinished):
346                 if thread.exception:
347                     failures.append(thread)
348                     if isinstance(
349                             thread.exception,
350                             ClientError) and thread.exception.status == 502:
351                         self.POOLSIZE = self._thread_limit
352                 elif thread.isAlive():
353                     flying.append(thread)
354                 elif upload_gen:
355                     try:
356                         upload_gen.next()
357                     except:
358                         pass
359             flying = unfinished
360
361         for thread in flying:
362             thread.join()
363             if thread.exception:
364                 failures.append(thread)
365             elif upload_gen:
366                 try:
367                     upload_gen.next()
368                 except:
369                     pass
370
371         return [failure.kwargs['hash'] for failure in failures]
372
373     def upload_object(
374             self, obj, f,
375             size=None,
376             hash_cb=None,
377             upload_cb=None,
378             etag=None,
379             if_etag_match=None,
380             if_not_exist=None,
381             content_encoding=None,
382             content_disposition=None,
383             content_type=None,
384             sharing=None,
385             public=None,
386             container_info_cache=None):
387         """Upload an object using multiple connections (threads)
388
389         :param obj: (str) remote object path
390
391         :param f: open file descriptor (rb)
392
393         :param hash_cb: optional progress.bar object for calculating hashes
394
395         :param upload_cb: optional progress.bar object for uploading
396
397         :param etag: (str)
398
399         :param if_etag_match: (str) Push that value to if-match header at file
400             creation
401
402         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
403             it does not exist remotely, otherwise the operation will fail.
404             Involves the case of an object with the same path is created while
405             the object is being uploaded.
406
407         :param content_encoding: (str)
408
409         :param content_disposition: (str)
410
411         :param content_type: (str)
412
413         :param sharing: {'read':[user and/or grp names],
414             'write':[usr and/or grp names]}
415
416         :param public: (bool)
417
418         :param container_info_cache: (dict) if given, avoid redundant calls to
419             server for container info (block size and hash information)
420         """
421         self._assert_container()
422
423         block_info = (
424             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
425                 f, size, container_info_cache)
426         (hashes, hmap, offset) = ([], {}, 0)
427         if not content_type:
428             content_type = 'application/octet-stream'
429
430         self._calculate_blocks_for_upload(
431             *block_info,
432             hashes=hashes,
433             hmap=hmap,
434             fileobj=f,
435             hash_cb=hash_cb)
436
437         hashmap = dict(bytes=size, hashes=hashes)
438         missing, obj_headers = self._create_object_or_get_missing_hashes(
439             obj, hashmap,
440             content_type=content_type,
441             size=size,
442             if_etag_match=if_etag_match,
443             if_etag_not_match='*' if if_not_exist else None,
444             content_encoding=content_encoding,
445             content_disposition=content_disposition,
446             permissions=sharing,
447             public=public)
448
449         if missing is None:
450             return obj_headers
451
452         if upload_cb:
453             upload_gen = upload_cb(len(missing))
454             for i in range(len(missing), len(hashmap['hashes']) + 1):
455                 try:
456                     upload_gen.next()
457                 except:
458                     upload_gen = None
459         else:
460             upload_gen = None
461
462         retries = 7
463         try:
464             while retries:
465                 sendlog.info('%s blocks missing' % len(missing))
466                 num_of_blocks = len(missing)
467                 missing = self._upload_missing_blocks(
468                     missing,
469                     hmap,
470                     f,
471                     upload_gen)
472                 if missing:
473                     if num_of_blocks == len(missing):
474                         retries -= 1
475                     else:
476                         num_of_blocks = len(missing)
477                 else:
478                     break
479             if missing:
480                 try:
481                     details = ['%s' % thread.exception for thread in missing]
482                 except Exception:
483                     details = ['Also, failed to read thread exceptions']
484                 raise ClientError(
485                     '%s blocks failed to upload' % len(missing),
486                     details=details)
487         except KeyboardInterrupt:
488             sendlog.info('- - - wait for threads to finish')
489             for thread in activethreads():
490                 thread.join()
491             raise
492
493         r = self.object_put(
494             obj,
495             format='json',
496             hashmap=True,
497             content_type=content_type,
498             content_encoding=content_encoding,
499             if_etag_match=if_etag_match,
500             if_etag_not_match='*' if if_not_exist else None,
501             etag=etag,
502             json=hashmap,
503             permissions=sharing,
504             public=public,
505             success=201)
506         return r.headers
507
508     def upload_from_string(
509             self, obj, input_str,
510             hash_cb=None,
511             upload_cb=None,
512             etag=None,
513             if_etag_match=None,
514             if_not_exist=None,
515             content_encoding=None,
516             content_disposition=None,
517             content_type=None,
518             sharing=None,
519             public=None,
520             container_info_cache=None):
521         """Upload an object using multiple connections (threads)
522
523         :param obj: (str) remote object path
524
525         :param input_str: (str) upload content
526
527         :param hash_cb: optional progress.bar object for calculating hashes
528
529         :param upload_cb: optional progress.bar object for uploading
530
531         :param etag: (str)
532
533         :param if_etag_match: (str) Push that value to if-match header at file
534             creation
535
536         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
537             it does not exist remotely, otherwise the operation will fail.
538             Involves the case of an object with the same path is created while
539             the object is being uploaded.
540
541         :param content_encoding: (str)
542
543         :param content_disposition: (str)
544
545         :param content_type: (str)
546
547         :param sharing: {'read':[user and/or grp names],
548             'write':[usr and/or grp names]}
549
550         :param public: (bool)
551
552         :param container_info_cache: (dict) if given, avoid redundant calls to
553             server for container info (block size and hash information)
554         """
555         self._assert_container()
556
557         blocksize, blockhash, size, nblocks = self._get_file_block_info(
558                 fileobj=None, size=len(input_str), cache=container_info_cache)
559         (hashes, hmap, offset) = ([], {}, 0)
560         if not content_type:
561             content_type = 'application/octet-stream'
562
563         hashes = []
564         hmap = {}
565         for blockid in range(nblocks):
566             start = blockid * blocksize
567             block = input_str[start: (start + blocksize)]
568             hashes.append(_pithos_hash(block, blockhash))
569             hmap[hashes[blockid]] = (start, block)
570
571         hashmap = dict(bytes=size, hashes=hashes)
572         missing, obj_headers = self._create_object_or_get_missing_hashes(
573             obj, hashmap,
574             content_type=content_type,
575             size=size,
576             if_etag_match=if_etag_match,
577             if_etag_not_match='*' if if_not_exist else None,
578             content_encoding=content_encoding,
579             content_disposition=content_disposition,
580             permissions=sharing,
581             public=public)
582         if missing is None:
583             return obj_headers
584         num_of_missing = len(missing)
585
586         if upload_cb:
587             self.progress_bar_gen = upload_cb(nblocks)
588             for i in range(nblocks + 1 - num_of_missing):
589                 self._cb_next()
590
591         tries = 7
592         old_failures = 0
593         try:
594             while tries and missing:
595                 flying = []
596                 failures = []
597                 for hash in missing:
598                     offset, block = hmap[hash]
599                     bird = self._put_block_async(block, hash)
600                     flying.append(bird)
601                     unfinished = self._watch_thread_limit(flying)
602                     for thread in set(flying).difference(unfinished):
603                         if thread.exception:
604                             failures.append(thread.kwargs['hash'])
605                         if thread.isAlive():
606                             flying.append(thread)
607                         else:
608                             self._cb_next()
609                     flying = unfinished
610                 for thread in flying:
611                     thread.join()
612                     if thread.exception:
613                         failures.append(thread.kwargs['hash'])
614                     self._cb_next()
615                 missing = failures
616                 if missing and len(missing) == old_failures:
617                     tries -= 1
618                 old_failures = len(missing)
619             if missing:
620                 raise ClientError(
621                     '%s blocks failed to upload' % len(missing),
622                     details=['%s' % thread.exception for thread in missing])
623         except KeyboardInterrupt:
624             sendlog.info('- - - wait for threads to finish')
625             for thread in activethreads():
626                 thread.join()
627             raise
628
629         r = self.object_put(
630             obj,
631             format='json',
632             hashmap=True,
633             content_type=content_type,
634             content_encoding=content_encoding,
635             if_etag_match=if_etag_match,
636             if_etag_not_match='*' if if_not_exist else None,
637             etag=etag,
638             json=hashmap,
639             permissions=sharing,
640             public=public,
641             success=201)
642         return r.headers
643
644     # download_* auxiliary methods
645     def _get_remote_blocks_info(self, obj, **restargs):
646         #retrieve object hashmap
647         myrange = restargs.pop('data_range', None)
648         hashmap = self.get_object_hashmap(obj, **restargs)
649         restargs['data_range'] = myrange
650         blocksize = int(hashmap['block_size'])
651         blockhash = hashmap['block_hash']
652         total_size = hashmap['bytes']
653         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
654         map_dict = {}
655         for i, h in enumerate(hashmap['hashes']):
656             #  map_dict[h] = i   CHAGE
657             if h in map_dict:
658                 map_dict[h].append(i)
659             else:
660                 map_dict[h] = [i]
661         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
662
663     def _dump_blocks_sync(
664             self, obj, remote_hashes, blocksize, total_size, dst, crange,
665             **args):
666         if not total_size:
667             return
668         for blockid, blockhash in enumerate(remote_hashes):
669             if blockhash:
670                 start = blocksize * blockid
671                 is_last = start + blocksize > total_size
672                 end = (total_size - 1) if is_last else (start + blocksize - 1)
673                 data_range = _range_up(start, end, total_size, crange)
674                 if not data_range:
675                     self._cb_next()
676                     continue
677                 args['data_range'] = 'bytes=%s' % data_range
678                 r = self.object_get(obj, success=(200, 206), **args)
679                 self._cb_next()
680                 dst.write(r.content)
681                 dst.flush()
682
683     def _get_block_async(self, obj, **args):
684         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
685         event.start()
686         return event
687
688     def _hash_from_file(self, fp, start, size, blockhash):
689         fp.seek(start)
690         block = readall(fp, size)
691         h = newhashlib(blockhash)
692         h.update(block.strip('\x00'))
693         return hexlify(h.digest())
694
695     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
696         """write the results of a greenleted rest call to a file
697
698         :param offset: the offset of the file up to blocksize
699         - e.g. if the range is 10-100, all blocks will be written to
700         normal_position - 10
701         """
702         for key, g in flying.items():
703             if g.isAlive():
704                 continue
705             if g.exception:
706                 raise g.exception
707             block = g.value.content
708             for block_start in blockids[key]:
709                 local_file.seek(block_start + offset)
710                 local_file.write(block)
711                 self._cb_next()
712             flying.pop(key)
713             blockids.pop(key)
714         local_file.flush()
715
716     def _dump_blocks_async(
717             self, obj, remote_hashes, blocksize, total_size, local_file,
718             blockhash=None, resume=False, filerange=None, **restargs):
719         file_size = fstat(local_file.fileno()).st_size if resume else 0
720         flying = dict()
721         blockid_dict = dict()
722         offset = 0
723
724         self._init_thread_limit()
725         for block_hash, blockids in remote_hashes.items():
726             blockids = [blk * blocksize for blk in blockids]
727             unsaved = [blk for blk in blockids if not (
728                 blk < file_size and block_hash == self._hash_from_file(
729                         local_file, blk, blocksize, blockhash))]
730             self._cb_next(len(blockids) - len(unsaved))
731             if unsaved:
732                 key = unsaved[0]
733                 self._watch_thread_limit(flying.values())
734                 self._thread2file(
735                     flying, blockid_dict, local_file, offset,
736                     **restargs)
737                 end = total_size - 1 if (
738                     key + blocksize > total_size) else key + blocksize - 1
739                 if end < key:
740                     self._cb_next()
741                     continue
742                 data_range = _range_up(key, end, total_size, filerange)
743                 if not data_range:
744                     self._cb_next()
745                     continue
746                 restargs[
747                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
748                 flying[key] = self._get_block_async(obj, **restargs)
749                 blockid_dict[key] = unsaved
750
751         for thread in flying.values():
752             thread.join()
753         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
754
755     def download_object(
756             self, obj, dst,
757             download_cb=None,
758             version=None,
759             resume=False,
760             range_str=None,
761             if_match=None,
762             if_none_match=None,
763             if_modified_since=None,
764             if_unmodified_since=None):
765         """Download an object (multiple connections, random blocks)
766
767         :param obj: (str) remote object path
768
769         :param dst: open file descriptor (wb+)
770
771         :param download_cb: optional progress.bar object for downloading
772
773         :param version: (str) file version
774
775         :param resume: (bool) if set, preserve already downloaded file parts
776
777         :param range_str: (str) from, to are file positions (int) in bytes
778
779         :param if_match: (str)
780
781         :param if_none_match: (str)
782
783         :param if_modified_since: (str) formated date
784
785         :param if_unmodified_since: (str) formated date"""
786         restargs = dict(
787             version=version,
788             data_range=None if range_str is None else 'bytes=%s' % range_str,
789             if_match=if_match,
790             if_none_match=if_none_match,
791             if_modified_since=if_modified_since,
792             if_unmodified_since=if_unmodified_since)
793
794         (
795             blocksize,
796             blockhash,
797             total_size,
798             hash_list,
799             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
800         assert total_size >= 0
801
802         if download_cb:
803             self.progress_bar_gen = download_cb(len(hash_list))
804             self._cb_next()
805
806         if dst.isatty():
807             self._dump_blocks_sync(
808                 obj,
809                 hash_list,
810                 blocksize,
811                 total_size,
812                 dst,
813                 range_str,
814                 **restargs)
815         else:
816             self._dump_blocks_async(
817                 obj,
818                 remote_hashes,
819                 blocksize,
820                 total_size,
821                 dst,
822                 blockhash,
823                 resume,
824                 range_str,
825                 **restargs)
826             if not range_str:
827                 dst.truncate(total_size)
828
829         self._complete_cb()
830
831     def download_to_string(
832             self, obj,
833             download_cb=None,
834             version=None,
835             range_str=None,
836             if_match=None,
837             if_none_match=None,
838             if_modified_since=None,
839             if_unmodified_since=None):
840         """Download an object to a string (multiple connections). This method
841         uses threads for http requests, but stores all content in memory.
842
843         :param obj: (str) remote object path
844
845         :param download_cb: optional progress.bar object for downloading
846
847         :param version: (str) file version
848
849         :param range_str: (str) from, to are file positions (int) in bytes
850
851         :param if_match: (str)
852
853         :param if_none_match: (str)
854
855         :param if_modified_since: (str) formated date
856
857         :param if_unmodified_since: (str) formated date
858
859         :returns: (str) the whole object contents
860         """
861         restargs = dict(
862             version=version,
863             data_range=None if range_str is None else 'bytes=%s' % range_str,
864             if_match=if_match,
865             if_none_match=if_none_match,
866             if_modified_since=if_modified_since,
867             if_unmodified_since=if_unmodified_since)
868
869         (
870             blocksize,
871             blockhash,
872             total_size,
873             hash_list,
874             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
875         assert total_size >= 0
876
877         if download_cb:
878             self.progress_bar_gen = download_cb(len(hash_list))
879             self._cb_next()
880
881         num_of_blocks = len(remote_hashes)
882         ret = [''] * num_of_blocks
883         self._init_thread_limit()
884         flying = dict()
885         try:
886             for blockid, blockhash in enumerate(remote_hashes):
887                 start = blocksize * blockid
888                 is_last = start + blocksize > total_size
889                 end = (total_size - 1) if is_last else (start + blocksize - 1)
890                 data_range_str = _range_up(start, end, end, range_str)
891                 if data_range_str:
892                     self._watch_thread_limit(flying.values())
893                     restargs['data_range'] = 'bytes=%s' % data_range_str
894                     flying[blockid] = self._get_block_async(obj, **restargs)
895                 for runid, thread in flying.items():
896                     if (blockid + 1) == num_of_blocks:
897                         thread.join()
898                     elif thread.isAlive():
899                         continue
900                     if thread.exception:
901                         raise thread.exception
902                     ret[runid] = thread.value.content
903                     self._cb_next()
904                     flying.pop(runid)
905             return ''.join(ret)
906         except KeyboardInterrupt:
907             sendlog.info('- - - wait for threads to finish')
908             for thread in activethreads():
909                 thread.join()
910
911     #Command Progress Bar method
912     def _cb_next(self, step=1):
913         if hasattr(self, 'progress_bar_gen'):
914             try:
915                 for i in xrange(step):
916                     self.progress_bar_gen.next()
917             except:
918                 pass
919
920     def _complete_cb(self):
921         while True:
922             try:
923                 self.progress_bar_gen.next()
924             except:
925                 break
926
927     def get_object_hashmap(
928             self, obj,
929             version=None,
930             if_match=None,
931             if_none_match=None,
932             if_modified_since=None,
933             if_unmodified_since=None):
934         """
935         :param obj: (str) remote object path
936
937         :param if_match: (str)
938
939         :param if_none_match: (str)
940
941         :param if_modified_since: (str) formated date
942
943         :param if_unmodified_since: (str) formated date
944
945         :returns: (list)
946         """
947         try:
948             r = self.object_get(
949                 obj,
950                 hashmap=True,
951                 version=version,
952                 if_etag_match=if_match,
953                 if_etag_not_match=if_none_match,
954                 if_modified_since=if_modified_since,
955                 if_unmodified_since=if_unmodified_since)
956         except ClientError as err:
957             if err.status == 304 or err.status == 412:
958                 return {}
959             raise
960         return r.json
961
962     def set_account_group(self, group, usernames):
963         """
964         :param group: (str)
965
966         :param usernames: (list)
967         """
968         r = self.account_post(update=True, groups={group: usernames})
969         return r
970
971     def del_account_group(self, group):
972         """
973         :param group: (str)
974         """
975         self.account_post(update=True, groups={group: []})
976
977     def get_account_info(self, until=None):
978         """
979         :param until: (str) formated date
980
981         :returns: (dict)
982         """
983         r = self.account_head(until=until)
984         if r.status_code == 401:
985             raise ClientError("No authorization", status=401)
986         return r.headers
987
988     def get_account_quota(self):
989         """
990         :returns: (dict)
991         """
992         return filter_in(
993             self.get_account_info(),
994             'X-Account-Policy-Quota',
995             exactMatch=True)
996
997     #def get_account_versioning(self):
998     #    """
999     #    :returns: (dict)
1000     #    """
1001     #    return filter_in(
1002     #        self.get_account_info(),
1003     #        'X-Account-Policy-Versioning',
1004     #        exactMatch=True)
1005
1006     def get_account_meta(self, until=None):
1007         """
1008         :param until: (str) formated date
1009
1010         :returns: (dict)
1011         """
1012         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1013
1014     def get_account_group(self):
1015         """
1016         :returns: (dict)
1017         """
1018         return filter_in(self.get_account_info(), 'X-Account-Group-')
1019
1020     def set_account_meta(self, metapairs):
1021         """
1022         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1023         """
1024         assert(type(metapairs) is dict)
1025         r = self.account_post(update=True, metadata=metapairs)
1026         return r.headers
1027
1028     def del_account_meta(self, metakey):
1029         """
1030         :param metakey: (str) metadatum key
1031         """
1032         r = self.account_post(update=True, metadata={metakey: ''})
1033         return r.headers
1034
1035     #def set_account_quota(self, quota):
1036     #    """
1037     #    :param quota: (int)
1038     #    """
1039     #    self.account_post(update=True, quota=quota)
1040
1041     #def set_account_versioning(self, versioning):
1042     #    """
1043     #    :param versioning: (str)
1044     #    """
1045     #    r = self.account_post(update=True, versioning=versioning)
1046     #    return r.headers
1047
1048     def list_containers(self):
1049         """
1050         :returns: (dict)
1051         """
1052         r = self.account_get()
1053         return r.json
1054
1055     def del_container(self, until=None, delimiter=None):
1056         """
1057         :param until: (str) formated date
1058
1059         :param delimiter: (str) with / empty container
1060
1061         :raises ClientError: 404 Container does not exist
1062
1063         :raises ClientError: 409 Container is not empty
1064         """
1065         self._assert_container()
1066         r = self.container_delete(
1067             until=until,
1068             delimiter=delimiter,
1069             success=(204, 404, 409))
1070         if r.status_code == 404:
1071             raise ClientError(
1072                 'Container "%s" does not exist' % self.container,
1073                 r.status_code)
1074         elif r.status_code == 409:
1075             raise ClientError(
1076                 'Container "%s" is not empty' % self.container,
1077                 r.status_code)
1078         return r.headers
1079
1080     def get_container_versioning(self, container=None):
1081         """
1082         :param container: (str)
1083
1084         :returns: (dict)
1085         """
1086         cnt_back_up = self.container
1087         try:
1088             self.container = container or cnt_back_up
1089             return filter_in(
1090                 self.get_container_info(),
1091                 'X-Container-Policy-Versioning')
1092         finally:
1093             self.container = cnt_back_up
1094
1095     def get_container_limit(self, container=None):
1096         """
1097         :param container: (str)
1098
1099         :returns: (dict)
1100         """
1101         cnt_back_up = self.container
1102         try:
1103             self.container = container or cnt_back_up
1104             return filter_in(
1105                 self.get_container_info(),
1106                 'X-Container-Policy-Quota')
1107         finally:
1108             self.container = cnt_back_up
1109
1110     def get_container_info(self, until=None):
1111         """
1112         :param until: (str) formated date
1113
1114         :returns: (dict)
1115
1116         :raises ClientError: 404 Container not found
1117         """
1118         try:
1119             r = self.container_head(until=until)
1120         except ClientError as err:
1121             err.details.append('for container %s' % self.container)
1122             raise err
1123         return r.headers
1124
1125     def get_container_meta(self, until=None):
1126         """
1127         :param until: (str) formated date
1128
1129         :returns: (dict)
1130         """
1131         return filter_in(
1132             self.get_container_info(until=until),
1133             'X-Container-Meta')
1134
1135     def get_container_object_meta(self, until=None):
1136         """
1137         :param until: (str) formated date
1138
1139         :returns: (dict)
1140         """
1141         return filter_in(
1142             self.get_container_info(until=until),
1143             'X-Container-Object-Meta')
1144
1145     def set_container_meta(self, metapairs):
1146         """
1147         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1148         """
1149         assert(type(metapairs) is dict)
1150         r = self.container_post(update=True, metadata=metapairs)
1151         return r.headers
1152
1153     def del_container_meta(self, metakey):
1154         """
1155         :param metakey: (str) metadatum key
1156
1157         :returns: (dict) response headers
1158         """
1159         r = self.container_post(update=True, metadata={metakey: ''})
1160         return r.headers
1161
1162     def set_container_limit(self, limit):
1163         """
1164         :param limit: (int)
1165         """
1166         r = self.container_post(update=True, quota=limit)
1167         return r.headers
1168
1169     def set_container_versioning(self, versioning):
1170         """
1171         :param versioning: (str)
1172         """
1173         r = self.container_post(update=True, versioning=versioning)
1174         return r.headers
1175
1176     def del_object(self, obj, until=None, delimiter=None):
1177         """
1178         :param obj: (str) remote object path
1179
1180         :param until: (str) formated date
1181
1182         :param delimiter: (str)
1183         """
1184         self._assert_container()
1185         r = self.object_delete(obj, until=until, delimiter=delimiter)
1186         return r.headers
1187
1188     def set_object_meta(self, obj, metapairs):
1189         """
1190         :param obj: (str) remote object path
1191
1192         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1193         """
1194         assert(type(metapairs) is dict)
1195         r = self.object_post(obj, update=True, metadata=metapairs)
1196         return r.headers
1197
1198     def del_object_meta(self, obj, metakey):
1199         """
1200         :param obj: (str) remote object path
1201
1202         :param metakey: (str) metadatum key
1203         """
1204         r = self.object_post(obj, update=True, metadata={metakey: ''})
1205         return r.headers
1206
1207     def publish_object(self, obj):
1208         """
1209         :param obj: (str) remote object path
1210
1211         :returns: (str) access url
1212         """
1213         self.object_post(obj, update=True, public=True)
1214         info = self.get_object_info(obj)
1215         return info['x-object-public']
1216         pref, sep, rest = self.base_url.partition('//')
1217         base = rest.split('/')[0]
1218         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1219
1220     def unpublish_object(self, obj):
1221         """
1222         :param obj: (str) remote object path
1223         """
1224         r = self.object_post(obj, update=True, public=False)
1225         return r.headers
1226
1227     def get_object_info(self, obj, version=None):
1228         """
1229         :param obj: (str) remote object path
1230
1231         :param version: (str)
1232
1233         :returns: (dict)
1234         """
1235         try:
1236             r = self.object_head(obj, version=version)
1237             return r.headers
1238         except ClientError as ce:
1239             if ce.status == 404:
1240                 raise ClientError('Object %s not found' % obj, status=404)
1241             raise
1242
1243     def get_object_meta(self, obj, version=None):
1244         """
1245         :param obj: (str) remote object path
1246
1247         :param version: (str)
1248
1249         :returns: (dict)
1250         """
1251         return filter_in(
1252             self.get_object_info(obj, version=version),
1253             'X-Object-Meta')
1254
1255     def get_object_sharing(self, obj):
1256         """
1257         :param obj: (str) remote object path
1258
1259         :returns: (dict)
1260         """
1261         r = filter_in(
1262             self.get_object_info(obj),
1263             'X-Object-Sharing',
1264             exactMatch=True)
1265         reply = {}
1266         if len(r) > 0:
1267             perms = r['x-object-sharing'].split(';')
1268             for perm in perms:
1269                 try:
1270                     perm.index('=')
1271                 except ValueError:
1272                     raise ClientError('Incorrect reply format')
1273                 (key, val) = perm.strip().split('=')
1274                 reply[key] = val
1275         return reply
1276
1277     def set_object_sharing(
1278             self, obj,
1279             read_permission=False, write_permission=False):
1280         """Give read/write permisions to an object.
1281
1282         :param obj: (str) remote object path
1283
1284         :param read_permission: (list - bool) users and user groups that get
1285             read permission for this object - False means all previous read
1286             permissions will be removed
1287
1288         :param write_permission: (list - bool) of users and user groups to get
1289            write permission for this object - False means all previous write
1290            permissions will be removed
1291
1292         :returns: (dict) response headers
1293         """
1294
1295         perms = dict(read=read_permission or '', write=write_permission or '')
1296         r = self.object_post(obj, update=True, permissions=perms)
1297         return r.headers
1298
1299     def del_object_sharing(self, obj):
1300         """
1301         :param obj: (str) remote object path
1302         """
1303         return self.set_object_sharing(obj)
1304
1305     def append_object(self, obj, source_file, upload_cb=None):
1306         """
1307         :param obj: (str) remote object path
1308
1309         :param source_file: open file descriptor
1310
1311         :param upload_db: progress.bar for uploading
1312         """
1313         self._assert_container()
1314         meta = self.get_container_info()
1315         blocksize = int(meta['x-container-block-size'])
1316         filesize = fstat(source_file.fileno()).st_size
1317         nblocks = 1 + (filesize - 1) // blocksize
1318         offset = 0
1319         headers = {}
1320         if upload_cb:
1321             self.progress_bar_gen = upload_cb(nblocks)
1322             self._cb_next()
1323         flying = {}
1324         self._init_thread_limit()
1325         try:
1326             for i in range(nblocks):
1327                 block = source_file.read(min(blocksize, filesize - offset))
1328                 offset += len(block)
1329
1330                 self._watch_thread_limit(flying.values())
1331                 unfinished = {}
1332                 flying[i] = SilentEvent(
1333                     method=self.object_post,
1334                     obj=obj,
1335                     update=True,
1336                     content_range='bytes */*',
1337                     content_type='application/octet-stream',
1338                     content_length=len(block),
1339                     data=block)
1340                 flying[i].start()
1341
1342                 for key, thread in flying.items():
1343                     if thread.isAlive():
1344                         if i < nblocks:
1345                             unfinished[key] = thread
1346                             continue
1347                         thread.join()
1348                     if thread.exception:
1349                         raise thread.exception
1350                     headers[key] = thread.value.headers
1351                     self._cb_next()
1352                 flying = unfinished
1353         except KeyboardInterrupt:
1354             sendlog.info('- - - wait for threads to finish')
1355             for thread in activethreads():
1356                 thread.join()
1357         finally:
1358             from time import sleep
1359             sleep(2 * len(activethreads()))
1360         return headers.values()
1361
1362     def truncate_object(self, obj, upto_bytes):
1363         """
1364         :param obj: (str) remote object path
1365
1366         :param upto_bytes: max number of bytes to leave on file
1367
1368         :returns: (dict) response headers
1369         """
1370         r = self.object_post(
1371             obj,
1372             update=True,
1373             content_range='bytes 0-%s/*' % upto_bytes,
1374             content_type='application/octet-stream',
1375             object_bytes=upto_bytes,
1376             source_object=path4url(self.container, obj))
1377         return r.headers
1378
1379     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1380         """Overwrite a part of an object from local source file
1381
1382         :param obj: (str) remote object path
1383
1384         :param start: (int) position in bytes to start overwriting from
1385
1386         :param end: (int) position in bytes to stop overwriting at
1387
1388         :param source_file: open file descriptor
1389
1390         :param upload_db: progress.bar for uploading
1391         """
1392
1393         r = self.get_object_info(obj)
1394         rf_size = int(r['content-length'])
1395         if rf_size < int(start):
1396             raise ClientError(
1397                 'Range start exceeds file size',
1398                 status=416)
1399         elif rf_size < int(end):
1400             raise ClientError(
1401                 'Range end exceeds file size',
1402                 status=416)
1403         self._assert_container()
1404         meta = self.get_container_info()
1405         blocksize = int(meta['x-container-block-size'])
1406         filesize = fstat(source_file.fileno()).st_size
1407         datasize = int(end) - int(start) + 1
1408         nblocks = 1 + (datasize - 1) // blocksize
1409         offset = 0
1410         if upload_cb:
1411             self.progress_bar_gen = upload_cb(nblocks)
1412             self._cb_next()
1413         headers = []
1414         for i in range(nblocks):
1415             read_size = min(blocksize, filesize - offset, datasize - offset)
1416             block = source_file.read(read_size)
1417             r = self.object_post(
1418                 obj,
1419                 update=True,
1420                 content_type='application/octet-stream',
1421                 content_length=len(block),
1422                 content_range='bytes %s-%s/*' % (
1423                     start + offset,
1424                     start + offset + len(block) - 1),
1425                 data=block)
1426             headers.append(dict(r.headers))
1427             offset += len(block)
1428
1429             self._cb_next
1430         return headers
1431
1432     def copy_object(
1433             self, src_container, src_object, dst_container,
1434             dst_object=None,
1435             source_version=None,
1436             source_account=None,
1437             public=False,
1438             content_type=None,
1439             delimiter=None):
1440         """
1441         :param src_container: (str) source container
1442
1443         :param src_object: (str) source object path
1444
1445         :param dst_container: (str) destination container
1446
1447         :param dst_object: (str) destination object path
1448
1449         :param source_version: (str) source object version
1450
1451         :param source_account: (str) account to copy from
1452
1453         :param public: (bool)
1454
1455         :param content_type: (str)
1456
1457         :param delimiter: (str)
1458
1459         :returns: (dict) response headers
1460         """
1461         self._assert_account()
1462         self.container = dst_container
1463         src_path = path4url(src_container, src_object)
1464         r = self.object_put(
1465             dst_object or src_object,
1466             success=201,
1467             copy_from=src_path,
1468             content_length=0,
1469             source_version=source_version,
1470             source_account=source_account,
1471             public=public,
1472             content_type=content_type,
1473             delimiter=delimiter)
1474         return r.headers
1475
1476     def move_object(
1477             self, src_container, src_object, dst_container,
1478             dst_object=False,
1479             source_account=None,
1480             source_version=None,
1481             public=False,
1482             content_type=None,
1483             delimiter=None):
1484         """
1485         :param src_container: (str) source container
1486
1487         :param src_object: (str) source object path
1488
1489         :param dst_container: (str) destination container
1490
1491         :param dst_object: (str) destination object path
1492
1493         :param source_account: (str) account to move from
1494
1495         :param source_version: (str) source object version
1496
1497         :param public: (bool)
1498
1499         :param content_type: (str)
1500
1501         :param delimiter: (str)
1502
1503         :returns: (dict) response headers
1504         """
1505         self._assert_account()
1506         self.container = dst_container
1507         dst_object = dst_object or src_object
1508         src_path = path4url(src_container, src_object)
1509         r = self.object_put(
1510             dst_object,
1511             success=201,
1512             move_from=src_path,
1513             content_length=0,
1514             source_account=source_account,
1515             source_version=source_version,
1516             public=public,
1517             content_type=content_type,
1518             delimiter=delimiter)
1519         return r.headers
1520
1521     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1522         """Get accounts that share with self.account
1523
1524         :param limit: (str)
1525
1526         :param marker: (str)
1527
1528         :returns: (dict)
1529         """
1530         self._assert_account()
1531
1532         self.set_param('format', 'json')
1533         self.set_param('limit', limit, iff=limit is not None)
1534         self.set_param('marker', marker, iff=marker is not None)
1535
1536         path = ''
1537         success = kwargs.pop('success', (200, 204))
1538         r = self.get(path, *args, success=success, **kwargs)
1539         return r.json
1540
1541     def get_object_versionlist(self, obj):
1542         """
1543         :param obj: (str) remote object path
1544
1545         :returns: (list)
1546         """
1547         self._assert_container()
1548         r = self.object_get(obj, format='json', version='list')
1549         return r.json['versions']