Fix deprecated terms in documentation
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384             server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 raise ClientError(
446                     '%s blocks failed to upload' % len(missing),
447                     details=['%s' % thread.exception for thread in missing])
448         except KeyboardInterrupt:
449             sendlog.info('- - - wait for threads to finish')
450             for thread in activethreads():
451                 thread.join()
452             raise
453
454         r = self.object_put(
455             obj,
456             format='json',
457             hashmap=True,
458             content_type=content_type,
459             if_etag_match=if_etag_match,
460             if_etag_not_match='*' if if_not_exist else None,
461             etag=etag,
462             json=hashmap,
463             permissions=sharing,
464             public=public,
465             success=201)
466         return r.headers
467
468     def upload_from_string(
469             self, obj, input_str,
470             hash_cb=None,
471             upload_cb=None,
472             etag=None,
473             if_etag_match=None,
474             if_not_exist=None,
475             content_encoding=None,
476             content_disposition=None,
477             content_type=None,
478             sharing=None,
479             public=None,
480             container_info_cache=None):
481         """Upload an object using multiple connections (threads)
482
483         :param obj: (str) remote object path
484
485         :param input_str: (str) upload content
486
487         :param hash_cb: optional progress.bar object for calculating hashes
488
489         :param upload_cb: optional progress.bar object for uploading
490
491         :param etag: (str)
492
493         :param if_etag_match: (str) Push that value to if-match header at file
494             creation
495
496         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497             it does not exist remotely, otherwise the operation will fail.
498             Involves the case of an object with the same path is created while
499             the object is being uploaded.
500
501         :param content_encoding: (str)
502
503         :param content_disposition: (str)
504
505         :param content_type: (str)
506
507         :param sharing: {'read':[user and/or grp names],
508             'write':[usr and/or grp names]}
509
510         :param public: (bool)
511
512         :param container_info_cache: (dict) if given, avoid redundant calls to
513             server for container info (block size and hash information)
514         """
515         self._assert_container()
516
517         blocksize, blockhash, size, nblocks = self._get_file_block_info(
518                 fileobj=None, size=len(input_str), cache=container_info_cache)
519         (hashes, hmap, offset) = ([], {}, 0)
520         if not content_type:
521             content_type = 'application/octet-stream'
522
523         hashes = []
524         hmap = {}
525         for blockid in range(nblocks):
526             start = blockid * blocksize
527             block = input_str[start: (start + blocksize)]
528             hashes.append(_pithos_hash(block, blockhash))
529             hmap[hashes[blockid]] = (start, block)
530
531         hashmap = dict(bytes=size, hashes=hashes)
532         missing, obj_headers = self._create_object_or_get_missing_hashes(
533             obj, hashmap,
534             content_type=content_type,
535             size=size,
536             if_etag_match=if_etag_match,
537             if_etag_not_match='*' if if_not_exist else None,
538             content_encoding=content_encoding,
539             content_disposition=content_disposition,
540             permissions=sharing,
541             public=public)
542         if missing is None:
543             return obj_headers
544         num_of_missing = len(missing)
545
546         if upload_cb:
547             self.progress_bar_gen = upload_cb(nblocks)
548             for i in range(nblocks + 1 - num_of_missing):
549                 self._cb_next()
550
551         tries = 7
552         old_failures = 0
553         try:
554             while tries and missing:
555                 flying = []
556                 failures = []
557                 for hash in missing:
558                     offset, block = hmap[hash]
559                     bird = self._put_block_async(block, hash)
560                     flying.append(bird)
561                     unfinished = self._watch_thread_limit(flying)
562                     for thread in set(flying).difference(unfinished):
563                         if thread.exception:
564                             failures.append(thread.kwargs['hash'])
565                         if thread.isAlive():
566                             flying.append(thread)
567                         else:
568                             self._cb_next()
569                     flying = unfinished
570                 for thread in flying:
571                     thread.join()
572                     if thread.exception:
573                         failures.append(thread.kwargs['hash'])
574                     self._cb_next()
575                 missing = failures
576                 if missing and len(missing) == old_failures:
577                     tries -= 1
578                 old_failures = len(missing)
579             if missing:
580                 raise ClientError(
581                     '%s blocks failed to upload' % len(missing),
582                     details=['%s' % thread.exception for thread in missing])
583         except KeyboardInterrupt:
584             sendlog.info('- - - wait for threads to finish')
585             for thread in activethreads():
586                 thread.join()
587             raise
588
589         r = self.object_put(
590             obj,
591             format='json',
592             hashmap=True,
593             content_type=content_type,
594             if_etag_match=if_etag_match,
595             if_etag_not_match='*' if if_not_exist else None,
596             etag=etag,
597             json=hashmap,
598             permissions=sharing,
599             public=public,
600             success=201)
601         return r.headers
602
603     # download_* auxiliary methods
604     def _get_remote_blocks_info(self, obj, **restargs):
605         #retrieve object hashmap
606         myrange = restargs.pop('data_range', None)
607         hashmap = self.get_object_hashmap(obj, **restargs)
608         restargs['data_range'] = myrange
609         blocksize = int(hashmap['block_size'])
610         blockhash = hashmap['block_hash']
611         total_size = hashmap['bytes']
612         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
613         map_dict = {}
614         for i, h in enumerate(hashmap['hashes']):
615             #  map_dict[h] = i   CHAGE
616             if h in map_dict:
617                 map_dict[h].append(i)
618             else:
619                 map_dict[h] = [i]
620         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
621
622     def _dump_blocks_sync(
623             self, obj, remote_hashes, blocksize, total_size, dst, range,
624             **args):
625         for blockid, blockhash in enumerate(remote_hashes):
626             if blockhash:
627                 start = blocksize * blockid
628                 is_last = start + blocksize > total_size
629                 end = (total_size - 1) if is_last else (start + blocksize - 1)
630                 (start, end) = _range_up(start, end, range)
631                 args['data_range'] = 'bytes=%s-%s' % (start, end)
632                 r = self.object_get(obj, success=(200, 206), **args)
633                 self._cb_next()
634                 dst.write(r.content)
635                 dst.flush()
636
637     def _get_block_async(self, obj, **args):
638         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
639         event.start()
640         return event
641
642     def _hash_from_file(self, fp, start, size, blockhash):
643         fp.seek(start)
644         block = fp.read(size)
645         h = newhashlib(blockhash)
646         h.update(block.strip('\x00'))
647         return hexlify(h.digest())
648
649     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
650         """write the results of a greenleted rest call to a file
651
652         :param offset: the offset of the file up to blocksize
653         - e.g. if the range is 10-100, all blocks will be written to
654         normal_position - 10
655         """
656         for key, g in flying.items():
657             if g.isAlive():
658                 continue
659             if g.exception:
660                 raise g.exception
661             block = g.value.content
662             for block_start in blockids[key]:
663                 local_file.seek(block_start + offset)
664                 local_file.write(block)
665                 self._cb_next()
666             flying.pop(key)
667             blockids.pop(key)
668         local_file.flush()
669
670     def _dump_blocks_async(
671             self, obj, remote_hashes, blocksize, total_size, local_file,
672             blockhash=None, resume=False, filerange=None, **restargs):
673         file_size = fstat(local_file.fileno()).st_size if resume else 0
674         flying = dict()
675         blockid_dict = dict()
676         offset = 0
677         if filerange is not None:
678             rstart = int(filerange.split('-')[0])
679             offset = rstart if blocksize > rstart else rstart % blocksize
680
681         self._init_thread_limit()
682         for block_hash, blockids in remote_hashes.items():
683             blockids = [blk * blocksize for blk in blockids]
684             unsaved = [blk for blk in blockids if not (
685                 blk < file_size and block_hash == self._hash_from_file(
686                         local_file, blk, blocksize, blockhash))]
687             self._cb_next(len(blockids) - len(unsaved))
688             if unsaved:
689                 key = unsaved[0]
690                 self._watch_thread_limit(flying.values())
691                 self._thread2file(
692                     flying, blockid_dict, local_file, offset,
693                     **restargs)
694                 end = total_size - 1 if (
695                     key + blocksize > total_size) else key + blocksize - 1
696                 start, end = _range_up(key, end, filerange)
697                 if start == end:
698                     self._cb_next()
699                     continue
700                 restargs['async_headers'] = {
701                     'Range': 'bytes=%s-%s' % (start, end)}
702                 flying[key] = self._get_block_async(obj, **restargs)
703                 blockid_dict[key] = unsaved
704
705         for thread in flying.values():
706             thread.join()
707         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
708
709     def download_object(
710             self, obj, dst,
711             download_cb=None,
712             version=None,
713             resume=False,
714             range_str=None,
715             if_match=None,
716             if_none_match=None,
717             if_modified_since=None,
718             if_unmodified_since=None):
719         """Download an object (multiple connections, random blocks)
720
721         :param obj: (str) remote object path
722
723         :param dst: open file descriptor (wb+)
724
725         :param download_cb: optional progress.bar object for downloading
726
727         :param version: (str) file version
728
729         :param resume: (bool) if set, preserve already downloaded file parts
730
731         :param range_str: (str) from, to are file positions (int) in bytes
732
733         :param if_match: (str)
734
735         :param if_none_match: (str)
736
737         :param if_modified_since: (str) formated date
738
739         :param if_unmodified_since: (str) formated date"""
740         restargs = dict(
741             version=version,
742             data_range=None if range_str is None else 'bytes=%s' % range_str,
743             if_match=if_match,
744             if_none_match=if_none_match,
745             if_modified_since=if_modified_since,
746             if_unmodified_since=if_unmodified_since)
747
748         (
749             blocksize,
750             blockhash,
751             total_size,
752             hash_list,
753             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
754         assert total_size >= 0
755
756         if download_cb:
757             self.progress_bar_gen = download_cb(len(hash_list))
758             self._cb_next()
759
760         if dst.isatty():
761             self._dump_blocks_sync(
762                 obj,
763                 hash_list,
764                 blocksize,
765                 total_size,
766                 dst,
767                 range_str,
768                 **restargs)
769         else:
770             self._dump_blocks_async(
771                 obj,
772                 remote_hashes,
773                 blocksize,
774                 total_size,
775                 dst,
776                 blockhash,
777                 resume,
778                 range_str,
779                 **restargs)
780             if not range_str:
781                 dst.truncate(total_size)
782
783         self._complete_cb()
784
785     def download_to_string(
786             self, obj,
787             download_cb=None,
788             version=None,
789             range_str=None,
790             if_match=None,
791             if_none_match=None,
792             if_modified_since=None,
793             if_unmodified_since=None):
794         """Download an object to a string (multiple connections). This method
795         uses threads for http requests, but stores all content in memory.
796
797         :param obj: (str) remote object path
798
799         :param download_cb: optional progress.bar object for downloading
800
801         :param version: (str) file version
802
803         :param range_str: (str) from, to are file positions (int) in bytes
804
805         :param if_match: (str)
806
807         :param if_none_match: (str)
808
809         :param if_modified_since: (str) formated date
810
811         :param if_unmodified_since: (str) formated date
812
813         :returns: (str) the whole object contents
814         """
815         restargs = dict(
816             version=version,
817             data_range=None if range_str is None else 'bytes=%s' % range_str,
818             if_match=if_match,
819             if_none_match=if_none_match,
820             if_modified_since=if_modified_since,
821             if_unmodified_since=if_unmodified_since)
822
823         (
824             blocksize,
825             blockhash,
826             total_size,
827             hash_list,
828             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
829         assert total_size >= 0
830
831         if download_cb:
832             self.progress_bar_gen = download_cb(len(hash_list))
833             self._cb_next()
834
835         num_of_blocks = len(remote_hashes)
836         ret = [''] * num_of_blocks
837         self._init_thread_limit()
838         flying = dict()
839         try:
840             for blockid, blockhash in enumerate(remote_hashes):
841                 start = blocksize * blockid
842                 is_last = start + blocksize > total_size
843                 end = (total_size - 1) if is_last else (start + blocksize - 1)
844                 (start, end) = _range_up(start, end, range_str)
845                 if start < end:
846                     self._watch_thread_limit(flying.values())
847                     flying[blockid] = self._get_block_async(obj, **restargs)
848                 for runid, thread in flying.items():
849                     if (blockid + 1) == num_of_blocks:
850                         thread.join()
851                     elif thread.isAlive():
852                         continue
853                     if thread.exception:
854                         raise thread.exception
855                     ret[runid] = thread.value.content
856                     self._cb_next()
857                     flying.pop(runid)
858             return ''.join(ret)
859         except KeyboardInterrupt:
860             sendlog.info('- - - wait for threads to finish')
861             for thread in activethreads():
862                 thread.join()
863
864     #Command Progress Bar method
865     def _cb_next(self, step=1):
866         if hasattr(self, 'progress_bar_gen'):
867             try:
868                 for i in xrange(step):
869                     self.progress_bar_gen.next()
870             except:
871                 pass
872
873     def _complete_cb(self):
874         while True:
875             try:
876                 self.progress_bar_gen.next()
877             except:
878                 break
879
880     def get_object_hashmap(
881             self, obj,
882             version=None,
883             if_match=None,
884             if_none_match=None,
885             if_modified_since=None,
886             if_unmodified_since=None,
887             data_range=None):
888         """
889         :param obj: (str) remote object path
890
891         :param if_match: (str)
892
893         :param if_none_match: (str)
894
895         :param if_modified_since: (str) formated date
896
897         :param if_unmodified_since: (str) formated date
898
899         :param data_range: (str) from-to where from and to are integers
900             denoting file positions in bytes
901
902         :returns: (list)
903         """
904         try:
905             r = self.object_get(
906                 obj,
907                 hashmap=True,
908                 version=version,
909                 if_etag_match=if_match,
910                 if_etag_not_match=if_none_match,
911                 if_modified_since=if_modified_since,
912                 if_unmodified_since=if_unmodified_since,
913                 data_range=data_range)
914         except ClientError as err:
915             if err.status == 304 or err.status == 412:
916                 return {}
917             raise
918         return r.json
919
920     def set_account_group(self, group, usernames):
921         """
922         :param group: (str)
923
924         :param usernames: (list)
925         """
926         r = self.account_post(update=True, groups={group: usernames})
927         return r
928
929     def del_account_group(self, group):
930         """
931         :param group: (str)
932         """
933         self.account_post(update=True, groups={group: []})
934
935     def get_account_info(self, until=None):
936         """
937         :param until: (str) formated date
938
939         :returns: (dict)
940         """
941         r = self.account_head(until=until)
942         if r.status_code == 401:
943             raise ClientError("No authorization", status=401)
944         return r.headers
945
946     def get_account_quota(self):
947         """
948         :returns: (dict)
949         """
950         return filter_in(
951             self.get_account_info(),
952             'X-Account-Policy-Quota',
953             exactMatch=True)
954
955     #def get_account_versioning(self):
956     #    """
957     #    :returns: (dict)
958     #    """
959     #    return filter_in(
960     #        self.get_account_info(),
961     #        'X-Account-Policy-Versioning',
962     #        exactMatch=True)
963
964     def get_account_meta(self, until=None):
965         """
966         :param until: (str) formated date
967
968         :returns: (dict)
969         """
970         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
971
972     def get_account_group(self):
973         """
974         :returns: (dict)
975         """
976         return filter_in(self.get_account_info(), 'X-Account-Group-')
977
978     def set_account_meta(self, metapairs):
979         """
980         :param metapairs: (dict) {key1:val1, key2:val2, ...}
981         """
982         assert(type(metapairs) is dict)
983         r = self.account_post(update=True, metadata=metapairs)
984         return r.headers
985
986     def del_account_meta(self, metakey):
987         """
988         :param metakey: (str) metadatum key
989         """
990         r = self.account_post(update=True, metadata={metakey: ''})
991         return r.headers
992
993     #def set_account_quota(self, quota):
994     #    """
995     #    :param quota: (int)
996     #    """
997     #    self.account_post(update=True, quota=quota)
998
999     #def set_account_versioning(self, versioning):
1000     #    """
1001     #    :param versioning: (str)
1002     #    """
1003     #    r = self.account_post(update=True, versioning=versioning)
1004     #    return r.headers
1005
1006     def list_containers(self):
1007         """
1008         :returns: (dict)
1009         """
1010         r = self.account_get()
1011         return r.json
1012
1013     def del_container(self, until=None, delimiter=None):
1014         """
1015         :param until: (str) formated date
1016
1017         :param delimiter: (str) with / empty container
1018
1019         :raises ClientError: 404 Container does not exist
1020
1021         :raises ClientError: 409 Container is not empty
1022         """
1023         self._assert_container()
1024         r = self.container_delete(
1025             until=until,
1026             delimiter=delimiter,
1027             success=(204, 404, 409))
1028         if r.status_code == 404:
1029             raise ClientError(
1030                 'Container "%s" does not exist' % self.container,
1031                 r.status_code)
1032         elif r.status_code == 409:
1033             raise ClientError(
1034                 'Container "%s" is not empty' % self.container,
1035                 r.status_code)
1036         return r.headers
1037
1038     def get_container_versioning(self, container=None):
1039         """
1040         :param container: (str)
1041
1042         :returns: (dict)
1043         """
1044         cnt_back_up = self.container
1045         try:
1046             self.container = container or cnt_back_up
1047             return filter_in(
1048                 self.get_container_info(),
1049                 'X-Container-Policy-Versioning')
1050         finally:
1051             self.container = cnt_back_up
1052
1053     def get_container_limit(self, container=None):
1054         """
1055         :param container: (str)
1056
1057         :returns: (dict)
1058         """
1059         cnt_back_up = self.container
1060         try:
1061             self.container = container or cnt_back_up
1062             return filter_in(
1063                 self.get_container_info(),
1064                 'X-Container-Policy-Quota')
1065         finally:
1066             self.container = cnt_back_up
1067
1068     def get_container_info(self, until=None):
1069         """
1070         :param until: (str) formated date
1071
1072         :returns: (dict)
1073
1074         :raises ClientError: 404 Container not found
1075         """
1076         try:
1077             r = self.container_head(until=until)
1078         except ClientError as err:
1079             err.details.append('for container %s' % self.container)
1080             raise err
1081         return r.headers
1082
1083     def get_container_meta(self, until=None):
1084         """
1085         :param until: (str) formated date
1086
1087         :returns: (dict)
1088         """
1089         return filter_in(
1090             self.get_container_info(until=until),
1091             'X-Container-Meta')
1092
1093     def get_container_object_meta(self, until=None):
1094         """
1095         :param until: (str) formated date
1096
1097         :returns: (dict)
1098         """
1099         return filter_in(
1100             self.get_container_info(until=until),
1101             'X-Container-Object-Meta')
1102
1103     def set_container_meta(self, metapairs):
1104         """
1105         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1106         """
1107         assert(type(metapairs) is dict)
1108         r = self.container_post(update=True, metadata=metapairs)
1109         return r.headers
1110
1111     def del_container_meta(self, metakey):
1112         """
1113         :param metakey: (str) metadatum key
1114
1115         :returns: (dict) response headers
1116         """
1117         r = self.container_post(update=True, metadata={metakey: ''})
1118         return r.headers
1119
1120     def set_container_limit(self, limit):
1121         """
1122         :param limit: (int)
1123         """
1124         r = self.container_post(update=True, quota=limit)
1125         return r.headers
1126
1127     def set_container_versioning(self, versioning):
1128         """
1129         :param versioning: (str)
1130         """
1131         r = self.container_post(update=True, versioning=versioning)
1132         return r.headers
1133
1134     def del_object(self, obj, until=None, delimiter=None):
1135         """
1136         :param obj: (str) remote object path
1137
1138         :param until: (str) formated date
1139
1140         :param delimiter: (str)
1141         """
1142         self._assert_container()
1143         r = self.object_delete(obj, until=until, delimiter=delimiter)
1144         return r.headers
1145
1146     def set_object_meta(self, obj, metapairs):
1147         """
1148         :param obj: (str) remote object path
1149
1150         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1151         """
1152         assert(type(metapairs) is dict)
1153         r = self.object_post(obj, update=True, metadata=metapairs)
1154         return r.headers
1155
1156     def del_object_meta(self, obj, metakey):
1157         """
1158         :param obj: (str) remote object path
1159
1160         :param metakey: (str) metadatum key
1161         """
1162         r = self.object_post(obj, update=True, metadata={metakey: ''})
1163         return r.headers
1164
1165     def publish_object(self, obj):
1166         """
1167         :param obj: (str) remote object path
1168
1169         :returns: (str) access url
1170         """
1171         self.object_post(obj, update=True, public=True)
1172         info = self.get_object_info(obj)
1173         return info['x-object-public']
1174         pref, sep, rest = self.base_url.partition('//')
1175         base = rest.split('/')[0]
1176         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1177
1178     def unpublish_object(self, obj):
1179         """
1180         :param obj: (str) remote object path
1181         """
1182         r = self.object_post(obj, update=True, public=False)
1183         return r.headers
1184
1185     def get_object_info(self, obj, version=None):
1186         """
1187         :param obj: (str) remote object path
1188
1189         :param version: (str)
1190
1191         :returns: (dict)
1192         """
1193         try:
1194             r = self.object_head(obj, version=version)
1195             return r.headers
1196         except ClientError as ce:
1197             if ce.status == 404:
1198                 raise ClientError('Object %s not found' % obj, status=404)
1199             raise
1200
1201     def get_object_meta(self, obj, version=None):
1202         """
1203         :param obj: (str) remote object path
1204
1205         :param version: (str)
1206
1207         :returns: (dict)
1208         """
1209         return filter_in(
1210             self.get_object_info(obj, version=version),
1211             'X-Object-Meta')
1212
1213     def get_object_sharing(self, obj):
1214         """
1215         :param obj: (str) remote object path
1216
1217         :returns: (dict)
1218         """
1219         r = filter_in(
1220             self.get_object_info(obj),
1221             'X-Object-Sharing',
1222             exactMatch=True)
1223         reply = {}
1224         if len(r) > 0:
1225             perms = r['x-object-sharing'].split(';')
1226             for perm in perms:
1227                 try:
1228                     perm.index('=')
1229                 except ValueError:
1230                     raise ClientError('Incorrect reply format')
1231                 (key, val) = perm.strip().split('=')
1232                 reply[key] = val
1233         return reply
1234
1235     def set_object_sharing(
1236             self, obj,
1237             read_permission=False, write_permission=False):
1238         """Give read/write permisions to an object.
1239
1240         :param obj: (str) remote object path
1241
1242         :param read_permission: (list - bool) users and user groups that get
1243             read permission for this object - False means all previous read
1244             permissions will be removed
1245
1246         :param write_permission: (list - bool) of users and user groups to get
1247            write permission for this object - False means all previous write
1248            permissions will be removed
1249
1250         :returns: (dict) response headers
1251         """
1252
1253         perms = dict(read=read_permission or '', write=write_permission or '')
1254         r = self.object_post(obj, update=True, permissions=perms)
1255         return r.headers
1256
1257     def del_object_sharing(self, obj):
1258         """
1259         :param obj: (str) remote object path
1260         """
1261         return self.set_object_sharing(obj)
1262
1263     def append_object(self, obj, source_file, upload_cb=None):
1264         """
1265         :param obj: (str) remote object path
1266
1267         :param source_file: open file descriptor
1268
1269         :param upload_db: progress.bar for uploading
1270         """
1271         self._assert_container()
1272         meta = self.get_container_info()
1273         blocksize = int(meta['x-container-block-size'])
1274         filesize = fstat(source_file.fileno()).st_size
1275         nblocks = 1 + (filesize - 1) // blocksize
1276         offset = 0
1277         headers = {}
1278         if upload_cb:
1279             self.progress_bar_gen = upload_cb(nblocks)
1280             self._cb_next()
1281         flying = {}
1282         self._init_thread_limit()
1283         try:
1284             for i in range(nblocks):
1285                 block = source_file.read(min(blocksize, filesize - offset))
1286                 offset += len(block)
1287
1288                 self._watch_thread_limit(flying.values())
1289                 unfinished = {}
1290                 flying[i] = SilentEvent(
1291                     method=self.object_post,
1292                     obj=obj,
1293                     update=True,
1294                     content_range='bytes */*',
1295                     content_type='application/octet-stream',
1296                     content_length=len(block),
1297                     data=block)
1298                 flying[i].start()
1299
1300                 for key, thread in flying.items():
1301                     if thread.isAlive():
1302                         if i < nblocks:
1303                             unfinished[key] = thread
1304                             continue
1305                         thread.join()
1306                     if thread.exception:
1307                         raise thread.exception
1308                     headers[key] = thread.value.headers
1309                     self._cb_next()
1310                 flying = unfinished
1311         except KeyboardInterrupt:
1312             sendlog.info('- - - wait for threads to finish')
1313             for thread in activethreads():
1314                 thread.join()
1315         finally:
1316             from time import sleep
1317             sleep(2 * len(activethreads()))
1318         return headers.values()
1319
1320     def truncate_object(self, obj, upto_bytes):
1321         """
1322         :param obj: (str) remote object path
1323
1324         :param upto_bytes: max number of bytes to leave on file
1325
1326         :returns: (dict) response headers
1327         """
1328         r = self.object_post(
1329             obj,
1330             update=True,
1331             content_range='bytes 0-%s/*' % upto_bytes,
1332             content_type='application/octet-stream',
1333             object_bytes=upto_bytes,
1334             source_object=path4url(self.container, obj))
1335         return r.headers
1336
1337     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1338         """Overwrite a part of an object from local source file
1339
1340         :param obj: (str) remote object path
1341
1342         :param start: (int) position in bytes to start overwriting from
1343
1344         :param end: (int) position in bytes to stop overwriting at
1345
1346         :param source_file: open file descriptor
1347
1348         :param upload_db: progress.bar for uploading
1349         """
1350
1351         r = self.get_object_info(obj)
1352         rf_size = int(r['content-length'])
1353         if rf_size < int(start):
1354             raise ClientError(
1355                 'Range start exceeds file size',
1356                 status=416)
1357         elif rf_size < int(end):
1358             raise ClientError(
1359                 'Range end exceeds file size',
1360                 status=416)
1361         self._assert_container()
1362         meta = self.get_container_info()
1363         blocksize = int(meta['x-container-block-size'])
1364         filesize = fstat(source_file.fileno()).st_size
1365         datasize = int(end) - int(start) + 1
1366         nblocks = 1 + (datasize - 1) // blocksize
1367         offset = 0
1368         if upload_cb:
1369             self.progress_bar_gen = upload_cb(nblocks)
1370             self._cb_next()
1371         headers = []
1372         for i in range(nblocks):
1373             read_size = min(blocksize, filesize - offset, datasize - offset)
1374             block = source_file.read(read_size)
1375             r = self.object_post(
1376                 obj,
1377                 update=True,
1378                 content_type='application/octet-stream',
1379                 content_length=len(block),
1380                 content_range='bytes %s-%s/*' % (
1381                     start + offset,
1382                     start + offset + len(block) - 1),
1383                 data=block)
1384             headers.append(dict(r.headers))
1385             offset += len(block)
1386
1387             self._cb_next
1388         return headers
1389
1390     def copy_object(
1391             self, src_container, src_object, dst_container,
1392             dst_object=None,
1393             source_version=None,
1394             source_account=None,
1395             public=False,
1396             content_type=None,
1397             delimiter=None):
1398         """
1399         :param src_container: (str) source container
1400
1401         :param src_object: (str) source object path
1402
1403         :param dst_container: (str) destination container
1404
1405         :param dst_object: (str) destination object path
1406
1407         :param source_version: (str) source object version
1408
1409         :param source_account: (str) account to copy from
1410
1411         :param public: (bool)
1412
1413         :param content_type: (str)
1414
1415         :param delimiter: (str)
1416
1417         :returns: (dict) response headers
1418         """
1419         self._assert_account()
1420         self.container = dst_container
1421         src_path = path4url(src_container, src_object)
1422         r = self.object_put(
1423             dst_object or src_object,
1424             success=201,
1425             copy_from=src_path,
1426             content_length=0,
1427             source_version=source_version,
1428             source_account=source_account,
1429             public=public,
1430             content_type=content_type,
1431             delimiter=delimiter)
1432         return r.headers
1433
1434     def move_object(
1435             self, src_container, src_object, dst_container,
1436             dst_object=False,
1437             source_account=None,
1438             source_version=None,
1439             public=False,
1440             content_type=None,
1441             delimiter=None):
1442         """
1443         :param src_container: (str) source container
1444
1445         :param src_object: (str) source object path
1446
1447         :param dst_container: (str) destination container
1448
1449         :param dst_object: (str) destination object path
1450
1451         :param source_account: (str) account to move from
1452
1453         :param source_version: (str) source object version
1454
1455         :param public: (bool)
1456
1457         :param content_type: (str)
1458
1459         :param delimiter: (str)
1460
1461         :returns: (dict) response headers
1462         """
1463         self._assert_account()
1464         self.container = dst_container
1465         dst_object = dst_object or src_object
1466         src_path = path4url(src_container, src_object)
1467         r = self.object_put(
1468             dst_object,
1469             success=201,
1470             move_from=src_path,
1471             content_length=0,
1472             source_account=source_account,
1473             source_version=source_version,
1474             public=public,
1475             content_type=content_type,
1476             delimiter=delimiter)
1477         return r.headers
1478
1479     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1480         """Get accounts that share with self.account
1481
1482         :param limit: (str)
1483
1484         :param marker: (str)
1485
1486         :returns: (dict)
1487         """
1488         self._assert_account()
1489
1490         self.set_param('format', 'json')
1491         self.set_param('limit', limit, iff=limit is not None)
1492         self.set_param('marker', marker, iff=marker is not None)
1493
1494         path = ''
1495         success = kwargs.pop('success', (200, 204))
1496         r = self.get(path, *args, success=success, **kwargs)
1497         return r.json
1498
1499     def get_object_versionlist(self, obj):
1500         """
1501         :param obj: (str) remote object path
1502
1503         :returns: (list)
1504         """
1505         self._assert_container()
1506         r = self.object_get(obj, format='json', version='list')
1507         return r.json['versions']