Update copyright dates for changes files
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2014 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks"""
136         cnt_back_up = self.container
137         try:
138             self.container = container or cnt_back_up
139             r = self.container_delete(until=unicode(time()))
140         finally:
141             self.container = cnt_back_up
142         return r.headers
143
144     def upload_object_unchunked(
145             self, obj, f,
146             withHashFile=False,
147             size=None,
148             etag=None,
149             content_encoding=None,
150             content_disposition=None,
151             content_type=None,
152             sharing=None,
153             public=None):
154         """
155         :param obj: (str) remote object path
156
157         :param f: open file descriptor
158
159         :param withHashFile: (bool)
160
161         :param size: (int) size of data to upload
162
163         :param etag: (str)
164
165         :param content_encoding: (str)
166
167         :param content_disposition: (str)
168
169         :param content_type: (str)
170
171         :param sharing: {'read':[user and/or grp names],
172             'write':[usr and/or grp names]}
173
174         :param public: (bool)
175
176         :returns: (dict) created object metadata
177         """
178         self._assert_container()
179
180         if withHashFile:
181             data = f.read()
182             try:
183                 import json
184                 data = json.dumps(json.loads(data))
185             except ValueError:
186                 raise ClientError('"%s" is not json-formated' % f.name, 1)
187             except SyntaxError:
188                 msg = '"%s" is not a valid hashmap file' % f.name
189                 raise ClientError(msg, 1)
190             f = StringIO(data)
191         else:
192             data = readall(f, size) if size else f.read()
193         r = self.object_put(
194             obj,
195             data=data,
196             etag=etag,
197             content_encoding=content_encoding,
198             content_disposition=content_disposition,
199             content_type=content_type,
200             permissions=sharing,
201             public=public,
202             success=201)
203         return r.headers
204
205     def create_object_by_manifestation(
206             self, obj,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             content_type=None,
211             sharing=None,
212             public=None):
213         """
214         :param obj: (str) remote object path
215
216         :param etag: (str)
217
218         :param content_encoding: (str)
219
220         :param content_disposition: (str)
221
222         :param content_type: (str)
223
224         :param sharing: {'read':[user and/or grp names],
225             'write':[usr and/or grp names]}
226
227         :param public: (bool)
228
229         :returns: (dict) created object metadata
230         """
231         self._assert_container()
232         r = self.object_put(
233             obj,
234             content_length=0,
235             etag=etag,
236             content_encoding=content_encoding,
237             content_disposition=content_disposition,
238             content_type=content_type,
239             permissions=sharing,
240             public=public,
241             manifest='%s/%s' % (self.container, obj))
242         return r.headers
243
244     # upload_* auxiliary methods
245     def _put_block_async(self, data, hash):
246         event = SilentEvent(method=self._put_block, data=data, hash=hash)
247         event.start()
248         return event
249
250     def _put_block(self, data, hash):
251         r = self.container_post(
252             update=True,
253             content_type='application/octet-stream',
254             content_length=len(data),
255             data=data,
256             format='json')
257         assert r.json[0] == hash, 'Local hash does not match server'
258
259     def _get_file_block_info(self, fileobj, size=None, cache=None):
260         """
261         :param fileobj: (file descriptor) source
262
263         :param size: (int) size of data to upload from source
264
265         :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
267         """
268         if isinstance(cache, dict):
269             try:
270                 meta = cache[self.container]
271             except KeyError:
272                 meta = self.get_container_info()
273                 cache[self.container] = meta
274         else:
275             meta = self.get_container_info()
276         blocksize = int(meta['x-container-block-size'])
277         blockhash = meta['x-container-block-hash']
278         size = size if size is not None else fstat(fileobj.fileno()).st_size
279         nblocks = 1 + (size - 1) // blocksize
280         return (blocksize, blockhash, size, nblocks)
281
282     def _create_object_or_get_missing_hashes(
283             self, obj, json,
284             size=None,
285             format='json',
286             hashmap=True,
287             content_type=None,
288             if_etag_match=None,
289             if_etag_not_match=None,
290             content_encoding=None,
291             content_disposition=None,
292             permissions=None,
293             public=None,
294             success=(201, 409)):
295         r = self.object_put(
296             obj,
297             format='json',
298             hashmap=True,
299             content_type=content_type,
300             json=json,
301             if_etag_match=if_etag_match,
302             if_etag_not_match=if_etag_not_match,
303             content_encoding=content_encoding,
304             content_disposition=content_disposition,
305             permissions=permissions,
306             public=public,
307             success=success)
308         return (None if r.status_code == 201 else r.json), r.headers
309
310     def _calculate_blocks_for_upload(
311             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
312             hash_cb=None):
313         offset = 0
314         if hash_cb:
315             hash_gen = hash_cb(nblocks)
316             hash_gen.next()
317
318         for i in xrange(nblocks):
319             block = readall(fileobj, min(blocksize, size - offset))
320             bytes = len(block)
321             if bytes <= 0:
322                 break
323             hash = _pithos_hash(block, blockhash)
324             hashes.append(hash)
325             hmap[hash] = (offset, bytes)
326             offset += bytes
327             if hash_cb:
328                 hash_gen.next()
329         msg = ('Failed to calculate uploading blocks: '
330                'read bytes(%s) != requested size (%s)' % (offset, size))
331         assert offset == size, msg
332
333     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334         """upload missing blocks asynchronously"""
335
336         self._init_thread_limit()
337
338         flying = []
339         failures = []
340         for hash in missing:
341             offset, bytes = hmap[hash]
342             fileobj.seek(offset)
343             data = readall(fileobj, bytes)
344             r = self._put_block_async(data, hash)
345             flying.append(r)
346             unfinished = self._watch_thread_limit(flying)
347             for thread in set(flying).difference(unfinished):
348                 if thread.exception:
349                     failures.append(thread)
350                     if isinstance(
351                             thread.exception,
352                             ClientError) and thread.exception.status == 502:
353                         self.POOLSIZE = self._thread_limit
354                 elif thread.isAlive():
355                     flying.append(thread)
356                 elif upload_gen:
357                     try:
358                         upload_gen.next()
359                     except:
360                         pass
361             flying = unfinished
362
363         for thread in flying:
364             thread.join()
365             if thread.exception:
366                 failures.append(thread)
367             elif upload_gen:
368                 try:
369                     upload_gen.next()
370                 except:
371                     pass
372
373         return [failure.kwargs['hash'] for failure in failures]
374
375     def upload_object(
376             self, obj, f,
377             size=None,
378             hash_cb=None,
379             upload_cb=None,
380             etag=None,
381             if_etag_match=None,
382             if_not_exist=None,
383             content_encoding=None,
384             content_disposition=None,
385             content_type=None,
386             sharing=None,
387             public=None,
388             container_info_cache=None):
389         """Upload an object using multiple connections (threads)
390
391         :param obj: (str) remote object path
392
393         :param f: open file descriptor (rb)
394
395         :param hash_cb: optional progress.bar object for calculating hashes
396
397         :param upload_cb: optional progress.bar object for uploading
398
399         :param etag: (str)
400
401         :param if_etag_match: (str) Push that value to if-match header at file
402             creation
403
404         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405             it does not exist remotely, otherwise the operation will fail.
406             Involves the case of an object with the same path is created while
407             the object is being uploaded.
408
409         :param content_encoding: (str)
410
411         :param content_disposition: (str)
412
413         :param content_type: (str)
414
415         :param sharing: {'read':[user and/or grp names],
416             'write':[usr and/or grp names]}
417
418         :param public: (bool)
419
420         :param container_info_cache: (dict) if given, avoid redundant calls to
421             server for container info (block size and hash information)
422         """
423         self._assert_container()
424
425         block_info = (
426             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427                 f, size, container_info_cache)
428         (hashes, hmap, offset) = ([], {}, 0)
429         content_type = content_type or 'application/octet-stream'
430
431         self._calculate_blocks_for_upload(
432             *block_info,
433             hashes=hashes,
434             hmap=hmap,
435             fileobj=f,
436             hash_cb=hash_cb)
437
438         hashmap = dict(bytes=size, hashes=hashes)
439         missing, obj_headers = self._create_object_or_get_missing_hashes(
440             obj, hashmap,
441             content_type=content_type,
442             size=size,
443             if_etag_match=if_etag_match,
444             if_etag_not_match='*' if if_not_exist else None,
445             content_encoding=content_encoding,
446             content_disposition=content_disposition,
447             permissions=sharing,
448             public=public)
449
450         if missing is None:
451             return obj_headers
452
453         if upload_cb:
454             upload_gen = upload_cb(len(missing))
455             for i in range(len(missing), len(hashmap['hashes']) + 1):
456                 try:
457                     upload_gen.next()
458                 except:
459                     upload_gen = None
460         else:
461             upload_gen = None
462
463         retries = 7
464         try:
465             while retries:
466                 sendlog.info('%s blocks missing' % len(missing))
467                 num_of_blocks = len(missing)
468                 missing = self._upload_missing_blocks(
469                     missing, hmap, f, upload_gen)
470                 if missing:
471                     if num_of_blocks == len(missing):
472                         retries -= 1
473                     else:
474                         num_of_blocks = len(missing)
475                 else:
476                     break
477             if missing:
478                 try:
479                     details = ['%s' % thread.exception for thread in missing]
480                 except Exception:
481                     details = ['Also, failed to read thread exceptions']
482                 raise ClientError(
483                     '%s blocks failed to upload' % len(missing),
484                     details=details)
485         except KeyboardInterrupt:
486             sendlog.info('- - - wait for threads to finish')
487             for thread in activethreads():
488                 thread.join()
489             raise
490
491         r = self.object_put(
492             obj,
493             format='json',
494             hashmap=True,
495             content_type=content_type,
496             content_encoding=content_encoding,
497             if_etag_match=if_etag_match,
498             if_etag_not_match='*' if if_not_exist else None,
499             etag=etag,
500             json=hashmap,
501             permissions=sharing,
502             public=public,
503             success=201)
504         return r.headers
505
506     def upload_from_string(
507             self, obj, input_str,
508             hash_cb=None,
509             upload_cb=None,
510             etag=None,
511             if_etag_match=None,
512             if_not_exist=None,
513             content_encoding=None,
514             content_disposition=None,
515             content_type=None,
516             sharing=None,
517             public=None,
518             container_info_cache=None):
519         """Upload an object using multiple connections (threads)
520
521         :param obj: (str) remote object path
522
523         :param input_str: (str) upload content
524
525         :param hash_cb: optional progress.bar object for calculating hashes
526
527         :param upload_cb: optional progress.bar object for uploading
528
529         :param etag: (str)
530
531         :param if_etag_match: (str) Push that value to if-match header at file
532             creation
533
534         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
535             it does not exist remotely, otherwise the operation will fail.
536             Involves the case of an object with the same path is created while
537             the object is being uploaded.
538
539         :param content_encoding: (str)
540
541         :param content_disposition: (str)
542
543         :param content_type: (str)
544
545         :param sharing: {'read':[user and/or grp names],
546             'write':[usr and/or grp names]}
547
548         :param public: (bool)
549
550         :param container_info_cache: (dict) if given, avoid redundant calls to
551             server for container info (block size and hash information)
552         """
553         self._assert_container()
554
555         blocksize, blockhash, size, nblocks = self._get_file_block_info(
556                 fileobj=None, size=len(input_str), cache=container_info_cache)
557         (hashes, hmap, offset) = ([], {}, 0)
558         if not content_type:
559             content_type = 'application/octet-stream'
560
561         hashes = []
562         hmap = {}
563         for blockid in range(nblocks):
564             start = blockid * blocksize
565             block = input_str[start: (start + blocksize)]
566             hashes.append(_pithos_hash(block, blockhash))
567             hmap[hashes[blockid]] = (start, block)
568
569         hashmap = dict(bytes=size, hashes=hashes)
570         missing, obj_headers = self._create_object_or_get_missing_hashes(
571             obj, hashmap,
572             content_type=content_type,
573             size=size,
574             if_etag_match=if_etag_match,
575             if_etag_not_match='*' if if_not_exist else None,
576             content_encoding=content_encoding,
577             content_disposition=content_disposition,
578             permissions=sharing,
579             public=public)
580         if missing is None:
581             return obj_headers
582         num_of_missing = len(missing)
583
584         if upload_cb:
585             self.progress_bar_gen = upload_cb(nblocks)
586             for i in range(nblocks + 1 - num_of_missing):
587                 self._cb_next()
588
589         tries = 7
590         old_failures = 0
591         try:
592             while tries and missing:
593                 flying = []
594                 failures = []
595                 for hash in missing:
596                     offset, block = hmap[hash]
597                     bird = self._put_block_async(block, hash)
598                     flying.append(bird)
599                     unfinished = self._watch_thread_limit(flying)
600                     for thread in set(flying).difference(unfinished):
601                         if thread.exception:
602                             failures.append(thread.kwargs['hash'])
603                         if thread.isAlive():
604                             flying.append(thread)
605                         else:
606                             self._cb_next()
607                     flying = unfinished
608                 for thread in flying:
609                     thread.join()
610                     if thread.exception:
611                         failures.append(thread.kwargs['hash'])
612                     self._cb_next()
613                 missing = failures
614                 if missing and len(missing) == old_failures:
615                     tries -= 1
616                 old_failures = len(missing)
617             if missing:
618                 raise ClientError('%s blocks failed to upload' % len(missing))
619         except KeyboardInterrupt:
620             sendlog.info('- - - wait for threads to finish')
621             for thread in activethreads():
622                 thread.join()
623             raise
624         self._cb_next()
625
626         r = self.object_put(
627             obj,
628             format='json',
629             hashmap=True,
630             content_type=content_type,
631             content_encoding=content_encoding,
632             if_etag_match=if_etag_match,
633             if_etag_not_match='*' if if_not_exist else None,
634             etag=etag,
635             json=hashmap,
636             permissions=sharing,
637             public=public,
638             success=201)
639         return r.headers
640
641     # download_* auxiliary methods
642     def _get_remote_blocks_info(self, obj, **restargs):
643         #retrieve object hashmap
644         myrange = restargs.pop('data_range', None)
645         hashmap = self.get_object_hashmap(obj, **restargs)
646         restargs['data_range'] = myrange
647         blocksize = int(hashmap['block_size'])
648         blockhash = hashmap['block_hash']
649         total_size = hashmap['bytes']
650         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
651         map_dict = {}
652         for i, h in enumerate(hashmap['hashes']):
653             #  map_dict[h] = i   CHAGE
654             if h in map_dict:
655                 map_dict[h].append(i)
656             else:
657                 map_dict[h] = [i]
658         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
659
660     def _dump_blocks_sync(
661             self, obj, remote_hashes, blocksize, total_size, dst, crange,
662             **args):
663         if not total_size:
664             return
665         for blockid, blockhash in enumerate(remote_hashes):
666             if blockhash:
667                 start = blocksize * blockid
668                 is_last = start + blocksize > total_size
669                 end = (total_size - 1) if is_last else (start + blocksize - 1)
670                 data_range = _range_up(start, end, total_size, crange)
671                 if not data_range:
672                     self._cb_next()
673                     continue
674                 args['data_range'] = 'bytes=%s' % data_range
675                 r = self.object_get(obj, success=(200, 206), **args)
676                 self._cb_next()
677                 dst.write(r.content)
678                 dst.flush()
679
680     def _get_block_async(self, obj, **args):
681         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
682         event.start()
683         return event
684
685     def _hash_from_file(self, fp, start, size, blockhash):
686         fp.seek(start)
687         block = readall(fp, size)
688         h = newhashlib(blockhash)
689         h.update(block.strip('\x00'))
690         return hexlify(h.digest())
691
692     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
693         """write the results of a greenleted rest call to a file
694
695         :param offset: the offset of the file up to blocksize
696         - e.g. if the range is 10-100, all blocks will be written to
697         normal_position - 10
698         """
699         for key, g in flying.items():
700             if g.isAlive():
701                 continue
702             if g.exception:
703                 raise g.exception
704             block = g.value.content
705             for block_start in blockids[key]:
706                 local_file.seek(block_start + offset)
707                 local_file.write(block)
708                 self._cb_next()
709             flying.pop(key)
710             blockids.pop(key)
711         local_file.flush()
712
713     def _dump_blocks_async(
714             self, obj, remote_hashes, blocksize, total_size, local_file,
715             blockhash=None, resume=False, filerange=None, **restargs):
716         file_size = fstat(local_file.fileno()).st_size if resume else 0
717         flying = dict()
718         blockid_dict = dict()
719         offset = 0
720
721         self._init_thread_limit()
722         for block_hash, blockids in remote_hashes.items():
723             blockids = [blk * blocksize for blk in blockids]
724             unsaved = [blk for blk in blockids if not (
725                 blk < file_size and block_hash == self._hash_from_file(
726                         local_file, blk, blocksize, blockhash))]
727             self._cb_next(len(blockids) - len(unsaved))
728             if unsaved:
729                 key = unsaved[0]
730                 self._watch_thread_limit(flying.values())
731                 self._thread2file(
732                     flying, blockid_dict, local_file, offset,
733                     **restargs)
734                 end = total_size - 1 if (
735                     key + blocksize > total_size) else key + blocksize - 1
736                 if end < key:
737                     self._cb_next()
738                     continue
739                 data_range = _range_up(key, end, total_size, filerange)
740                 if not data_range:
741                     self._cb_next()
742                     continue
743                 restargs[
744                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
745                 flying[key] = self._get_block_async(obj, **restargs)
746                 blockid_dict[key] = unsaved
747
748         for thread in flying.values():
749             thread.join()
750         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
751
752     def download_object(
753             self, obj, dst,
754             download_cb=None,
755             version=None,
756             resume=False,
757             range_str=None,
758             if_match=None,
759             if_none_match=None,
760             if_modified_since=None,
761             if_unmodified_since=None):
762         """Download an object (multiple connections, random blocks)
763
764         :param obj: (str) remote object path
765
766         :param dst: open file descriptor (wb+)
767
768         :param download_cb: optional progress.bar object for downloading
769
770         :param version: (str) file version
771
772         :param resume: (bool) if set, preserve already downloaded file parts
773
774         :param range_str: (str) from, to are file positions (int) in bytes
775
776         :param if_match: (str)
777
778         :param if_none_match: (str)
779
780         :param if_modified_since: (str) formated date
781
782         :param if_unmodified_since: (str) formated date"""
783         restargs = dict(
784             version=version,
785             data_range=None if range_str is None else 'bytes=%s' % range_str,
786             if_match=if_match,
787             if_none_match=if_none_match,
788             if_modified_since=if_modified_since,
789             if_unmodified_since=if_unmodified_since)
790
791         (
792             blocksize,
793             blockhash,
794             total_size,
795             hash_list,
796             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
797         assert total_size >= 0
798
799         if download_cb:
800             self.progress_bar_gen = download_cb(len(hash_list))
801             self._cb_next()
802
803         if dst.isatty():
804             self._dump_blocks_sync(
805                 obj,
806                 hash_list,
807                 blocksize,
808                 total_size,
809                 dst,
810                 range_str,
811                 **restargs)
812         else:
813             self._dump_blocks_async(
814                 obj,
815                 remote_hashes,
816                 blocksize,
817                 total_size,
818                 dst,
819                 blockhash,
820                 resume,
821                 range_str,
822                 **restargs)
823             if not range_str:
824                 dst.truncate(total_size)
825
826         self._complete_cb()
827
828     def download_to_string(
829             self, obj,
830             download_cb=None,
831             version=None,
832             range_str=None,
833             if_match=None,
834             if_none_match=None,
835             if_modified_since=None,
836             if_unmodified_since=None):
837         """Download an object to a string (multiple connections). This method
838         uses threads for http requests, but stores all content in memory.
839
840         :param obj: (str) remote object path
841
842         :param download_cb: optional progress.bar object for downloading
843
844         :param version: (str) file version
845
846         :param range_str: (str) from, to are file positions (int) in bytes
847
848         :param if_match: (str)
849
850         :param if_none_match: (str)
851
852         :param if_modified_since: (str) formated date
853
854         :param if_unmodified_since: (str) formated date
855
856         :returns: (str) the whole object contents
857         """
858         restargs = dict(
859             version=version,
860             data_range=None if range_str is None else 'bytes=%s' % range_str,
861             if_match=if_match,
862             if_none_match=if_none_match,
863             if_modified_since=if_modified_since,
864             if_unmodified_since=if_unmodified_since)
865
866         (
867             blocksize,
868             blockhash,
869             total_size,
870             hash_list,
871             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
872         assert total_size >= 0
873
874         if download_cb:
875             self.progress_bar_gen = download_cb(len(hash_list))
876             self._cb_next()
877
878         num_of_blocks = len(remote_hashes)
879         ret = [''] * num_of_blocks
880         self._init_thread_limit()
881         flying = dict()
882         try:
883             for blockid, blockhash in enumerate(remote_hashes):
884                 start = blocksize * blockid
885                 is_last = start + blocksize > total_size
886                 end = (total_size - 1) if is_last else (start + blocksize - 1)
887                 data_range_str = _range_up(start, end, end, range_str)
888                 if data_range_str:
889                     self._watch_thread_limit(flying.values())
890                     restargs['data_range'] = 'bytes=%s' % data_range_str
891                     flying[blockid] = self._get_block_async(obj, **restargs)
892                 for runid, thread in flying.items():
893                     if (blockid + 1) == num_of_blocks:
894                         thread.join()
895                     elif thread.isAlive():
896                         continue
897                     if thread.exception:
898                         raise thread.exception
899                     ret[runid] = thread.value.content
900                     self._cb_next()
901                     flying.pop(runid)
902             return ''.join(ret)
903         except KeyboardInterrupt:
904             sendlog.info('- - - wait for threads to finish')
905             for thread in activethreads():
906                 thread.join()
907
908     #Command Progress Bar method
909     def _cb_next(self, step=1):
910         if hasattr(self, 'progress_bar_gen'):
911             try:
912                 for i in xrange(step):
913                     self.progress_bar_gen.next()
914             except:
915                 pass
916
917     def _complete_cb(self):
918         while True:
919             try:
920                 self.progress_bar_gen.next()
921             except:
922                 break
923
924     def get_object_hashmap(
925             self, obj,
926             version=None,
927             if_match=None,
928             if_none_match=None,
929             if_modified_since=None,
930             if_unmodified_since=None):
931         """
932         :param obj: (str) remote object path
933
934         :param if_match: (str)
935
936         :param if_none_match: (str)
937
938         :param if_modified_since: (str) formated date
939
940         :param if_unmodified_since: (str) formated date
941
942         :returns: (list)
943         """
944         try:
945             r = self.object_get(
946                 obj,
947                 hashmap=True,
948                 version=version,
949                 if_etag_match=if_match,
950                 if_etag_not_match=if_none_match,
951                 if_modified_since=if_modified_since,
952                 if_unmodified_since=if_unmodified_since)
953         except ClientError as err:
954             if err.status == 304 or err.status == 412:
955                 return {}
956             raise
957         return r.json
958
959     def set_account_group(self, group, usernames):
960         """
961         :param group: (str)
962
963         :param usernames: (list)
964         """
965         r = self.account_post(update=True, groups={group: usernames})
966         return r
967
968     def del_account_group(self, group):
969         """
970         :param group: (str)
971         """
972         self.account_post(update=True, groups={group: []})
973
974     def get_account_info(self, until=None):
975         """
976         :param until: (str) formated date
977
978         :returns: (dict)
979         """
980         r = self.account_head(until=until)
981         if r.status_code == 401:
982             raise ClientError("No authorization", status=401)
983         return r.headers
984
985     def get_account_quota(self):
986         """
987         :returns: (dict)
988         """
989         return filter_in(
990             self.get_account_info(),
991             'X-Account-Policy-Quota',
992             exactMatch=True)
993
994     #def get_account_versioning(self):
995     #    """
996     #    :returns: (dict)
997     #    """
998     #    return filter_in(
999     #        self.get_account_info(),
1000     #        'X-Account-Policy-Versioning',
1001     #        exactMatch=True)
1002
1003     def get_account_meta(self, until=None):
1004         """
1005         :param until: (str) formated date
1006
1007         :returns: (dict)
1008         """
1009         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1010
1011     def get_account_group(self):
1012         """
1013         :returns: (dict)
1014         """
1015         return filter_in(self.get_account_info(), 'X-Account-Group-')
1016
1017     def set_account_meta(self, metapairs):
1018         """
1019         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1020         """
1021         assert(type(metapairs) is dict)
1022         r = self.account_post(update=True, metadata=metapairs)
1023         return r.headers
1024
1025     def del_account_meta(self, metakey):
1026         """
1027         :param metakey: (str) metadatum key
1028         """
1029         r = self.account_post(update=True, metadata={metakey: ''})
1030         return r.headers
1031
1032     #def set_account_quota(self, quota):
1033     #    """
1034     #    :param quota: (int)
1035     #    """
1036     #    self.account_post(update=True, quota=quota)
1037
1038     #def set_account_versioning(self, versioning):
1039     #    """
1040     #    :param versioning: (str)
1041     #    """
1042     #    r = self.account_post(update=True, versioning=versioning)
1043     #    return r.headers
1044
1045     def list_containers(self):
1046         """
1047         :returns: (dict)
1048         """
1049         r = self.account_get()
1050         return r.json
1051
1052     def del_container(self, until=None, delimiter=None):
1053         """
1054         :param until: (str) formated date
1055
1056         :param delimiter: (str) with / empty container
1057
1058         :raises ClientError: 404 Container does not exist
1059
1060         :raises ClientError: 409 Container is not empty
1061         """
1062         self._assert_container()
1063         r = self.container_delete(
1064             until=until,
1065             delimiter=delimiter,
1066             success=(204, 404, 409))
1067         if r.status_code == 404:
1068             raise ClientError(
1069                 'Container "%s" does not exist' % self.container,
1070                 r.status_code)
1071         elif r.status_code == 409:
1072             raise ClientError(
1073                 'Container "%s" is not empty' % self.container,
1074                 r.status_code)
1075         return r.headers
1076
1077     def get_container_versioning(self, container=None):
1078         """
1079         :param container: (str)
1080
1081         :returns: (dict)
1082         """
1083         cnt_back_up = self.container
1084         try:
1085             self.container = container or cnt_back_up
1086             return filter_in(
1087                 self.get_container_info(),
1088                 'X-Container-Policy-Versioning')
1089         finally:
1090             self.container = cnt_back_up
1091
1092     def get_container_limit(self, container=None):
1093         """
1094         :param container: (str)
1095
1096         :returns: (dict)
1097         """
1098         cnt_back_up = self.container
1099         try:
1100             self.container = container or cnt_back_up
1101             return filter_in(
1102                 self.get_container_info(),
1103                 'X-Container-Policy-Quota')
1104         finally:
1105             self.container = cnt_back_up
1106
1107     def get_container_info(self, container=None, until=None):
1108         """
1109         :param until: (str) formated date
1110
1111         :returns: (dict)
1112
1113         :raises ClientError: 404 Container not found
1114         """
1115         bck_cont = self.container
1116         try:
1117             self.container = container or bck_cont
1118             self._assert_container()
1119             r = self.container_head(until=until)
1120         except ClientError as err:
1121             err.details.append('for container %s' % self.container)
1122             raise err
1123         finally:
1124             self.container = bck_cont
1125         return r.headers
1126
1127     def get_container_meta(self, until=None):
1128         """
1129         :param until: (str) formated date
1130
1131         :returns: (dict)
1132         """
1133         return filter_in(
1134             self.get_container_info(until=until), 'X-Container-Meta')
1135
1136     def get_container_object_meta(self, until=None):
1137         """
1138         :param until: (str) formated date
1139
1140         :returns: (dict)
1141         """
1142         return filter_in(
1143             self.get_container_info(until=until), 'X-Container-Object-Meta')
1144
1145     def set_container_meta(self, metapairs):
1146         """
1147         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1148         """
1149         assert(type(metapairs) is dict)
1150         r = self.container_post(update=True, metadata=metapairs)
1151         return r.headers
1152
1153     def del_container_meta(self, metakey):
1154         """
1155         :param metakey: (str) metadatum key
1156
1157         :returns: (dict) response headers
1158         """
1159         r = self.container_post(update=True, metadata={metakey: ''})
1160         return r.headers
1161
1162     def set_container_limit(self, limit):
1163         """
1164         :param limit: (int)
1165         """
1166         r = self.container_post(update=True, quota=limit)
1167         return r.headers
1168
1169     def set_container_versioning(self, versioning):
1170         """
1171         :param versioning: (str)
1172         """
1173         r = self.container_post(update=True, versioning=versioning)
1174         return r.headers
1175
1176     def del_object(self, obj, until=None, delimiter=None):
1177         """
1178         :param obj: (str) remote object path
1179
1180         :param until: (str) formated date
1181
1182         :param delimiter: (str)
1183         """
1184         self._assert_container()
1185         r = self.object_delete(obj, until=until, delimiter=delimiter)
1186         return r.headers
1187
1188     def set_object_meta(self, obj, metapairs):
1189         """
1190         :param obj: (str) remote object path
1191
1192         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1193         """
1194         assert(type(metapairs) is dict)
1195         r = self.object_post(obj, update=True, metadata=metapairs)
1196         return r.headers
1197
1198     def del_object_meta(self, obj, metakey):
1199         """
1200         :param obj: (str) remote object path
1201
1202         :param metakey: (str) metadatum key
1203         """
1204         r = self.object_post(obj, update=True, metadata={metakey: ''})
1205         return r.headers
1206
1207     def publish_object(self, obj):
1208         """
1209         :param obj: (str) remote object path
1210
1211         :returns: (str) access url
1212         """
1213         self.object_post(obj, update=True, public=True)
1214         info = self.get_object_info(obj)
1215         return info['x-object-public']
1216         pref, sep, rest = self.base_url.partition('//')
1217         base = rest.split('/')[0]
1218         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1219
1220     def unpublish_object(self, obj):
1221         """
1222         :param obj: (str) remote object path
1223         """
1224         r = self.object_post(obj, update=True, public=False)
1225         return r.headers
1226
1227     def get_object_info(self, obj, version=None):
1228         """
1229         :param obj: (str) remote object path
1230
1231         :param version: (str)
1232
1233         :returns: (dict)
1234         """
1235         try:
1236             r = self.object_head(obj, version=version)
1237             return r.headers
1238         except ClientError as ce:
1239             if ce.status == 404:
1240                 raise ClientError('Object %s not found' % obj, status=404)
1241             raise
1242
1243     def get_object_meta(self, obj, version=None):
1244         """
1245         :param obj: (str) remote object path
1246
1247         :param version: (str)
1248
1249         :returns: (dict)
1250         """
1251         return filter_in(
1252             self.get_object_info(obj, version=version),
1253             'X-Object-Meta')
1254
1255     def get_object_sharing(self, obj):
1256         """
1257         :param obj: (str) remote object path
1258
1259         :returns: (dict)
1260         """
1261         r = filter_in(
1262             self.get_object_info(obj),
1263             'X-Object-Sharing',
1264             exactMatch=True)
1265         reply = {}
1266         if len(r) > 0:
1267             perms = r['x-object-sharing'].split(';')
1268             for perm in perms:
1269                 try:
1270                     perm.index('=')
1271                 except ValueError:
1272                     raise ClientError('Incorrect reply format')
1273                 (key, val) = perm.strip().split('=')
1274                 reply[key] = val
1275         return reply
1276
1277     def set_object_sharing(
1278             self, obj,
1279             read_permission=False, write_permission=False):
1280         """Give read/write permisions to an object.
1281
1282         :param obj: (str) remote object path
1283
1284         :param read_permission: (list - bool) users and user groups that get
1285             read permission for this object - False means all previous read
1286             permissions will be removed
1287
1288         :param write_permission: (list - bool) of users and user groups to get
1289            write permission for this object - False means all previous write
1290            permissions will be removed
1291
1292         :returns: (dict) response headers
1293         """
1294
1295         perms = dict(read=read_permission or '', write=write_permission or '')
1296         r = self.object_post(obj, update=True, permissions=perms)
1297         return r.headers
1298
1299     def del_object_sharing(self, obj):
1300         """
1301         :param obj: (str) remote object path
1302         """
1303         return self.set_object_sharing(obj)
1304
1305     def append_object(self, obj, source_file, upload_cb=None):
1306         """
1307         :param obj: (str) remote object path
1308
1309         :param source_file: open file descriptor
1310
1311         :param upload_db: progress.bar for uploading
1312         """
1313         self._assert_container()
1314         meta = self.get_container_info()
1315         blocksize = int(meta['x-container-block-size'])
1316         filesize = fstat(source_file.fileno()).st_size
1317         nblocks = 1 + (filesize - 1) // blocksize
1318         offset = 0
1319         headers = {}
1320         if upload_cb:
1321             self.progress_bar_gen = upload_cb(nblocks)
1322             self._cb_next()
1323         flying = {}
1324         self._init_thread_limit()
1325         try:
1326             for i in range(nblocks):
1327                 block = source_file.read(min(blocksize, filesize - offset))
1328                 offset += len(block)
1329
1330                 self._watch_thread_limit(flying.values())
1331                 unfinished = {}
1332                 flying[i] = SilentEvent(
1333                     method=self.object_post,
1334                     obj=obj,
1335                     update=True,
1336                     content_range='bytes */*',
1337                     content_type='application/octet-stream',
1338                     content_length=len(block),
1339                     data=block)
1340                 flying[i].start()
1341
1342                 for key, thread in flying.items():
1343                     if thread.isAlive():
1344                         if i < nblocks:
1345                             unfinished[key] = thread
1346                             continue
1347                         thread.join()
1348                     if thread.exception:
1349                         raise thread.exception
1350                     headers[key] = thread.value.headers
1351                     self._cb_next()
1352                 flying = unfinished
1353         except KeyboardInterrupt:
1354             sendlog.info('- - - wait for threads to finish')
1355             for thread in activethreads():
1356                 thread.join()
1357         finally:
1358             from time import sleep
1359             sleep(2 * len(activethreads()))
1360             self._cb_next()
1361         return headers.values()
1362
1363     def truncate_object(self, obj, upto_bytes):
1364         """
1365         :param obj: (str) remote object path
1366
1367         :param upto_bytes: max number of bytes to leave on file
1368
1369         :returns: (dict) response headers
1370         """
1371         ctype = self.get_object_info(obj)['content-type']
1372         r = self.object_post(
1373             obj,
1374             update=True,
1375             content_range='bytes 0-%s/*' % upto_bytes,
1376             content_type=ctype,
1377             object_bytes=upto_bytes,
1378             source_object=path4url(self.container, obj))
1379         return r.headers
1380
1381     def overwrite_object(
1382             self, obj, start, end, source_file,
1383             source_version=None, upload_cb=None):
1384         """Overwrite a part of an object from local source file
1385         ATTENTION: content_type must always be application/octet-stream
1386
1387         :param obj: (str) remote object path
1388
1389         :param start: (int) position in bytes to start overwriting from
1390
1391         :param end: (int) position in bytes to stop overwriting at
1392
1393         :param source_file: open file descriptor
1394
1395         :param upload_db: progress.bar for uploading
1396         """
1397
1398         self._assert_container()
1399         r = self.get_object_info(obj, version=source_version)
1400         rf_size = int(r['content-length'])
1401         start, end = int(start), int(end)
1402         assert rf_size >= start, 'Range start %s exceeds file size %s' % (
1403             start, rf_size)
1404         meta = self.get_container_info()
1405         blocksize = int(meta['x-container-block-size'])
1406         filesize = fstat(source_file.fileno()).st_size
1407         datasize = end - start + 1
1408         nblocks = 1 + (datasize - 1) // blocksize
1409         offset = 0
1410         if upload_cb:
1411             self.progress_bar_gen = upload_cb(nblocks)
1412             self._cb_next()
1413         headers = []
1414         for i in range(nblocks):
1415             read_size = min(blocksize, filesize - offset, datasize - offset)
1416             block = source_file.read(read_size)
1417             r = self.object_post(
1418                 obj,
1419                 update=True,
1420                 content_type='application/octet-stream',
1421                 content_length=len(block),
1422                 content_range='bytes %s-%s/*' % (
1423                     start + offset,
1424                     start + offset + len(block) - 1),
1425                 source_version=source_version,
1426                 data=block)
1427             headers.append(dict(r.headers))
1428             offset += len(block)
1429             self._cb_next()
1430         self._cb_next()
1431         return headers
1432
1433     def copy_object(
1434             self, src_container, src_object, dst_container,
1435             dst_object=None,
1436             source_version=None,
1437             source_account=None,
1438             public=False,
1439             content_type=None,
1440             delimiter=None):
1441         """
1442         :param src_container: (str) source container
1443
1444         :param src_object: (str) source object path
1445
1446         :param dst_container: (str) destination container
1447
1448         :param dst_object: (str) destination object path
1449
1450         :param source_version: (str) source object version
1451
1452         :param source_account: (str) account to copy from
1453
1454         :param public: (bool)
1455
1456         :param content_type: (str)
1457
1458         :param delimiter: (str)
1459
1460         :returns: (dict) response headers
1461         """
1462         self._assert_account()
1463         self.container = dst_container
1464         src_path = path4url(src_container, src_object)
1465         r = self.object_put(
1466             dst_object or src_object,
1467             success=201,
1468             copy_from=src_path,
1469             content_length=0,
1470             source_version=source_version,
1471             source_account=source_account,
1472             public=public,
1473             content_type=content_type,
1474             delimiter=delimiter)
1475         return r.headers
1476
1477     def move_object(
1478             self, src_container, src_object, dst_container,
1479             dst_object=False,
1480             source_account=None,
1481             source_version=None,
1482             public=False,
1483             content_type=None,
1484             delimiter=None):
1485         """
1486         :param src_container: (str) source container
1487
1488         :param src_object: (str) source object path
1489
1490         :param dst_container: (str) destination container
1491
1492         :param dst_object: (str) destination object path
1493
1494         :param source_account: (str) account to move from
1495
1496         :param source_version: (str) source object version
1497
1498         :param public: (bool)
1499
1500         :param content_type: (str)
1501
1502         :param delimiter: (str)
1503
1504         :returns: (dict) response headers
1505         """
1506         self._assert_account()
1507         self.container = dst_container
1508         dst_object = dst_object or src_object
1509         src_path = path4url(src_container, src_object)
1510         r = self.object_put(
1511             dst_object,
1512             success=201,
1513             move_from=src_path,
1514             content_length=0,
1515             source_account=source_account,
1516             source_version=source_version,
1517             public=public,
1518             content_type=content_type,
1519             delimiter=delimiter)
1520         return r.headers
1521
1522     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1523         """Get accounts that share with self.account
1524
1525         :param limit: (str)
1526
1527         :param marker: (str)
1528
1529         :returns: (dict)
1530         """
1531         self._assert_account()
1532
1533         self.set_param('format', 'json')
1534         self.set_param('limit', limit, iff=limit is not None)
1535         self.set_param('marker', marker, iff=marker is not None)
1536
1537         path = ''
1538         success = kwargs.pop('success', (200, 204))
1539         r = self.get(path, *args, success=success, **kwargs)
1540         return r.json
1541
1542     def get_object_versionlist(self, obj):
1543         """
1544         :param obj: (str) remote object path
1545
1546         :returns: (list)
1547         """
1548         self._assert_container()
1549         r = self.object_get(obj, format='json', version='list')
1550         return r.json['versions']