Fix all minor typos and modifications in tests
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384             server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 raise ClientError(
446                     '%s blocks failed to upload' % len(missing),
447                     details=['%s' % thread.exception for thread in missing])
448         except KeyboardInterrupt:
449             sendlog.info('- - - wait for threads to finish')
450             for thread in activethreads():
451                 thread.join()
452             raise
453
454         r = self.object_put(
455             obj,
456             format='json',
457             hashmap=True,
458             content_type=content_type,
459             if_etag_match=if_etag_match,
460             if_etag_not_match='*' if if_not_exist else None,
461             etag=etag,
462             json=hashmap,
463             permissions=sharing,
464             public=public,
465             success=201)
466         return r.headers
467
468     def upload_from_string(
469             self, obj, input_str,
470             hash_cb=None,
471             upload_cb=None,
472             etag=None,
473             if_etag_match=None,
474             if_not_exist=None,
475             content_encoding=None,
476             content_disposition=None,
477             content_type=None,
478             sharing=None,
479             public=None,
480             container_info_cache=None):
481         """Upload an object using multiple connections (threads)
482
483         :param obj: (str) remote object path
484
485         :param input_str: (str) upload content
486
487         :param hash_cb: optional progress.bar object for calculating hashes
488
489         :param upload_cb: optional progress.bar object for uploading
490
491         :param etag: (str)
492
493         :param if_etag_match: (str) Push that value to if-match header at file
494             creation
495
496         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497             it does not exist remotely, otherwise the operation will fail.
498             Involves the case of an object with the same path is created while
499             the object is being uploaded.
500
501         :param content_encoding: (str)
502
503         :param content_disposition: (str)
504
505         :param content_type: (str)
506
507         :param sharing: {'read':[user and/or grp names],
508             'write':[usr and/or grp names]}
509
510         :param public: (bool)
511
512         :param container_info_cache: (dict) if given, avoid redundant calls to
513             server for container info (block size and hash information)
514         """
515         self._assert_container()
516
517         blocksize, blockhash, size, nblocks = self._get_file_block_info(
518                 fileobj=None, size=len(input_str), cache=container_info_cache)
519         (hashes, hmap, offset) = ([], {}, 0)
520         if not content_type:
521             content_type = 'application/octet-stream'
522
523         hashes = []
524         hmap = {}
525         for blockid in range(nblocks):
526             start = blockid * blocksize
527             block = input_str[start: (start + blocksize)]
528             hashes.append(_pithos_hash(block, blockhash))
529             hmap[hashes[blockid]] = (start, block)
530
531         hashmap = dict(bytes=size, hashes=hashes)
532         missing, obj_headers = self._create_object_or_get_missing_hashes(
533             obj, hashmap,
534             content_type=content_type,
535             size=size,
536             if_etag_match=if_etag_match,
537             if_etag_not_match='*' if if_not_exist else None,
538             content_encoding=content_encoding,
539             content_disposition=content_disposition,
540             permissions=sharing,
541             public=public)
542         if missing is None:
543             return obj_headers
544         num_of_missing = len(missing)
545
546         if upload_cb:
547             self.progress_bar_gen = upload_cb(nblocks)
548             for i in range(nblocks + 1 - num_of_missing):
549                 self._cb_next()
550
551         tries = 7
552         old_failures = 0
553         try:
554             while tries and missing:
555                 flying = []
556                 failures = []
557                 for hash in missing:
558                     offset, block = hmap[hash]
559                     bird = self._put_block_async(block, hash)
560                     flying.append(bird)
561                     unfinished = self._watch_thread_limit(flying)
562                     for thread in set(flying).difference(unfinished):
563                         if thread.exception:
564                             failures.append(thread.kwargs['hash'])
565                         if thread.isAlive():
566                             flying.append(thread)
567                         else:
568                             self._cb_next()
569                     flying = unfinished
570                 for thread in flying:
571                     thread.join()
572                     if thread.exception:
573                         failures.append(thread.kwargs['hash'])
574                     self._cb_next()
575                 missing = failures
576                 if missing and len(missing) == old_failures:
577                     tries -= 1
578                 old_failures = len(missing)
579             if missing:
580                 raise ClientError(
581                     '%s blocks failed to upload' % len(missing),
582                     details=['%s' % thread.exception for thread in missing])
583         except KeyboardInterrupt:
584             sendlog.info('- - - wait for threads to finish')
585             for thread in activethreads():
586                 thread.join()
587             raise
588
589         r = self.object_put(
590             obj,
591             format='json',
592             hashmap=True,
593             content_type=content_type,
594             if_etag_match=if_etag_match,
595             if_etag_not_match='*' if if_not_exist else None,
596             etag=etag,
597             json=hashmap,
598             permissions=sharing,
599             public=public,
600             success=201)
601         return r.headers
602
603     # download_* auxiliary methods
604     def _get_remote_blocks_info(self, obj, **restargs):
605         #retrieve object hashmap
606         myrange = restargs.pop('data_range', None)
607         hashmap = self.get_object_hashmap(obj, **restargs)
608         restargs['data_range'] = myrange
609         blocksize = int(hashmap['block_size'])
610         blockhash = hashmap['block_hash']
611         total_size = hashmap['bytes']
612         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
613         map_dict = {}
614         for i, h in enumerate(hashmap['hashes']):
615             #  map_dict[h] = i   CHAGE
616             if h in map_dict:
617                 map_dict[h].append(i)
618             else:
619                 map_dict[h] = [i]
620         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
621
622     def _dump_blocks_sync(
623             self, obj, remote_hashes, blocksize, total_size, dst, range,
624             **args):
625         for blockid, blockhash in enumerate(remote_hashes):
626             if blockhash:
627                 start = blocksize * blockid
628                 is_last = start + blocksize > total_size
629                 end = (total_size - 1) if is_last else (start + blocksize - 1)
630                 (start, end) = _range_up(start, end, range)
631                 args['data_range'] = 'bytes=%s-%s' % (start, end)
632                 r = self.object_get(obj, success=(200, 206), **args)
633                 self._cb_next()
634                 dst.write(r.content)
635                 dst.flush()
636
637     def _get_block_async(self, obj, **args):
638         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
639         event.start()
640         return event
641
642     def _hash_from_file(self, fp, start, size, blockhash):
643         fp.seek(start)
644         block = fp.read(size)
645         h = newhashlib(blockhash)
646         h.update(block.strip('\x00'))
647         return hexlify(h.digest())
648
649     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
650         """write the results of a greenleted rest call to a file
651
652         :param offset: the offset of the file up to blocksize
653         - e.g. if the range is 10-100, all blocks will be written to
654         normal_position - 10
655         """
656         for key, g in flying.items():
657             if g.isAlive():
658                 continue
659             if g.exception:
660                 raise g.exception
661             block = g.value.content
662             for block_start in blockids[key]:
663                 local_file.seek(block_start + offset)
664                 local_file.write(block)
665                 self._cb_next()
666             flying.pop(key)
667             blockids.pop(key)
668         local_file.flush()
669
670     def _dump_blocks_async(
671             self, obj, remote_hashes, blocksize, total_size, local_file,
672             blockhash=None, resume=False, filerange=None, **restargs):
673         file_size = fstat(local_file.fileno()).st_size if resume else 0
674         flying = dict()
675         blockid_dict = dict()
676         offset = 0
677         if filerange is not None:
678             rstart = int(filerange.split('-')[0])
679             offset = rstart if blocksize > rstart else rstart % blocksize
680
681         self._init_thread_limit()
682         for block_hash, blockids in remote_hashes.items():
683             blockids = [blk * blocksize for blk in blockids]
684             unsaved = [blk for blk in blockids if not (
685                 blk < file_size and block_hash == self._hash_from_file(
686                         local_file, blk, blocksize, blockhash))]
687             self._cb_next(len(blockids) - len(unsaved))
688             if unsaved:
689                 key = unsaved[0]
690                 self._watch_thread_limit(flying.values())
691                 self._thread2file(
692                     flying, blockid_dict, local_file, offset,
693                     **restargs)
694                 end = total_size - 1 if (
695                     key + blocksize > total_size) else key + blocksize - 1
696                 start, end = _range_up(key, end, filerange)
697                 if start == end:
698                     self._cb_next()
699                     continue
700                 restargs['async_headers'] = {
701                     'Range': 'bytes=%s-%s' % (start, end)}
702                 flying[key] = self._get_block_async(obj, **restargs)
703                 blockid_dict[key] = unsaved
704
705         for thread in flying.values():
706             thread.join()
707         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
708
709     def download_object(
710             self, obj, dst,
711             download_cb=None,
712             version=None,
713             resume=False,
714             range_str=None,
715             if_match=None,
716             if_none_match=None,
717             if_modified_since=None,
718             if_unmodified_since=None):
719         """Download an object (multiple connections, random blocks)
720
721         :param obj: (str) remote object path
722
723         :param dst: open file descriptor (wb+)
724
725         :param download_cb: optional progress.bar object for downloading
726
727         :param version: (str) file version
728
729         :param resume: (bool) if set, preserve already downloaded file parts
730
731         :param range_str: (str) from, to are file positions (int) in bytes
732
733         :param if_match: (str)
734
735         :param if_none_match: (str)
736
737         :param if_modified_since: (str) formated date
738
739         :param if_unmodified_since: (str) formated date"""
740         restargs = dict(
741             version=version,
742             data_range=None if range_str is None else 'bytes=%s' % range_str,
743             if_match=if_match,
744             if_none_match=if_none_match,
745             if_modified_since=if_modified_since,
746             if_unmodified_since=if_unmodified_since)
747
748         (
749             blocksize,
750             blockhash,
751             total_size,
752             hash_list,
753             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
754         assert total_size >= 0
755
756         if download_cb:
757             self.progress_bar_gen = download_cb(len(hash_list))
758             self._cb_next()
759
760         if dst.isatty():
761             self._dump_blocks_sync(
762                 obj,
763                 hash_list,
764                 blocksize,
765                 total_size,
766                 dst,
767                 range_str,
768                 **restargs)
769         else:
770             self._dump_blocks_async(
771                 obj,
772                 remote_hashes,
773                 blocksize,
774                 total_size,
775                 dst,
776                 blockhash,
777                 resume,
778                 range_str,
779                 **restargs)
780             if not range_str:
781                 dst.truncate(total_size)
782
783         self._complete_cb()
784
785     def download_to_string(
786             self, obj,
787             download_cb=None,
788             version=None,
789             range_str=None,
790             if_match=None,
791             if_none_match=None,
792             if_modified_since=None,
793             if_unmodified_since=None):
794         """Download an object to a string (multiple connections). This method
795         uses threads for http requests, but stores all content in memory.
796
797         :param obj: (str) remote object path
798
799         :param download_cb: optional progress.bar object for downloading
800
801         :param version: (str) file version
802
803         :param range_str: (str) from, to are file positions (int) in bytes
804
805         :param if_match: (str)
806
807         :param if_none_match: (str)
808
809         :param if_modified_since: (str) formated date
810
811         :param if_unmodified_since: (str) formated date
812
813         :returns: (str) the whole object contents
814         """
815         restargs = dict(
816             version=version,
817             data_range=None if range_str is None else 'bytes=%s' % range_str,
818             if_match=if_match,
819             if_none_match=if_none_match,
820             if_modified_since=if_modified_since,
821             if_unmodified_since=if_unmodified_since)
822
823         (
824             blocksize,
825             blockhash,
826             total_size,
827             hash_list,
828             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
829         assert total_size >= 0
830
831         if download_cb:
832             self.progress_bar_gen = download_cb(len(hash_list))
833             self._cb_next()
834
835         num_of_blocks = len(remote_hashes)
836         ret = [''] * num_of_blocks
837         self._init_thread_limit()
838         flying = dict()
839         try:
840             for blockid, blockhash in enumerate(remote_hashes):
841                 start = blocksize * blockid
842                 is_last = start + blocksize > total_size
843                 end = (total_size - 1) if is_last else (start + blocksize - 1)
844                 (start, end) = _range_up(start, end, range_str)
845                 if start < end:
846                     self._watch_thread_limit(flying.values())
847                     flying[blockid] = self._get_block_async(obj, **restargs)
848                 for runid, thread in flying.items():
849                     if (blockid + 1) == num_of_blocks:
850                         thread.join()
851                     elif thread.isAlive():
852                         continue
853                     if thread.exception:
854                         raise thread.exception
855                     ret[runid] = thread.value.content
856                     self._cb_next()
857                     flying.pop(runid)
858             return ''.join(ret)
859         except KeyboardInterrupt:
860             sendlog.info('- - - wait for threads to finish')
861             for thread in activethreads():
862                 thread.join()
863
864     #Command Progress Bar method
865     def _cb_next(self, step=1):
866         if hasattr(self, 'progress_bar_gen'):
867             try:
868                 for i in xrange(step):
869                     self.progress_bar_gen.next()
870             except:
871                 pass
872
873     def _complete_cb(self):
874         while True:
875             try:
876                 self.progress_bar_gen.next()
877             except:
878                 break
879
880     def get_object_hashmap(
881             self, obj,
882             version=None,
883             if_match=None,
884             if_none_match=None,
885             if_modified_since=None,
886             if_unmodified_since=None,
887             data_range=None):
888         """
889         :param obj: (str) remote object path
890
891         :param if_match: (str)
892
893         :param if_none_match: (str)
894
895         :param if_modified_since: (str) formated date
896
897         :param if_unmodified_since: (str) formated date
898
899         :param data_range: (str) from-to where from and to are integers
900             denoting file positions in bytes
901
902         :returns: (list)
903         """
904         try:
905             r = self.object_get(
906                 obj,
907                 hashmap=True,
908                 version=version,
909                 if_etag_match=if_match,
910                 if_etag_not_match=if_none_match,
911                 if_modified_since=if_modified_since,
912                 if_unmodified_since=if_unmodified_since,
913                 data_range=data_range)
914         except ClientError as err:
915             if err.status == 304 or err.status == 412:
916                 return {}
917             raise
918         return r.json
919
920     def set_account_group(self, group, usernames):
921         """
922         :param group: (str)
923
924         :param usernames: (list)
925         """
926         r = self.account_post(update=True, groups={group: usernames})
927         return r
928
929     def del_account_group(self, group):
930         """
931         :param group: (str)
932         """
933         self.account_post(update=True, groups={group: []})
934
935     def get_account_info(self, until=None):
936         """
937         :param until: (str) formated date
938
939         :returns: (dict)
940         """
941         r = self.account_head(until=until)
942         if r.status_code == 401:
943             raise ClientError("No authorization", status=401)
944         return r.headers
945
946     def get_account_quota(self):
947         """
948         :returns: (dict)
949         """
950         return filter_in(
951             self.get_account_info(),
952             'X-Account-Policy-Quota',
953             exactMatch=True)
954
955     #def get_account_versioning(self):
956     #    """
957     #    :returns: (dict)
958     #    """
959     #    return filter_in(
960     #        self.get_account_info(),
961     #        'X-Account-Policy-Versioning',
962     #        exactMatch=True)
963
964     def get_account_meta(self, until=None):
965         """
966         :param until: (str) formated date
967
968         :returns: (dict)
969         """
970         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
971
972     def get_account_group(self):
973         """
974         :returns: (dict)
975         """
976         return filter_in(self.get_account_info(), 'X-Account-Group-')
977
978     def set_account_meta(self, metapairs):
979         """
980         :param metapairs: (dict) {key1:val1, key2:val2, ...}
981         """
982         assert(type(metapairs) is dict)
983         r = self.account_post(update=True, metadata=metapairs)
984         return r.headers
985
986     def del_account_meta(self, metakey):
987         """
988         :param metakey: (str) metadatum key
989         """
990         r = self.account_post(update=True, metadata={metakey: ''})
991         return r.headers
992
993     #def set_account_quota(self, quota):
994     #    """
995     #    :param quota: (int)
996     #    """
997     #    self.account_post(update=True, quota=quota)
998
999     #def set_account_versioning(self, versioning):
1000     #    """
1001     #    :param versioning: (str)
1002     #    """
1003     #    r = self.account_post(update=True, versioning=versioning)
1004     #    return r.headers
1005
1006     def list_containers(self):
1007         """
1008         :returns: (dict)
1009         """
1010         r = self.account_get()
1011         return r.json
1012
1013     def del_container(self, until=None, delimiter=None):
1014         """
1015         :param until: (str) formated date
1016
1017         :param delimiter: (str) with / empty container
1018
1019         :raises ClientError: 404 Container does not exist
1020
1021         :raises ClientError: 409 Container is not empty
1022         """
1023         self._assert_container()
1024         r = self.container_delete(
1025             until=until,
1026             delimiter=delimiter,
1027             success=(204, 404, 409))
1028         if r.status_code == 404:
1029             raise ClientError(
1030                 'Container "%s" does not exist' % self.container,
1031                 r.status_code)
1032         elif r.status_code == 409:
1033             raise ClientError(
1034                 'Container "%s" is not empty' % self.container,
1035                 r.status_code)
1036         return r.headers
1037
1038     def get_container_versioning(self, container=None):
1039         """
1040         :param container: (str)
1041
1042         :returns: (dict)
1043         """
1044         cnt_back_up = self.container
1045         try:
1046             self.container = container or cnt_back_up
1047             return filter_in(
1048                 self.get_container_info(),
1049                 'X-Container-Policy-Versioning')
1050         finally:
1051             self.container = cnt_back_up
1052
1053     def get_container_limit(self, container=None):
1054         """
1055         :param container: (str)
1056
1057         :returns: (dict)
1058         """
1059         cnt_back_up = self.container
1060         try:
1061             self.container = container or cnt_back_up
1062             return filter_in(
1063                 self.get_container_info(),
1064                 'X-Container-Policy-Quota')
1065         finally:
1066             self.container = cnt_back_up
1067
1068     def get_container_info(self, until=None):
1069         """
1070         :param until: (str) formated date
1071
1072         :returns: (dict)
1073
1074         :raises ClientError: 404 Container not found
1075         """
1076         try:
1077             r = self.container_head(until=until)
1078         except ClientError as err:
1079             err.details.append('for container %s' % self.container)
1080             raise err
1081         return r.headers
1082
1083     def get_container_meta(self, until=None):
1084         """
1085         :param until: (str) formated date
1086
1087         :returns: (dict)
1088         """
1089         return filter_in(
1090             self.get_container_info(until=until),
1091             'X-Container-Meta')
1092
1093     def get_container_object_meta(self, until=None):
1094         """
1095         :param until: (str) formated date
1096
1097         :returns: (dict)
1098         """
1099         return filter_in(
1100             self.get_container_info(until=until),
1101             'X-Container-Object-Meta')
1102
1103     def set_container_meta(self, metapairs):
1104         """
1105         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1106         """
1107         assert(type(metapairs) is dict)
1108         r = self.container_post(update=True, metadata=metapairs)
1109         return r.headers
1110
1111     def del_container_meta(self, metakey):
1112         """
1113         :param metakey: (str) metadatum key
1114
1115         :returns: (dict) response headers
1116         """
1117         r = self.container_post(update=True, metadata={metakey: ''})
1118         return r.headers
1119
1120     def set_container_limit(self, limit):
1121         """
1122         :param limit: (int)
1123         """
1124         r = self.container_post(update=True, quota=limit)
1125         return r.headers
1126
1127     def set_container_versioning(self, versioning):
1128         """
1129         :param versioning: (str)
1130         """
1131         r = self.container_post(update=True, versioning=versioning)
1132         return r.headers
1133
1134     def del_object(self, obj, until=None, delimiter=None):
1135         """
1136         :param obj: (str) remote object path
1137
1138         :param until: (str) formated date
1139
1140         :param delimiter: (str)
1141         """
1142         self._assert_container()
1143         r = self.object_delete(obj, until=until, delimiter=delimiter)
1144         return r.headers
1145
1146     def set_object_meta(self, obj, metapairs):
1147         """
1148         :param obj: (str) remote object path
1149
1150         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1151         """
1152         assert(type(metapairs) is dict)
1153         r = self.object_post(obj, update=True, metadata=metapairs)
1154         return r.headers
1155
1156     def del_object_meta(self, obj, metakey):
1157         """
1158         :param obj: (str) remote object path
1159
1160         :param metakey: (str) metadatum key
1161         """
1162         r = self.object_post(obj, update=True, metadata={metakey: ''})
1163         return r.headers
1164
1165     def publish_object(self, obj):
1166         """
1167         :param obj: (str) remote object path
1168
1169         :returns: (str) access url
1170         """
1171         self.object_post(obj, update=True, public=True)
1172         info = self.get_object_info(obj)
1173         pref, sep, rest = self.base_url.partition('//')
1174         base = rest.split('/')[0]
1175         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1176
1177     def unpublish_object(self, obj):
1178         """
1179         :param obj: (str) remote object path
1180         """
1181         r = self.object_post(obj, update=True, public=False)
1182         return r.headers
1183
1184     def get_object_info(self, obj, version=None):
1185         """
1186         :param obj: (str) remote object path
1187
1188         :param version: (str)
1189
1190         :returns: (dict)
1191         """
1192         try:
1193             r = self.object_head(obj, version=version)
1194             return r.headers
1195         except ClientError as ce:
1196             if ce.status == 404:
1197                 raise ClientError('Object %s not found' % obj, status=404)
1198             raise
1199
1200     def get_object_meta(self, obj, version=None):
1201         """
1202         :param obj: (str) remote object path
1203
1204         :param version: (str)
1205
1206         :returns: (dict)
1207         """
1208         return filter_in(
1209             self.get_object_info(obj, version=version),
1210             'X-Object-Meta')
1211
1212     def get_object_sharing(self, obj):
1213         """
1214         :param obj: (str) remote object path
1215
1216         :returns: (dict)
1217         """
1218         r = filter_in(
1219             self.get_object_info(obj),
1220             'X-Object-Sharing',
1221             exactMatch=True)
1222         reply = {}
1223         if len(r) > 0:
1224             perms = r['x-object-sharing'].split(';')
1225             for perm in perms:
1226                 try:
1227                     perm.index('=')
1228                 except ValueError:
1229                     raise ClientError('Incorrect reply format')
1230                 (key, val) = perm.strip().split('=')
1231                 reply[key] = val
1232         return reply
1233
1234     def set_object_sharing(
1235             self, obj,
1236             read_permission=False, write_permission=False):
1237         """Give read/write permisions to an object.
1238
1239         :param obj: (str) remote object path
1240
1241         :param read_permission: (list - bool) users and user groups that get
1242             read permission for this object - False means all previous read
1243             permissions will be removed
1244
1245         :param write_permission: (list - bool) of users and user groups to get
1246            write permission for this object - False means all previous write
1247            permissions will be removed
1248
1249         :returns: (dict) response headers
1250         """
1251
1252         perms = dict(read=read_permission or '', write=write_permission or '')
1253         r = self.object_post(obj, update=True, permissions=perms)
1254         return r.headers
1255
1256     def del_object_sharing(self, obj):
1257         """
1258         :param obj: (str) remote object path
1259         """
1260         return self.set_object_sharing(obj)
1261
1262     def append_object(self, obj, source_file, upload_cb=None):
1263         """
1264         :param obj: (str) remote object path
1265
1266         :param source_file: open file descriptor
1267
1268         :param upload_db: progress.bar for uploading
1269         """
1270         self._assert_container()
1271         meta = self.get_container_info()
1272         blocksize = int(meta['x-container-block-size'])
1273         filesize = fstat(source_file.fileno()).st_size
1274         nblocks = 1 + (filesize - 1) // blocksize
1275         offset = 0
1276         headers = {}
1277         if upload_cb:
1278             self.progress_bar_gen = upload_cb(nblocks)
1279             self._cb_next()
1280         flying = {}
1281         self._init_thread_limit()
1282         try:
1283             for i in range(nblocks):
1284                 block = source_file.read(min(blocksize, filesize - offset))
1285                 offset += len(block)
1286
1287                 self._watch_thread_limit(flying.values())
1288                 unfinished = {}
1289                 flying[i] = SilentEvent(
1290                     method=self.object_post,
1291                     obj=obj,
1292                     update=True,
1293                     content_range='bytes */*',
1294                     content_type='application/octet-stream',
1295                     content_length=len(block),
1296                     data=block)
1297                 flying[i].start()
1298
1299                 for key, thread in flying.items():
1300                     if thread.isAlive():
1301                         if i < nblocks:
1302                             unfinished[key] = thread
1303                             continue
1304                         thread.join()
1305                     if thread.exception:
1306                         raise thread.exception
1307                     headers[key] = thread.value.headers
1308                     self._cb_next()
1309                 flying = unfinished
1310         except KeyboardInterrupt:
1311             sendlog.info('- - - wait for threads to finish')
1312             for thread in activethreads():
1313                 thread.join()
1314         finally:
1315             from time import sleep
1316             sleep(2 * len(activethreads()))
1317         return headers.values()
1318
1319     def truncate_object(self, obj, upto_bytes):
1320         """
1321         :param obj: (str) remote object path
1322
1323         :param upto_bytes: max number of bytes to leave on file
1324
1325         :returns: (dict) response headers
1326         """
1327         r = self.object_post(
1328             obj,
1329             update=True,
1330             content_range='bytes 0-%s/*' % upto_bytes,
1331             content_type='application/octet-stream',
1332             object_bytes=upto_bytes,
1333             source_object=path4url(self.container, obj))
1334         return r.headers
1335
1336     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1337         """Overwrite a part of an object from local source file
1338
1339         :param obj: (str) remote object path
1340
1341         :param start: (int) position in bytes to start overwriting from
1342
1343         :param end: (int) position in bytes to stop overwriting at
1344
1345         :param source_file: open file descriptor
1346
1347         :param upload_db: progress.bar for uploading
1348         """
1349
1350         r = self.get_object_info(obj)
1351         rf_size = int(r['content-length'])
1352         if rf_size < int(start):
1353             raise ClientError(
1354                 'Range start exceeds file size',
1355                 status=416)
1356         elif rf_size < int(end):
1357             raise ClientError(
1358                 'Range end exceeds file size',
1359                 status=416)
1360         self._assert_container()
1361         meta = self.get_container_info()
1362         blocksize = int(meta['x-container-block-size'])
1363         filesize = fstat(source_file.fileno()).st_size
1364         datasize = int(end) - int(start) + 1
1365         nblocks = 1 + (datasize - 1) // blocksize
1366         offset = 0
1367         if upload_cb:
1368             self.progress_bar_gen = upload_cb(nblocks)
1369             self._cb_next()
1370         headers = []
1371         for i in range(nblocks):
1372             read_size = min(blocksize, filesize - offset, datasize - offset)
1373             block = source_file.read(read_size)
1374             r = self.object_post(
1375                 obj,
1376                 update=True,
1377                 content_type='application/octet-stream',
1378                 content_length=len(block),
1379                 content_range='bytes %s-%s/*' % (
1380                     start + offset,
1381                     start + offset + len(block) - 1),
1382                 data=block)
1383             headers.append(dict(r.headers))
1384             offset += len(block)
1385
1386             self._cb_next
1387         return headers
1388
1389     def copy_object(
1390             self, src_container, src_object, dst_container,
1391             dst_object=None,
1392             source_version=None,
1393             source_account=None,
1394             public=False,
1395             content_type=None,
1396             delimiter=None):
1397         """
1398         :param src_container: (str) source container
1399
1400         :param src_object: (str) source object path
1401
1402         :param dst_container: (str) destination container
1403
1404         :param dst_object: (str) destination object path
1405
1406         :param source_version: (str) source object version
1407
1408         :param source_account: (str) account to copy from
1409
1410         :param public: (bool)
1411
1412         :param content_type: (str)
1413
1414         :param delimiter: (str)
1415
1416         :returns: (dict) response headers
1417         """
1418         self._assert_account()
1419         self.container = dst_container
1420         src_path = path4url(src_container, src_object)
1421         r = self.object_put(
1422             dst_object or src_object,
1423             success=201,
1424             copy_from=src_path,
1425             content_length=0,
1426             source_version=source_version,
1427             source_account=source_account,
1428             public=public,
1429             content_type=content_type,
1430             delimiter=delimiter)
1431         return r.headers
1432
1433     def move_object(
1434             self, src_container, src_object, dst_container,
1435             dst_object=False,
1436             source_account=None,
1437             source_version=None,
1438             public=False,
1439             content_type=None,
1440             delimiter=None):
1441         """
1442         :param src_container: (str) source container
1443
1444         :param src_object: (str) source object path
1445
1446         :param dst_container: (str) destination container
1447
1448         :param dst_object: (str) destination object path
1449
1450         :param source_account: (str) account to move from
1451
1452         :param source_version: (str) source object version
1453
1454         :param public: (bool)
1455
1456         :param content_type: (str)
1457
1458         :param delimiter: (str)
1459
1460         :returns: (dict) response headers
1461         """
1462         self._assert_account()
1463         self.container = dst_container
1464         dst_object = dst_object or src_object
1465         src_path = path4url(src_container, src_object)
1466         r = self.object_put(
1467             dst_object,
1468             success=201,
1469             move_from=src_path,
1470             content_length=0,
1471             source_account=source_account,
1472             source_version=source_version,
1473             public=public,
1474             content_type=content_type,
1475             delimiter=delimiter)
1476         return r.headers
1477
1478     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1479         """Get accounts that share with self.account
1480
1481         :param limit: (str)
1482
1483         :param marker: (str)
1484
1485         :returns: (dict)
1486         """
1487         self._assert_account()
1488
1489         self.set_param('format', 'json')
1490         self.set_param('limit', limit, iff=limit is not None)
1491         self.set_param('marker', marker, iff=marker is not None)
1492
1493         path = ''
1494         success = kwargs.pop('success', (200, 204))
1495         r = self.get(path, *args, success=success, **kwargs)
1496         return r.json
1497
1498     def get_object_versionlist(self, obj):
1499         """
1500         :param obj: (str) remote object path
1501
1502         :returns: (list)
1503         """
1504         self._assert_container()
1505         r = self.object_get(obj, format='json', version='list')
1506         return r.json['versions']