Do not allow thread exceptions to stop errors
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384             server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 try:
446                     details = ['%s' % thread.exception for thread in missing]
447                 except Exception:
448                     details = []
449                 raise ClientError(
450                     '%s blocks failed to upload' % len(missing),
451                     details=details)
452         except KeyboardInterrupt:
453             sendlog.info('- - - wait for threads to finish')
454             for thread in activethreads():
455                 thread.join()
456             raise
457
458         r = self.object_put(
459             obj,
460             format='json',
461             hashmap=True,
462             content_type=content_type,
463             content_encoding=content_encoding,
464             if_etag_match=if_etag_match,
465             if_etag_not_match='*' if if_not_exist else None,
466             etag=etag,
467             json=hashmap,
468             permissions=sharing,
469             public=public,
470             success=201)
471         return r.headers
472
473     def upload_from_string(
474             self, obj, input_str,
475             hash_cb=None,
476             upload_cb=None,
477             etag=None,
478             if_etag_match=None,
479             if_not_exist=None,
480             content_encoding=None,
481             content_disposition=None,
482             content_type=None,
483             sharing=None,
484             public=None,
485             container_info_cache=None):
486         """Upload an object using multiple connections (threads)
487
488         :param obj: (str) remote object path
489
490         :param input_str: (str) upload content
491
492         :param hash_cb: optional progress.bar object for calculating hashes
493
494         :param upload_cb: optional progress.bar object for uploading
495
496         :param etag: (str)
497
498         :param if_etag_match: (str) Push that value to if-match header at file
499             creation
500
501         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
502             it does not exist remotely, otherwise the operation will fail.
503             Involves the case of an object with the same path is created while
504             the object is being uploaded.
505
506         :param content_encoding: (str)
507
508         :param content_disposition: (str)
509
510         :param content_type: (str)
511
512         :param sharing: {'read':[user and/or grp names],
513             'write':[usr and/or grp names]}
514
515         :param public: (bool)
516
517         :param container_info_cache: (dict) if given, avoid redundant calls to
518             server for container info (block size and hash information)
519         """
520         self._assert_container()
521
522         blocksize, blockhash, size, nblocks = self._get_file_block_info(
523                 fileobj=None, size=len(input_str), cache=container_info_cache)
524         (hashes, hmap, offset) = ([], {}, 0)
525         if not content_type:
526             content_type = 'application/octet-stream'
527
528         hashes = []
529         hmap = {}
530         for blockid in range(nblocks):
531             start = blockid * blocksize
532             block = input_str[start: (start + blocksize)]
533             hashes.append(_pithos_hash(block, blockhash))
534             hmap[hashes[blockid]] = (start, block)
535
536         hashmap = dict(bytes=size, hashes=hashes)
537         missing, obj_headers = self._create_object_or_get_missing_hashes(
538             obj, hashmap,
539             content_type=content_type,
540             size=size,
541             if_etag_match=if_etag_match,
542             if_etag_not_match='*' if if_not_exist else None,
543             content_encoding=content_encoding,
544             content_disposition=content_disposition,
545             permissions=sharing,
546             public=public)
547         if missing is None:
548             return obj_headers
549         num_of_missing = len(missing)
550
551         if upload_cb:
552             self.progress_bar_gen = upload_cb(nblocks)
553             for i in range(nblocks + 1 - num_of_missing):
554                 self._cb_next()
555
556         tries = 7
557         old_failures = 0
558         try:
559             while tries and missing:
560                 flying = []
561                 failures = []
562                 for hash in missing:
563                     offset, block = hmap[hash]
564                     bird = self._put_block_async(block, hash)
565                     flying.append(bird)
566                     unfinished = self._watch_thread_limit(flying)
567                     for thread in set(flying).difference(unfinished):
568                         if thread.exception:
569                             failures.append(thread.kwargs['hash'])
570                         if thread.isAlive():
571                             flying.append(thread)
572                         else:
573                             self._cb_next()
574                     flying = unfinished
575                 for thread in flying:
576                     thread.join()
577                     if thread.exception:
578                         failures.append(thread.kwargs['hash'])
579                     self._cb_next()
580                 missing = failures
581                 if missing and len(missing) == old_failures:
582                     tries -= 1
583                 old_failures = len(missing)
584             if missing:
585                 raise ClientError(
586                     '%s blocks failed to upload' % len(missing),
587                     details=['%s' % thread.exception for thread in missing])
588         except KeyboardInterrupt:
589             sendlog.info('- - - wait for threads to finish')
590             for thread in activethreads():
591                 thread.join()
592             raise
593
594         r = self.object_put(
595             obj,
596             format='json',
597             hashmap=True,
598             content_type=content_type,
599             if_etag_match=if_etag_match,
600             if_etag_not_match='*' if if_not_exist else None,
601             etag=etag,
602             json=hashmap,
603             permissions=sharing,
604             public=public,
605             success=201)
606         return r.headers
607
608     # download_* auxiliary methods
609     def _get_remote_blocks_info(self, obj, **restargs):
610         #retrieve object hashmap
611         myrange = restargs.pop('data_range', None)
612         hashmap = self.get_object_hashmap(obj, **restargs)
613         restargs['data_range'] = myrange
614         blocksize = int(hashmap['block_size'])
615         blockhash = hashmap['block_hash']
616         total_size = hashmap['bytes']
617         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
618         map_dict = {}
619         for i, h in enumerate(hashmap['hashes']):
620             #  map_dict[h] = i   CHAGE
621             if h in map_dict:
622                 map_dict[h].append(i)
623             else:
624                 map_dict[h] = [i]
625         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
626
627     def _dump_blocks_sync(
628             self, obj, remote_hashes, blocksize, total_size, dst, range,
629             **args):
630         for blockid, blockhash in enumerate(remote_hashes):
631             if blockhash:
632                 start = blocksize * blockid
633                 is_last = start + blocksize > total_size
634                 end = (total_size - 1) if is_last else (start + blocksize - 1)
635                 (start, end) = _range_up(start, end, range)
636                 args['data_range'] = 'bytes=%s-%s' % (start, end)
637                 r = self.object_get(obj, success=(200, 206), **args)
638                 self._cb_next()
639                 dst.write(r.content)
640                 dst.flush()
641
642     def _get_block_async(self, obj, **args):
643         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
644         event.start()
645         return event
646
647     def _hash_from_file(self, fp, start, size, blockhash):
648         fp.seek(start)
649         block = fp.read(size)
650         h = newhashlib(blockhash)
651         h.update(block.strip('\x00'))
652         return hexlify(h.digest())
653
654     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
655         """write the results of a greenleted rest call to a file
656
657         :param offset: the offset of the file up to blocksize
658         - e.g. if the range is 10-100, all blocks will be written to
659         normal_position - 10
660         """
661         for key, g in flying.items():
662             if g.isAlive():
663                 continue
664             if g.exception:
665                 raise g.exception
666             block = g.value.content
667             for block_start in blockids[key]:
668                 local_file.seek(block_start + offset)
669                 local_file.write(block)
670                 self._cb_next()
671             flying.pop(key)
672             blockids.pop(key)
673         local_file.flush()
674
675     def _dump_blocks_async(
676             self, obj, remote_hashes, blocksize, total_size, local_file,
677             blockhash=None, resume=False, filerange=None, **restargs):
678         file_size = fstat(local_file.fileno()).st_size if resume else 0
679         flying = dict()
680         blockid_dict = dict()
681         offset = 0
682         if filerange is not None:
683             rstart = int(filerange.split('-')[0])
684             offset = rstart if blocksize > rstart else rstart % blocksize
685
686         self._init_thread_limit()
687         for block_hash, blockids in remote_hashes.items():
688             blockids = [blk * blocksize for blk in blockids]
689             unsaved = [blk for blk in blockids if not (
690                 blk < file_size and block_hash == self._hash_from_file(
691                         local_file, blk, blocksize, blockhash))]
692             self._cb_next(len(blockids) - len(unsaved))
693             if unsaved:
694                 key = unsaved[0]
695                 self._watch_thread_limit(flying.values())
696                 self._thread2file(
697                     flying, blockid_dict, local_file, offset,
698                     **restargs)
699                 end = total_size - 1 if (
700                     key + blocksize > total_size) else key + blocksize - 1
701                 start, end = _range_up(key, end, filerange)
702                 if start == end:
703                     self._cb_next()
704                     continue
705                 restargs['async_headers'] = {
706                     'Range': 'bytes=%s-%s' % (start, end)}
707                 flying[key] = self._get_block_async(obj, **restargs)
708                 blockid_dict[key] = unsaved
709
710         for thread in flying.values():
711             thread.join()
712         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
713
714     def download_object(
715             self, obj, dst,
716             download_cb=None,
717             version=None,
718             resume=False,
719             range_str=None,
720             if_match=None,
721             if_none_match=None,
722             if_modified_since=None,
723             if_unmodified_since=None):
724         """Download an object (multiple connections, random blocks)
725
726         :param obj: (str) remote object path
727
728         :param dst: open file descriptor (wb+)
729
730         :param download_cb: optional progress.bar object for downloading
731
732         :param version: (str) file version
733
734         :param resume: (bool) if set, preserve already downloaded file parts
735
736         :param range_str: (str) from, to are file positions (int) in bytes
737
738         :param if_match: (str)
739
740         :param if_none_match: (str)
741
742         :param if_modified_since: (str) formated date
743
744         :param if_unmodified_since: (str) formated date"""
745         restargs = dict(
746             version=version,
747             data_range=None if range_str is None else 'bytes=%s' % range_str,
748             if_match=if_match,
749             if_none_match=if_none_match,
750             if_modified_since=if_modified_since,
751             if_unmodified_since=if_unmodified_since)
752
753         (
754             blocksize,
755             blockhash,
756             total_size,
757             hash_list,
758             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
759         assert total_size >= 0
760
761         if download_cb:
762             self.progress_bar_gen = download_cb(len(hash_list))
763             self._cb_next()
764
765         if dst.isatty():
766             self._dump_blocks_sync(
767                 obj,
768                 hash_list,
769                 blocksize,
770                 total_size,
771                 dst,
772                 range_str,
773                 **restargs)
774         else:
775             self._dump_blocks_async(
776                 obj,
777                 remote_hashes,
778                 blocksize,
779                 total_size,
780                 dst,
781                 blockhash,
782                 resume,
783                 range_str,
784                 **restargs)
785             if not range_str:
786                 dst.truncate(total_size)
787
788         self._complete_cb()
789
790     def download_to_string(
791             self, obj,
792             download_cb=None,
793             version=None,
794             range_str=None,
795             if_match=None,
796             if_none_match=None,
797             if_modified_since=None,
798             if_unmodified_since=None):
799         """Download an object to a string (multiple connections). This method
800         uses threads for http requests, but stores all content in memory.
801
802         :param obj: (str) remote object path
803
804         :param download_cb: optional progress.bar object for downloading
805
806         :param version: (str) file version
807
808         :param range_str: (str) from, to are file positions (int) in bytes
809
810         :param if_match: (str)
811
812         :param if_none_match: (str)
813
814         :param if_modified_since: (str) formated date
815
816         :param if_unmodified_since: (str) formated date
817
818         :returns: (str) the whole object contents
819         """
820         restargs = dict(
821             version=version,
822             data_range=None if range_str is None else 'bytes=%s' % range_str,
823             if_match=if_match,
824             if_none_match=if_none_match,
825             if_modified_since=if_modified_since,
826             if_unmodified_since=if_unmodified_since)
827
828         (
829             blocksize,
830             blockhash,
831             total_size,
832             hash_list,
833             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
834         assert total_size >= 0
835
836         if download_cb:
837             self.progress_bar_gen = download_cb(len(hash_list))
838             self._cb_next()
839
840         num_of_blocks = len(remote_hashes)
841         ret = [''] * num_of_blocks
842         self._init_thread_limit()
843         flying = dict()
844         try:
845             for blockid, blockhash in enumerate(remote_hashes):
846                 start = blocksize * blockid
847                 is_last = start + blocksize > total_size
848                 end = (total_size - 1) if is_last else (start + blocksize - 1)
849                 (start, end) = _range_up(start, end, range_str)
850                 if start < end:
851                     self._watch_thread_limit(flying.values())
852                     flying[blockid] = self._get_block_async(obj, **restargs)
853                 for runid, thread in flying.items():
854                     if (blockid + 1) == num_of_blocks:
855                         thread.join()
856                     elif thread.isAlive():
857                         continue
858                     if thread.exception:
859                         raise thread.exception
860                     ret[runid] = thread.value.content
861                     self._cb_next()
862                     flying.pop(runid)
863             return ''.join(ret)
864         except KeyboardInterrupt:
865             sendlog.info('- - - wait for threads to finish')
866             for thread in activethreads():
867                 thread.join()
868
869     #Command Progress Bar method
870     def _cb_next(self, step=1):
871         if hasattr(self, 'progress_bar_gen'):
872             try:
873                 for i in xrange(step):
874                     self.progress_bar_gen.next()
875             except:
876                 pass
877
878     def _complete_cb(self):
879         while True:
880             try:
881                 self.progress_bar_gen.next()
882             except:
883                 break
884
885     def get_object_hashmap(
886             self, obj,
887             version=None,
888             if_match=None,
889             if_none_match=None,
890             if_modified_since=None,
891             if_unmodified_since=None,
892             data_range=None):
893         """
894         :param obj: (str) remote object path
895
896         :param if_match: (str)
897
898         :param if_none_match: (str)
899
900         :param if_modified_since: (str) formated date
901
902         :param if_unmodified_since: (str) formated date
903
904         :param data_range: (str) from-to where from and to are integers
905             denoting file positions in bytes
906
907         :returns: (list)
908         """
909         try:
910             r = self.object_get(
911                 obj,
912                 hashmap=True,
913                 version=version,
914                 if_etag_match=if_match,
915                 if_etag_not_match=if_none_match,
916                 if_modified_since=if_modified_since,
917                 if_unmodified_since=if_unmodified_since,
918                 data_range=data_range)
919         except ClientError as err:
920             if err.status == 304 or err.status == 412:
921                 return {}
922             raise
923         return r.json
924
925     def set_account_group(self, group, usernames):
926         """
927         :param group: (str)
928
929         :param usernames: (list)
930         """
931         r = self.account_post(update=True, groups={group: usernames})
932         return r
933
934     def del_account_group(self, group):
935         """
936         :param group: (str)
937         """
938         self.account_post(update=True, groups={group: []})
939
940     def get_account_info(self, until=None):
941         """
942         :param until: (str) formated date
943
944         :returns: (dict)
945         """
946         r = self.account_head(until=until)
947         if r.status_code == 401:
948             raise ClientError("No authorization", status=401)
949         return r.headers
950
951     def get_account_quota(self):
952         """
953         :returns: (dict)
954         """
955         return filter_in(
956             self.get_account_info(),
957             'X-Account-Policy-Quota',
958             exactMatch=True)
959
960     #def get_account_versioning(self):
961     #    """
962     #    :returns: (dict)
963     #    """
964     #    return filter_in(
965     #        self.get_account_info(),
966     #        'X-Account-Policy-Versioning',
967     #        exactMatch=True)
968
969     def get_account_meta(self, until=None):
970         """
971         :param until: (str) formated date
972
973         :returns: (dict)
974         """
975         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
976
977     def get_account_group(self):
978         """
979         :returns: (dict)
980         """
981         return filter_in(self.get_account_info(), 'X-Account-Group-')
982
983     def set_account_meta(self, metapairs):
984         """
985         :param metapairs: (dict) {key1:val1, key2:val2, ...}
986         """
987         assert(type(metapairs) is dict)
988         r = self.account_post(update=True, metadata=metapairs)
989         return r.headers
990
991     def del_account_meta(self, metakey):
992         """
993         :param metakey: (str) metadatum key
994         """
995         r = self.account_post(update=True, metadata={metakey: ''})
996         return r.headers
997
998     #def set_account_quota(self, quota):
999     #    """
1000     #    :param quota: (int)
1001     #    """
1002     #    self.account_post(update=True, quota=quota)
1003
1004     #def set_account_versioning(self, versioning):
1005     #    """
1006     #    :param versioning: (str)
1007     #    """
1008     #    r = self.account_post(update=True, versioning=versioning)
1009     #    return r.headers
1010
1011     def list_containers(self):
1012         """
1013         :returns: (dict)
1014         """
1015         r = self.account_get()
1016         return r.json
1017
1018     def del_container(self, until=None, delimiter=None):
1019         """
1020         :param until: (str) formated date
1021
1022         :param delimiter: (str) with / empty container
1023
1024         :raises ClientError: 404 Container does not exist
1025
1026         :raises ClientError: 409 Container is not empty
1027         """
1028         self._assert_container()
1029         r = self.container_delete(
1030             until=until,
1031             delimiter=delimiter,
1032             success=(204, 404, 409))
1033         if r.status_code == 404:
1034             raise ClientError(
1035                 'Container "%s" does not exist' % self.container,
1036                 r.status_code)
1037         elif r.status_code == 409:
1038             raise ClientError(
1039                 'Container "%s" is not empty' % self.container,
1040                 r.status_code)
1041         return r.headers
1042
1043     def get_container_versioning(self, container=None):
1044         """
1045         :param container: (str)
1046
1047         :returns: (dict)
1048         """
1049         cnt_back_up = self.container
1050         try:
1051             self.container = container or cnt_back_up
1052             return filter_in(
1053                 self.get_container_info(),
1054                 'X-Container-Policy-Versioning')
1055         finally:
1056             self.container = cnt_back_up
1057
1058     def get_container_limit(self, container=None):
1059         """
1060         :param container: (str)
1061
1062         :returns: (dict)
1063         """
1064         cnt_back_up = self.container
1065         try:
1066             self.container = container or cnt_back_up
1067             return filter_in(
1068                 self.get_container_info(),
1069                 'X-Container-Policy-Quota')
1070         finally:
1071             self.container = cnt_back_up
1072
1073     def get_container_info(self, until=None):
1074         """
1075         :param until: (str) formated date
1076
1077         :returns: (dict)
1078
1079         :raises ClientError: 404 Container not found
1080         """
1081         try:
1082             r = self.container_head(until=until)
1083         except ClientError as err:
1084             err.details.append('for container %s' % self.container)
1085             raise err
1086         return r.headers
1087
1088     def get_container_meta(self, until=None):
1089         """
1090         :param until: (str) formated date
1091
1092         :returns: (dict)
1093         """
1094         return filter_in(
1095             self.get_container_info(until=until),
1096             'X-Container-Meta')
1097
1098     def get_container_object_meta(self, until=None):
1099         """
1100         :param until: (str) formated date
1101
1102         :returns: (dict)
1103         """
1104         return filter_in(
1105             self.get_container_info(until=until),
1106             'X-Container-Object-Meta')
1107
1108     def set_container_meta(self, metapairs):
1109         """
1110         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1111         """
1112         assert(type(metapairs) is dict)
1113         r = self.container_post(update=True, metadata=metapairs)
1114         return r.headers
1115
1116     def del_container_meta(self, metakey):
1117         """
1118         :param metakey: (str) metadatum key
1119
1120         :returns: (dict) response headers
1121         """
1122         r = self.container_post(update=True, metadata={metakey: ''})
1123         return r.headers
1124
1125     def set_container_limit(self, limit):
1126         """
1127         :param limit: (int)
1128         """
1129         r = self.container_post(update=True, quota=limit)
1130         return r.headers
1131
1132     def set_container_versioning(self, versioning):
1133         """
1134         :param versioning: (str)
1135         """
1136         r = self.container_post(update=True, versioning=versioning)
1137         return r.headers
1138
1139     def del_object(self, obj, until=None, delimiter=None):
1140         """
1141         :param obj: (str) remote object path
1142
1143         :param until: (str) formated date
1144
1145         :param delimiter: (str)
1146         """
1147         self._assert_container()
1148         r = self.object_delete(obj, until=until, delimiter=delimiter)
1149         return r.headers
1150
1151     def set_object_meta(self, obj, metapairs):
1152         """
1153         :param obj: (str) remote object path
1154
1155         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1156         """
1157         assert(type(metapairs) is dict)
1158         r = self.object_post(obj, update=True, metadata=metapairs)
1159         return r.headers
1160
1161     def del_object_meta(self, obj, metakey):
1162         """
1163         :param obj: (str) remote object path
1164
1165         :param metakey: (str) metadatum key
1166         """
1167         r = self.object_post(obj, update=True, metadata={metakey: ''})
1168         return r.headers
1169
1170     def publish_object(self, obj):
1171         """
1172         :param obj: (str) remote object path
1173
1174         :returns: (str) access url
1175         """
1176         self.object_post(obj, update=True, public=True)
1177         info = self.get_object_info(obj)
1178         return info['x-object-public']
1179         pref, sep, rest = self.base_url.partition('//')
1180         base = rest.split('/')[0]
1181         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1182
1183     def unpublish_object(self, obj):
1184         """
1185         :param obj: (str) remote object path
1186         """
1187         r = self.object_post(obj, update=True, public=False)
1188         return r.headers
1189
1190     def get_object_info(self, obj, version=None):
1191         """
1192         :param obj: (str) remote object path
1193
1194         :param version: (str)
1195
1196         :returns: (dict)
1197         """
1198         try:
1199             r = self.object_head(obj, version=version)
1200             return r.headers
1201         except ClientError as ce:
1202             if ce.status == 404:
1203                 raise ClientError('Object %s not found' % obj, status=404)
1204             raise
1205
1206     def get_object_meta(self, obj, version=None):
1207         """
1208         :param obj: (str) remote object path
1209
1210         :param version: (str)
1211
1212         :returns: (dict)
1213         """
1214         return filter_in(
1215             self.get_object_info(obj, version=version),
1216             'X-Object-Meta')
1217
1218     def get_object_sharing(self, obj):
1219         """
1220         :param obj: (str) remote object path
1221
1222         :returns: (dict)
1223         """
1224         r = filter_in(
1225             self.get_object_info(obj),
1226             'X-Object-Sharing',
1227             exactMatch=True)
1228         reply = {}
1229         if len(r) > 0:
1230             perms = r['x-object-sharing'].split(';')
1231             for perm in perms:
1232                 try:
1233                     perm.index('=')
1234                 except ValueError:
1235                     raise ClientError('Incorrect reply format')
1236                 (key, val) = perm.strip().split('=')
1237                 reply[key] = val
1238         return reply
1239
1240     def set_object_sharing(
1241             self, obj,
1242             read_permission=False, write_permission=False):
1243         """Give read/write permisions to an object.
1244
1245         :param obj: (str) remote object path
1246
1247         :param read_permission: (list - bool) users and user groups that get
1248             read permission for this object - False means all previous read
1249             permissions will be removed
1250
1251         :param write_permission: (list - bool) of users and user groups to get
1252            write permission for this object - False means all previous write
1253            permissions will be removed
1254
1255         :returns: (dict) response headers
1256         """
1257
1258         perms = dict(read=read_permission or '', write=write_permission or '')
1259         r = self.object_post(obj, update=True, permissions=perms)
1260         return r.headers
1261
1262     def del_object_sharing(self, obj):
1263         """
1264         :param obj: (str) remote object path
1265         """
1266         return self.set_object_sharing(obj)
1267
1268     def append_object(self, obj, source_file, upload_cb=None):
1269         """
1270         :param obj: (str) remote object path
1271
1272         :param source_file: open file descriptor
1273
1274         :param upload_db: progress.bar for uploading
1275         """
1276         self._assert_container()
1277         meta = self.get_container_info()
1278         blocksize = int(meta['x-container-block-size'])
1279         filesize = fstat(source_file.fileno()).st_size
1280         nblocks = 1 + (filesize - 1) // blocksize
1281         offset = 0
1282         headers = {}
1283         if upload_cb:
1284             self.progress_bar_gen = upload_cb(nblocks)
1285             self._cb_next()
1286         flying = {}
1287         self._init_thread_limit()
1288         try:
1289             for i in range(nblocks):
1290                 block = source_file.read(min(blocksize, filesize - offset))
1291                 offset += len(block)
1292
1293                 self._watch_thread_limit(flying.values())
1294                 unfinished = {}
1295                 flying[i] = SilentEvent(
1296                     method=self.object_post,
1297                     obj=obj,
1298                     update=True,
1299                     content_range='bytes */*',
1300                     content_type='application/octet-stream',
1301                     content_length=len(block),
1302                     data=block)
1303                 flying[i].start()
1304
1305                 for key, thread in flying.items():
1306                     if thread.isAlive():
1307                         if i < nblocks:
1308                             unfinished[key] = thread
1309                             continue
1310                         thread.join()
1311                     if thread.exception:
1312                         raise thread.exception
1313                     headers[key] = thread.value.headers
1314                     self._cb_next()
1315                 flying = unfinished
1316         except KeyboardInterrupt:
1317             sendlog.info('- - - wait for threads to finish')
1318             for thread in activethreads():
1319                 thread.join()
1320         finally:
1321             from time import sleep
1322             sleep(2 * len(activethreads()))
1323         return headers.values()
1324
1325     def truncate_object(self, obj, upto_bytes):
1326         """
1327         :param obj: (str) remote object path
1328
1329         :param upto_bytes: max number of bytes to leave on file
1330
1331         :returns: (dict) response headers
1332         """
1333         r = self.object_post(
1334             obj,
1335             update=True,
1336             content_range='bytes 0-%s/*' % upto_bytes,
1337             content_type='application/octet-stream',
1338             object_bytes=upto_bytes,
1339             source_object=path4url(self.container, obj))
1340         return r.headers
1341
1342     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1343         """Overwrite a part of an object from local source file
1344
1345         :param obj: (str) remote object path
1346
1347         :param start: (int) position in bytes to start overwriting from
1348
1349         :param end: (int) position in bytes to stop overwriting at
1350
1351         :param source_file: open file descriptor
1352
1353         :param upload_db: progress.bar for uploading
1354         """
1355
1356         r = self.get_object_info(obj)
1357         rf_size = int(r['content-length'])
1358         if rf_size < int(start):
1359             raise ClientError(
1360                 'Range start exceeds file size',
1361                 status=416)
1362         elif rf_size < int(end):
1363             raise ClientError(
1364                 'Range end exceeds file size',
1365                 status=416)
1366         self._assert_container()
1367         meta = self.get_container_info()
1368         blocksize = int(meta['x-container-block-size'])
1369         filesize = fstat(source_file.fileno()).st_size
1370         datasize = int(end) - int(start) + 1
1371         nblocks = 1 + (datasize - 1) // blocksize
1372         offset = 0
1373         if upload_cb:
1374             self.progress_bar_gen = upload_cb(nblocks)
1375             self._cb_next()
1376         headers = []
1377         for i in range(nblocks):
1378             read_size = min(blocksize, filesize - offset, datasize - offset)
1379             block = source_file.read(read_size)
1380             r = self.object_post(
1381                 obj,
1382                 update=True,
1383                 content_type='application/octet-stream',
1384                 content_length=len(block),
1385                 content_range='bytes %s-%s/*' % (
1386                     start + offset,
1387                     start + offset + len(block) - 1),
1388                 data=block)
1389             headers.append(dict(r.headers))
1390             offset += len(block)
1391
1392             self._cb_next
1393         return headers
1394
1395     def copy_object(
1396             self, src_container, src_object, dst_container,
1397             dst_object=None,
1398             source_version=None,
1399             source_account=None,
1400             public=False,
1401             content_type=None,
1402             delimiter=None):
1403         """
1404         :param src_container: (str) source container
1405
1406         :param src_object: (str) source object path
1407
1408         :param dst_container: (str) destination container
1409
1410         :param dst_object: (str) destination object path
1411
1412         :param source_version: (str) source object version
1413
1414         :param source_account: (str) account to copy from
1415
1416         :param public: (bool)
1417
1418         :param content_type: (str)
1419
1420         :param delimiter: (str)
1421
1422         :returns: (dict) response headers
1423         """
1424         self._assert_account()
1425         self.container = dst_container
1426         src_path = path4url(src_container, src_object)
1427         r = self.object_put(
1428             dst_object or src_object,
1429             success=201,
1430             copy_from=src_path,
1431             content_length=0,
1432             source_version=source_version,
1433             source_account=source_account,
1434             public=public,
1435             content_type=content_type,
1436             delimiter=delimiter)
1437         return r.headers
1438
1439     def move_object(
1440             self, src_container, src_object, dst_container,
1441             dst_object=False,
1442             source_account=None,
1443             source_version=None,
1444             public=False,
1445             content_type=None,
1446             delimiter=None):
1447         """
1448         :param src_container: (str) source container
1449
1450         :param src_object: (str) source object path
1451
1452         :param dst_container: (str) destination container
1453
1454         :param dst_object: (str) destination object path
1455
1456         :param source_account: (str) account to move from
1457
1458         :param source_version: (str) source object version
1459
1460         :param public: (bool)
1461
1462         :param content_type: (str)
1463
1464         :param delimiter: (str)
1465
1466         :returns: (dict) response headers
1467         """
1468         self._assert_account()
1469         self.container = dst_container
1470         dst_object = dst_object or src_object
1471         src_path = path4url(src_container, src_object)
1472         r = self.object_put(
1473             dst_object,
1474             success=201,
1475             move_from=src_path,
1476             content_length=0,
1477             source_account=source_account,
1478             source_version=source_version,
1479             public=public,
1480             content_type=content_type,
1481             delimiter=delimiter)
1482         return r.headers
1483
1484     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1485         """Get accounts that share with self.account
1486
1487         :param limit: (str)
1488
1489         :param marker: (str)
1490
1491         :returns: (dict)
1492         """
1493         self._assert_account()
1494
1495         self.set_param('format', 'json')
1496         self.set_param('limit', limit, iff=limit is not None)
1497         self.set_param('marker', marker, iff=marker is not None)
1498
1499         path = ''
1500         success = kwargs.pop('success', (200, 204))
1501         r = self.get(path, *args, success=success, **kwargs)
1502         return r.json
1503
1504     def get_object_versionlist(self, obj):
1505         """
1506         :param obj: (str) remote object path
1507
1508         :returns: (list)
1509         """
1510         self._assert_container()
1511         r = self.object_get(obj, format='json', version='list')
1512         return r.json['versions']