Merge branch 'hotfix-0.12.10'
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks"""
136         cnt_back_up = self.container
137         try:
138             self.container = container or cnt_back_up
139             r = self.container_delete(until=unicode(time()))
140         finally:
141             self.container = cnt_back_up
142         return r.headers
143
144     def upload_object_unchunked(
145             self, obj, f,
146             withHashFile=False,
147             size=None,
148             etag=None,
149             content_encoding=None,
150             content_disposition=None,
151             content_type=None,
152             sharing=None,
153             public=None):
154         """
155         :param obj: (str) remote object path
156
157         :param f: open file descriptor
158
159         :param withHashFile: (bool)
160
161         :param size: (int) size of data to upload
162
163         :param etag: (str)
164
165         :param content_encoding: (str)
166
167         :param content_disposition: (str)
168
169         :param content_type: (str)
170
171         :param sharing: {'read':[user and/or grp names],
172             'write':[usr and/or grp names]}
173
174         :param public: (bool)
175
176         :returns: (dict) created object metadata
177         """
178         self._assert_container()
179
180         if withHashFile:
181             data = f.read()
182             try:
183                 import json
184                 data = json.dumps(json.loads(data))
185             except ValueError:
186                 raise ClientError('"%s" is not json-formated' % f.name, 1)
187             except SyntaxError:
188                 msg = '"%s" is not a valid hashmap file' % f.name
189                 raise ClientError(msg, 1)
190             f = StringIO(data)
191         else:
192             data = readall(f, size) if size else f.read()
193         r = self.object_put(
194             obj,
195             data=data,
196             etag=etag,
197             content_encoding=content_encoding,
198             content_disposition=content_disposition,
199             content_type=content_type,
200             permissions=sharing,
201             public=public,
202             success=201)
203         return r.headers
204
205     def create_object_by_manifestation(
206             self, obj,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             content_type=None,
211             sharing=None,
212             public=None):
213         """
214         :param obj: (str) remote object path
215
216         :param etag: (str)
217
218         :param content_encoding: (str)
219
220         :param content_disposition: (str)
221
222         :param content_type: (str)
223
224         :param sharing: {'read':[user and/or grp names],
225             'write':[usr and/or grp names]}
226
227         :param public: (bool)
228
229         :returns: (dict) created object metadata
230         """
231         self._assert_container()
232         r = self.object_put(
233             obj,
234             content_length=0,
235             etag=etag,
236             content_encoding=content_encoding,
237             content_disposition=content_disposition,
238             content_type=content_type,
239             permissions=sharing,
240             public=public,
241             manifest='%s/%s' % (self.container, obj))
242         return r.headers
243
244     # upload_* auxiliary methods
245     def _put_block_async(self, data, hash):
246         event = SilentEvent(method=self._put_block, data=data, hash=hash)
247         event.start()
248         return event
249
250     def _put_block(self, data, hash):
251         r = self.container_post(
252             update=True,
253             content_type='application/octet-stream',
254             content_length=len(data),
255             data=data,
256             format='json')
257         assert r.json[0] == hash, 'Local hash does not match server'
258
259     def _get_file_block_info(self, fileobj, size=None, cache=None):
260         """
261         :param fileobj: (file descriptor) source
262
263         :param size: (int) size of data to upload from source
264
265         :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
267         """
268         if isinstance(cache, dict):
269             try:
270                 meta = cache[self.container]
271             except KeyError:
272                 meta = self.get_container_info()
273                 cache[self.container] = meta
274         else:
275             meta = self.get_container_info()
276         blocksize = int(meta['x-container-block-size'])
277         blockhash = meta['x-container-block-hash']
278         size = size if size is not None else fstat(fileobj.fileno()).st_size
279         nblocks = 1 + (size - 1) // blocksize
280         return (blocksize, blockhash, size, nblocks)
281
282     def _create_object_or_get_missing_hashes(
283             self, obj, json,
284             size=None,
285             format='json',
286             hashmap=True,
287             content_type=None,
288             if_etag_match=None,
289             if_etag_not_match=None,
290             content_encoding=None,
291             content_disposition=None,
292             permissions=None,
293             public=None,
294             success=(201, 409)):
295         r = self.object_put(
296             obj,
297             format='json',
298             hashmap=True,
299             content_type=content_type,
300             json=json,
301             if_etag_match=if_etag_match,
302             if_etag_not_match=if_etag_not_match,
303             content_encoding=content_encoding,
304             content_disposition=content_disposition,
305             permissions=permissions,
306             public=public,
307             success=success)
308         return (None if r.status_code == 201 else r.json), r.headers
309
310     def _calculate_blocks_for_upload(
311             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
312             hash_cb=None):
313         offset = 0
314         if hash_cb:
315             hash_gen = hash_cb(nblocks)
316             hash_gen.next()
317
318         for i in xrange(nblocks):
319             block = readall(fileobj, min(blocksize, size - offset))
320             bytes = len(block)
321             if bytes <= 0:
322                 break
323             hash = _pithos_hash(block, blockhash)
324             hashes.append(hash)
325             hmap[hash] = (offset, bytes)
326             offset += bytes
327             if hash_cb:
328                 hash_gen.next()
329         msg = ('Failed to calculate uploading blocks: '
330                'read bytes(%s) != requested size (%s)' % (offset, size))
331         assert offset == size, msg
332
333     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334         """upload missing blocks asynchronously"""
335
336         self._init_thread_limit()
337
338         flying = []
339         failures = []
340         for hash in missing:
341             offset, bytes = hmap[hash]
342             fileobj.seek(offset)
343             data = readall(fileobj, bytes)
344             r = self._put_block_async(data, hash)
345             flying.append(r)
346             unfinished = self._watch_thread_limit(flying)
347             for thread in set(flying).difference(unfinished):
348                 if thread.exception:
349                     failures.append(thread)
350                     if isinstance(
351                             thread.exception,
352                             ClientError) and thread.exception.status == 502:
353                         self.POOLSIZE = self._thread_limit
354                 elif thread.isAlive():
355                     flying.append(thread)
356                 elif upload_gen:
357                     try:
358                         upload_gen.next()
359                     except:
360                         pass
361             flying = unfinished
362
363         for thread in flying:
364             thread.join()
365             if thread.exception:
366                 failures.append(thread)
367             elif upload_gen:
368                 try:
369                     upload_gen.next()
370                 except:
371                     pass
372
373         return [failure.kwargs['hash'] for failure in failures]
374
375     def upload_object(
376             self, obj, f,
377             size=None,
378             hash_cb=None,
379             upload_cb=None,
380             etag=None,
381             if_etag_match=None,
382             if_not_exist=None,
383             content_encoding=None,
384             content_disposition=None,
385             content_type=None,
386             sharing=None,
387             public=None,
388             container_info_cache=None):
389         """Upload an object using multiple connections (threads)
390
391         :param obj: (str) remote object path
392
393         :param f: open file descriptor (rb)
394
395         :param hash_cb: optional progress.bar object for calculating hashes
396
397         :param upload_cb: optional progress.bar object for uploading
398
399         :param etag: (str)
400
401         :param if_etag_match: (str) Push that value to if-match header at file
402             creation
403
404         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405             it does not exist remotely, otherwise the operation will fail.
406             Involves the case of an object with the same path is created while
407             the object is being uploaded.
408
409         :param content_encoding: (str)
410
411         :param content_disposition: (str)
412
413         :param content_type: (str)
414
415         :param sharing: {'read':[user and/or grp names],
416             'write':[usr and/or grp names]}
417
418         :param public: (bool)
419
420         :param container_info_cache: (dict) if given, avoid redundant calls to
421             server for container info (block size and hash information)
422         """
423         self._assert_container()
424
425         block_info = (
426             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427                 f, size, container_info_cache)
428         (hashes, hmap, offset) = ([], {}, 0)
429         if not content_type:
430             content_type = 'application/octet-stream'
431
432         self._calculate_blocks_for_upload(
433             *block_info,
434             hashes=hashes,
435             hmap=hmap,
436             fileobj=f,
437             hash_cb=hash_cb)
438
439         hashmap = dict(bytes=size, hashes=hashes)
440         missing, obj_headers = self._create_object_or_get_missing_hashes(
441             obj, hashmap,
442             content_type=content_type,
443             size=size,
444             if_etag_match=if_etag_match,
445             if_etag_not_match='*' if if_not_exist else None,
446             content_encoding=content_encoding,
447             content_disposition=content_disposition,
448             permissions=sharing,
449             public=public)
450
451         if missing is None:
452             return obj_headers
453
454         if upload_cb:
455             upload_gen = upload_cb(len(missing))
456             for i in range(len(missing), len(hashmap['hashes']) + 1):
457                 try:
458                     upload_gen.next()
459                 except:
460                     upload_gen = None
461         else:
462             upload_gen = None
463
464         retries = 7
465         try:
466             while retries:
467                 sendlog.info('%s blocks missing' % len(missing))
468                 num_of_blocks = len(missing)
469                 missing = self._upload_missing_blocks(
470                     missing,
471                     hmap,
472                     f,
473                     upload_gen)
474                 if missing:
475                     if num_of_blocks == len(missing):
476                         retries -= 1
477                     else:
478                         num_of_blocks = len(missing)
479                 else:
480                     break
481             if missing:
482                 try:
483                     details = ['%s' % thread.exception for thread in missing]
484                 except Exception:
485                     details = ['Also, failed to read thread exceptions']
486                 raise ClientError(
487                     '%s blocks failed to upload' % len(missing),
488                     details=details)
489         except KeyboardInterrupt:
490             sendlog.info('- - - wait for threads to finish')
491             for thread in activethreads():
492                 thread.join()
493             raise
494
495         r = self.object_put(
496             obj,
497             format='json',
498             hashmap=True,
499             content_type=content_type,
500             content_encoding=content_encoding,
501             if_etag_match=if_etag_match,
502             if_etag_not_match='*' if if_not_exist else None,
503             etag=etag,
504             json=hashmap,
505             permissions=sharing,
506             public=public,
507             success=201)
508         return r.headers
509
510     def upload_from_string(
511             self, obj, input_str,
512             hash_cb=None,
513             upload_cb=None,
514             etag=None,
515             if_etag_match=None,
516             if_not_exist=None,
517             content_encoding=None,
518             content_disposition=None,
519             content_type=None,
520             sharing=None,
521             public=None,
522             container_info_cache=None):
523         """Upload an object using multiple connections (threads)
524
525         :param obj: (str) remote object path
526
527         :param input_str: (str) upload content
528
529         :param hash_cb: optional progress.bar object for calculating hashes
530
531         :param upload_cb: optional progress.bar object for uploading
532
533         :param etag: (str)
534
535         :param if_etag_match: (str) Push that value to if-match header at file
536             creation
537
538         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
539             it does not exist remotely, otherwise the operation will fail.
540             Involves the case of an object with the same path is created while
541             the object is being uploaded.
542
543         :param content_encoding: (str)
544
545         :param content_disposition: (str)
546
547         :param content_type: (str)
548
549         :param sharing: {'read':[user and/or grp names],
550             'write':[usr and/or grp names]}
551
552         :param public: (bool)
553
554         :param container_info_cache: (dict) if given, avoid redundant calls to
555             server for container info (block size and hash information)
556         """
557         self._assert_container()
558
559         blocksize, blockhash, size, nblocks = self._get_file_block_info(
560                 fileobj=None, size=len(input_str), cache=container_info_cache)
561         (hashes, hmap, offset) = ([], {}, 0)
562         if not content_type:
563             content_type = 'application/octet-stream'
564
565         hashes = []
566         hmap = {}
567         for blockid in range(nblocks):
568             start = blockid * blocksize
569             block = input_str[start: (start + blocksize)]
570             hashes.append(_pithos_hash(block, blockhash))
571             hmap[hashes[blockid]] = (start, block)
572
573         hashmap = dict(bytes=size, hashes=hashes)
574         missing, obj_headers = self._create_object_or_get_missing_hashes(
575             obj, hashmap,
576             content_type=content_type,
577             size=size,
578             if_etag_match=if_etag_match,
579             if_etag_not_match='*' if if_not_exist else None,
580             content_encoding=content_encoding,
581             content_disposition=content_disposition,
582             permissions=sharing,
583             public=public)
584         if missing is None:
585             return obj_headers
586         num_of_missing = len(missing)
587
588         if upload_cb:
589             self.progress_bar_gen = upload_cb(nblocks)
590             for i in range(nblocks + 1 - num_of_missing):
591                 self._cb_next()
592
593         tries = 7
594         old_failures = 0
595         try:
596             while tries and missing:
597                 flying = []
598                 failures = []
599                 for hash in missing:
600                     offset, block = hmap[hash]
601                     bird = self._put_block_async(block, hash)
602                     flying.append(bird)
603                     unfinished = self._watch_thread_limit(flying)
604                     for thread in set(flying).difference(unfinished):
605                         if thread.exception:
606                             failures.append(thread.kwargs['hash'])
607                         if thread.isAlive():
608                             flying.append(thread)
609                         else:
610                             self._cb_next()
611                     flying = unfinished
612                 for thread in flying:
613                     thread.join()
614                     if thread.exception:
615                         failures.append(thread.kwargs['hash'])
616                     self._cb_next()
617                 missing = failures
618                 if missing and len(missing) == old_failures:
619                     tries -= 1
620                 old_failures = len(missing)
621             if missing:
622                 raise ClientError(
623                     '%s blocks failed to upload' % len(missing),
624                     details=['%s' % thread.exception for thread in missing])
625         except KeyboardInterrupt:
626             sendlog.info('- - - wait for threads to finish')
627             for thread in activethreads():
628                 thread.join()
629             raise
630
631         r = self.object_put(
632             obj,
633             format='json',
634             hashmap=True,
635             content_type=content_type,
636             content_encoding=content_encoding,
637             if_etag_match=if_etag_match,
638             if_etag_not_match='*' if if_not_exist else None,
639             etag=etag,
640             json=hashmap,
641             permissions=sharing,
642             public=public,
643             success=201)
644         return r.headers
645
646     # download_* auxiliary methods
647     def _get_remote_blocks_info(self, obj, **restargs):
648         #retrieve object hashmap
649         myrange = restargs.pop('data_range', None)
650         hashmap = self.get_object_hashmap(obj, **restargs)
651         restargs['data_range'] = myrange
652         blocksize = int(hashmap['block_size'])
653         blockhash = hashmap['block_hash']
654         total_size = hashmap['bytes']
655         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
656         map_dict = {}
657         for i, h in enumerate(hashmap['hashes']):
658             #  map_dict[h] = i   CHAGE
659             if h in map_dict:
660                 map_dict[h].append(i)
661             else:
662                 map_dict[h] = [i]
663         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
664
665     def _dump_blocks_sync(
666             self, obj, remote_hashes, blocksize, total_size, dst, crange,
667             **args):
668         if not total_size:
669             return
670         for blockid, blockhash in enumerate(remote_hashes):
671             if blockhash:
672                 start = blocksize * blockid
673                 is_last = start + blocksize > total_size
674                 end = (total_size - 1) if is_last else (start + blocksize - 1)
675                 data_range = _range_up(start, end, total_size, crange)
676                 if not data_range:
677                     self._cb_next()
678                     continue
679                 args['data_range'] = 'bytes=%s' % data_range
680                 r = self.object_get(obj, success=(200, 206), **args)
681                 self._cb_next()
682                 dst.write(r.content)
683                 dst.flush()
684
685     def _get_block_async(self, obj, **args):
686         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
687         event.start()
688         return event
689
690     def _hash_from_file(self, fp, start, size, blockhash):
691         fp.seek(start)
692         block = readall(fp, size)
693         h = newhashlib(blockhash)
694         h.update(block.strip('\x00'))
695         return hexlify(h.digest())
696
697     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
698         """write the results of a greenleted rest call to a file
699
700         :param offset: the offset of the file up to blocksize
701         - e.g. if the range is 10-100, all blocks will be written to
702         normal_position - 10
703         """
704         for key, g in flying.items():
705             if g.isAlive():
706                 continue
707             if g.exception:
708                 raise g.exception
709             block = g.value.content
710             for block_start in blockids[key]:
711                 local_file.seek(block_start + offset)
712                 local_file.write(block)
713                 self._cb_next()
714             flying.pop(key)
715             blockids.pop(key)
716         local_file.flush()
717
718     def _dump_blocks_async(
719             self, obj, remote_hashes, blocksize, total_size, local_file,
720             blockhash=None, resume=False, filerange=None, **restargs):
721         file_size = fstat(local_file.fileno()).st_size if resume else 0
722         flying = dict()
723         blockid_dict = dict()
724         offset = 0
725
726         self._init_thread_limit()
727         for block_hash, blockids in remote_hashes.items():
728             blockids = [blk * blocksize for blk in blockids]
729             unsaved = [blk for blk in blockids if not (
730                 blk < file_size and block_hash == self._hash_from_file(
731                         local_file, blk, blocksize, blockhash))]
732             self._cb_next(len(blockids) - len(unsaved))
733             if unsaved:
734                 key = unsaved[0]
735                 self._watch_thread_limit(flying.values())
736                 self._thread2file(
737                     flying, blockid_dict, local_file, offset,
738                     **restargs)
739                 end = total_size - 1 if (
740                     key + blocksize > total_size) else key + blocksize - 1
741                 if end < key:
742                     self._cb_next()
743                     continue
744                 data_range = _range_up(key, end, total_size, filerange)
745                 if not data_range:
746                     self._cb_next()
747                     continue
748                 restargs[
749                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
750                 flying[key] = self._get_block_async(obj, **restargs)
751                 blockid_dict[key] = unsaved
752
753         for thread in flying.values():
754             thread.join()
755         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
756
757     def download_object(
758             self, obj, dst,
759             download_cb=None,
760             version=None,
761             resume=False,
762             range_str=None,
763             if_match=None,
764             if_none_match=None,
765             if_modified_since=None,
766             if_unmodified_since=None):
767         """Download an object (multiple connections, random blocks)
768
769         :param obj: (str) remote object path
770
771         :param dst: open file descriptor (wb+)
772
773         :param download_cb: optional progress.bar object for downloading
774
775         :param version: (str) file version
776
777         :param resume: (bool) if set, preserve already downloaded file parts
778
779         :param range_str: (str) from, to are file positions (int) in bytes
780
781         :param if_match: (str)
782
783         :param if_none_match: (str)
784
785         :param if_modified_since: (str) formated date
786
787         :param if_unmodified_since: (str) formated date"""
788         restargs = dict(
789             version=version,
790             data_range=None if range_str is None else 'bytes=%s' % range_str,
791             if_match=if_match,
792             if_none_match=if_none_match,
793             if_modified_since=if_modified_since,
794             if_unmodified_since=if_unmodified_since)
795
796         (
797             blocksize,
798             blockhash,
799             total_size,
800             hash_list,
801             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
802         assert total_size >= 0
803
804         if download_cb:
805             self.progress_bar_gen = download_cb(len(hash_list))
806             self._cb_next()
807
808         if dst.isatty():
809             self._dump_blocks_sync(
810                 obj,
811                 hash_list,
812                 blocksize,
813                 total_size,
814                 dst,
815                 range_str,
816                 **restargs)
817         else:
818             self._dump_blocks_async(
819                 obj,
820                 remote_hashes,
821                 blocksize,
822                 total_size,
823                 dst,
824                 blockhash,
825                 resume,
826                 range_str,
827                 **restargs)
828             if not range_str:
829                 dst.truncate(total_size)
830
831         self._complete_cb()
832
833     def download_to_string(
834             self, obj,
835             download_cb=None,
836             version=None,
837             range_str=None,
838             if_match=None,
839             if_none_match=None,
840             if_modified_since=None,
841             if_unmodified_since=None):
842         """Download an object to a string (multiple connections). This method
843         uses threads for http requests, but stores all content in memory.
844
845         :param obj: (str) remote object path
846
847         :param download_cb: optional progress.bar object for downloading
848
849         :param version: (str) file version
850
851         :param range_str: (str) from, to are file positions (int) in bytes
852
853         :param if_match: (str)
854
855         :param if_none_match: (str)
856
857         :param if_modified_since: (str) formated date
858
859         :param if_unmodified_since: (str) formated date
860
861         :returns: (str) the whole object contents
862         """
863         restargs = dict(
864             version=version,
865             data_range=None if range_str is None else 'bytes=%s' % range_str,
866             if_match=if_match,
867             if_none_match=if_none_match,
868             if_modified_since=if_modified_since,
869             if_unmodified_since=if_unmodified_since)
870
871         (
872             blocksize,
873             blockhash,
874             total_size,
875             hash_list,
876             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
877         assert total_size >= 0
878
879         if download_cb:
880             self.progress_bar_gen = download_cb(len(hash_list))
881             self._cb_next()
882
883         num_of_blocks = len(remote_hashes)
884         ret = [''] * num_of_blocks
885         self._init_thread_limit()
886         flying = dict()
887         try:
888             for blockid, blockhash in enumerate(remote_hashes):
889                 start = blocksize * blockid
890                 is_last = start + blocksize > total_size
891                 end = (total_size - 1) if is_last else (start + blocksize - 1)
892                 data_range_str = _range_up(start, end, end, range_str)
893                 if data_range_str:
894                     self._watch_thread_limit(flying.values())
895                     restargs['data_range'] = 'bytes=%s' % data_range_str
896                     flying[blockid] = self._get_block_async(obj, **restargs)
897                 for runid, thread in flying.items():
898                     if (blockid + 1) == num_of_blocks:
899                         thread.join()
900                     elif thread.isAlive():
901                         continue
902                     if thread.exception:
903                         raise thread.exception
904                     ret[runid] = thread.value.content
905                     self._cb_next()
906                     flying.pop(runid)
907             return ''.join(ret)
908         except KeyboardInterrupt:
909             sendlog.info('- - - wait for threads to finish')
910             for thread in activethreads():
911                 thread.join()
912
913     #Command Progress Bar method
914     def _cb_next(self, step=1):
915         if hasattr(self, 'progress_bar_gen'):
916             try:
917                 for i in xrange(step):
918                     self.progress_bar_gen.next()
919             except:
920                 pass
921
922     def _complete_cb(self):
923         while True:
924             try:
925                 self.progress_bar_gen.next()
926             except:
927                 break
928
929     def get_object_hashmap(
930             self, obj,
931             version=None,
932             if_match=None,
933             if_none_match=None,
934             if_modified_since=None,
935             if_unmodified_since=None):
936         """
937         :param obj: (str) remote object path
938
939         :param if_match: (str)
940
941         :param if_none_match: (str)
942
943         :param if_modified_since: (str) formated date
944
945         :param if_unmodified_since: (str) formated date
946
947         :returns: (list)
948         """
949         try:
950             r = self.object_get(
951                 obj,
952                 hashmap=True,
953                 version=version,
954                 if_etag_match=if_match,
955                 if_etag_not_match=if_none_match,
956                 if_modified_since=if_modified_since,
957                 if_unmodified_since=if_unmodified_since)
958         except ClientError as err:
959             if err.status == 304 or err.status == 412:
960                 return {}
961             raise
962         return r.json
963
964     def set_account_group(self, group, usernames):
965         """
966         :param group: (str)
967
968         :param usernames: (list)
969         """
970         r = self.account_post(update=True, groups={group: usernames})
971         return r
972
973     def del_account_group(self, group):
974         """
975         :param group: (str)
976         """
977         self.account_post(update=True, groups={group: []})
978
979     def get_account_info(self, until=None):
980         """
981         :param until: (str) formated date
982
983         :returns: (dict)
984         """
985         r = self.account_head(until=until)
986         if r.status_code == 401:
987             raise ClientError("No authorization", status=401)
988         return r.headers
989
990     def get_account_quota(self):
991         """
992         :returns: (dict)
993         """
994         return filter_in(
995             self.get_account_info(),
996             'X-Account-Policy-Quota',
997             exactMatch=True)
998
999     #def get_account_versioning(self):
1000     #    """
1001     #    :returns: (dict)
1002     #    """
1003     #    return filter_in(
1004     #        self.get_account_info(),
1005     #        'X-Account-Policy-Versioning',
1006     #        exactMatch=True)
1007
1008     def get_account_meta(self, until=None):
1009         """
1010         :param until: (str) formated date
1011
1012         :returns: (dict)
1013         """
1014         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1015
1016     def get_account_group(self):
1017         """
1018         :returns: (dict)
1019         """
1020         return filter_in(self.get_account_info(), 'X-Account-Group-')
1021
1022     def set_account_meta(self, metapairs):
1023         """
1024         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1025         """
1026         assert(type(metapairs) is dict)
1027         r = self.account_post(update=True, metadata=metapairs)
1028         return r.headers
1029
1030     def del_account_meta(self, metakey):
1031         """
1032         :param metakey: (str) metadatum key
1033         """
1034         r = self.account_post(update=True, metadata={metakey: ''})
1035         return r.headers
1036
1037     #def set_account_quota(self, quota):
1038     #    """
1039     #    :param quota: (int)
1040     #    """
1041     #    self.account_post(update=True, quota=quota)
1042
1043     #def set_account_versioning(self, versioning):
1044     #    """
1045     #    :param versioning: (str)
1046     #    """
1047     #    r = self.account_post(update=True, versioning=versioning)
1048     #    return r.headers
1049
1050     def list_containers(self):
1051         """
1052         :returns: (dict)
1053         """
1054         r = self.account_get()
1055         return r.json
1056
1057     def del_container(self, until=None, delimiter=None):
1058         """
1059         :param until: (str) formated date
1060
1061         :param delimiter: (str) with / empty container
1062
1063         :raises ClientError: 404 Container does not exist
1064
1065         :raises ClientError: 409 Container is not empty
1066         """
1067         self._assert_container()
1068         r = self.container_delete(
1069             until=until,
1070             delimiter=delimiter,
1071             success=(204, 404, 409))
1072         if r.status_code == 404:
1073             raise ClientError(
1074                 'Container "%s" does not exist' % self.container,
1075                 r.status_code)
1076         elif r.status_code == 409:
1077             raise ClientError(
1078                 'Container "%s" is not empty' % self.container,
1079                 r.status_code)
1080         return r.headers
1081
1082     def get_container_versioning(self, container=None):
1083         """
1084         :param container: (str)
1085
1086         :returns: (dict)
1087         """
1088         cnt_back_up = self.container
1089         try:
1090             self.container = container or cnt_back_up
1091             return filter_in(
1092                 self.get_container_info(),
1093                 'X-Container-Policy-Versioning')
1094         finally:
1095             self.container = cnt_back_up
1096
1097     def get_container_limit(self, container=None):
1098         """
1099         :param container: (str)
1100
1101         :returns: (dict)
1102         """
1103         cnt_back_up = self.container
1104         try:
1105             self.container = container or cnt_back_up
1106             return filter_in(
1107                 self.get_container_info(),
1108                 'X-Container-Policy-Quota')
1109         finally:
1110             self.container = cnt_back_up
1111
1112     def get_container_info(self, until=None):
1113         """
1114         :param until: (str) formated date
1115
1116         :returns: (dict)
1117
1118         :raises ClientError: 404 Container not found
1119         """
1120         try:
1121             r = self.container_head(until=until)
1122         except ClientError as err:
1123             err.details.append('for container %s' % self.container)
1124             raise err
1125         return r.headers
1126
1127     def get_container_meta(self, until=None):
1128         """
1129         :param until: (str) formated date
1130
1131         :returns: (dict)
1132         """
1133         return filter_in(
1134             self.get_container_info(until=until),
1135             'X-Container-Meta')
1136
1137     def get_container_object_meta(self, until=None):
1138         """
1139         :param until: (str) formated date
1140
1141         :returns: (dict)
1142         """
1143         return filter_in(
1144             self.get_container_info(until=until),
1145             'X-Container-Object-Meta')
1146
1147     def set_container_meta(self, metapairs):
1148         """
1149         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1150         """
1151         assert(type(metapairs) is dict)
1152         r = self.container_post(update=True, metadata=metapairs)
1153         return r.headers
1154
1155     def del_container_meta(self, metakey):
1156         """
1157         :param metakey: (str) metadatum key
1158
1159         :returns: (dict) response headers
1160         """
1161         r = self.container_post(update=True, metadata={metakey: ''})
1162         return r.headers
1163
1164     def set_container_limit(self, limit):
1165         """
1166         :param limit: (int)
1167         """
1168         r = self.container_post(update=True, quota=limit)
1169         return r.headers
1170
1171     def set_container_versioning(self, versioning):
1172         """
1173         :param versioning: (str)
1174         """
1175         r = self.container_post(update=True, versioning=versioning)
1176         return r.headers
1177
1178     def del_object(self, obj, until=None, delimiter=None):
1179         """
1180         :param obj: (str) remote object path
1181
1182         :param until: (str) formated date
1183
1184         :param delimiter: (str)
1185         """
1186         self._assert_container()
1187         r = self.object_delete(obj, until=until, delimiter=delimiter)
1188         return r.headers
1189
1190     def set_object_meta(self, obj, metapairs):
1191         """
1192         :param obj: (str) remote object path
1193
1194         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1195         """
1196         assert(type(metapairs) is dict)
1197         r = self.object_post(obj, update=True, metadata=metapairs)
1198         return r.headers
1199
1200     def del_object_meta(self, obj, metakey):
1201         """
1202         :param obj: (str) remote object path
1203
1204         :param metakey: (str) metadatum key
1205         """
1206         r = self.object_post(obj, update=True, metadata={metakey: ''})
1207         return r.headers
1208
1209     def publish_object(self, obj):
1210         """
1211         :param obj: (str) remote object path
1212
1213         :returns: (str) access url
1214         """
1215         self.object_post(obj, update=True, public=True)
1216         info = self.get_object_info(obj)
1217         return info['x-object-public']
1218         pref, sep, rest = self.base_url.partition('//')
1219         base = rest.split('/')[0]
1220         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1221
1222     def unpublish_object(self, obj):
1223         """
1224         :param obj: (str) remote object path
1225         """
1226         r = self.object_post(obj, update=True, public=False)
1227         return r.headers
1228
1229     def get_object_info(self, obj, version=None):
1230         """
1231         :param obj: (str) remote object path
1232
1233         :param version: (str)
1234
1235         :returns: (dict)
1236         """
1237         try:
1238             r = self.object_head(obj, version=version)
1239             return r.headers
1240         except ClientError as ce:
1241             if ce.status == 404:
1242                 raise ClientError('Object %s not found' % obj, status=404)
1243             raise
1244
1245     def get_object_meta(self, obj, version=None):
1246         """
1247         :param obj: (str) remote object path
1248
1249         :param version: (str)
1250
1251         :returns: (dict)
1252         """
1253         return filter_in(
1254             self.get_object_info(obj, version=version),
1255             'X-Object-Meta')
1256
1257     def get_object_sharing(self, obj):
1258         """
1259         :param obj: (str) remote object path
1260
1261         :returns: (dict)
1262         """
1263         r = filter_in(
1264             self.get_object_info(obj),
1265             'X-Object-Sharing',
1266             exactMatch=True)
1267         reply = {}
1268         if len(r) > 0:
1269             perms = r['x-object-sharing'].split(';')
1270             for perm in perms:
1271                 try:
1272                     perm.index('=')
1273                 except ValueError:
1274                     raise ClientError('Incorrect reply format')
1275                 (key, val) = perm.strip().split('=')
1276                 reply[key] = val
1277         return reply
1278
1279     def set_object_sharing(
1280             self, obj,
1281             read_permission=False, write_permission=False):
1282         """Give read/write permisions to an object.
1283
1284         :param obj: (str) remote object path
1285
1286         :param read_permission: (list - bool) users and user groups that get
1287             read permission for this object - False means all previous read
1288             permissions will be removed
1289
1290         :param write_permission: (list - bool) of users and user groups to get
1291            write permission for this object - False means all previous write
1292            permissions will be removed
1293
1294         :returns: (dict) response headers
1295         """
1296
1297         perms = dict(read=read_permission or '', write=write_permission or '')
1298         r = self.object_post(obj, update=True, permissions=perms)
1299         return r.headers
1300
1301     def del_object_sharing(self, obj):
1302         """
1303         :param obj: (str) remote object path
1304         """
1305         return self.set_object_sharing(obj)
1306
1307     def append_object(self, obj, source_file, upload_cb=None):
1308         """
1309         :param obj: (str) remote object path
1310
1311         :param source_file: open file descriptor
1312
1313         :param upload_db: progress.bar for uploading
1314         """
1315         self._assert_container()
1316         meta = self.get_container_info()
1317         blocksize = int(meta['x-container-block-size'])
1318         filesize = fstat(source_file.fileno()).st_size
1319         nblocks = 1 + (filesize - 1) // blocksize
1320         offset = 0
1321         headers = {}
1322         if upload_cb:
1323             self.progress_bar_gen = upload_cb(nblocks)
1324             self._cb_next()
1325         flying = {}
1326         self._init_thread_limit()
1327         try:
1328             for i in range(nblocks):
1329                 block = source_file.read(min(blocksize, filesize - offset))
1330                 offset += len(block)
1331
1332                 self._watch_thread_limit(flying.values())
1333                 unfinished = {}
1334                 flying[i] = SilentEvent(
1335                     method=self.object_post,
1336                     obj=obj,
1337                     update=True,
1338                     content_range='bytes */*',
1339                     content_type='application/octet-stream',
1340                     content_length=len(block),
1341                     data=block)
1342                 flying[i].start()
1343
1344                 for key, thread in flying.items():
1345                     if thread.isAlive():
1346                         if i < nblocks:
1347                             unfinished[key] = thread
1348                             continue
1349                         thread.join()
1350                     if thread.exception:
1351                         raise thread.exception
1352                     headers[key] = thread.value.headers
1353                     self._cb_next()
1354                 flying = unfinished
1355         except KeyboardInterrupt:
1356             sendlog.info('- - - wait for threads to finish')
1357             for thread in activethreads():
1358                 thread.join()
1359         finally:
1360             from time import sleep
1361             sleep(2 * len(activethreads()))
1362         return headers.values()
1363
1364     def truncate_object(self, obj, upto_bytes):
1365         """
1366         :param obj: (str) remote object path
1367
1368         :param upto_bytes: max number of bytes to leave on file
1369
1370         :returns: (dict) response headers
1371         """
1372         r = self.object_post(
1373             obj,
1374             update=True,
1375             content_range='bytes 0-%s/*' % upto_bytes,
1376             content_type='application/octet-stream',
1377             object_bytes=upto_bytes,
1378             source_object=path4url(self.container, obj))
1379         return r.headers
1380
1381     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1382         """Overwrite a part of an object from local source file
1383
1384         :param obj: (str) remote object path
1385
1386         :param start: (int) position in bytes to start overwriting from
1387
1388         :param end: (int) position in bytes to stop overwriting at
1389
1390         :param source_file: open file descriptor
1391
1392         :param upload_db: progress.bar for uploading
1393         """
1394
1395         r = self.get_object_info(obj)
1396         rf_size = int(r['content-length'])
1397         if rf_size < int(start):
1398             raise ClientError(
1399                 'Range start exceeds file size',
1400                 status=416)
1401         elif rf_size < int(end):
1402             raise ClientError(
1403                 'Range end exceeds file size',
1404                 status=416)
1405         self._assert_container()
1406         meta = self.get_container_info()
1407         blocksize = int(meta['x-container-block-size'])
1408         filesize = fstat(source_file.fileno()).st_size
1409         datasize = int(end) - int(start) + 1
1410         nblocks = 1 + (datasize - 1) // blocksize
1411         offset = 0
1412         if upload_cb:
1413             self.progress_bar_gen = upload_cb(nblocks)
1414             self._cb_next()
1415         headers = []
1416         for i in range(nblocks):
1417             read_size = min(blocksize, filesize - offset, datasize - offset)
1418             block = source_file.read(read_size)
1419             r = self.object_post(
1420                 obj,
1421                 update=True,
1422                 content_type='application/octet-stream',
1423                 content_length=len(block),
1424                 content_range='bytes %s-%s/*' % (
1425                     start + offset,
1426                     start + offset + len(block) - 1),
1427                 data=block)
1428             headers.append(dict(r.headers))
1429             offset += len(block)
1430
1431             self._cb_next
1432         return headers
1433
1434     def copy_object(
1435             self, src_container, src_object, dst_container,
1436             dst_object=None,
1437             source_version=None,
1438             source_account=None,
1439             public=False,
1440             content_type=None,
1441             delimiter=None):
1442         """
1443         :param src_container: (str) source container
1444
1445         :param src_object: (str) source object path
1446
1447         :param dst_container: (str) destination container
1448
1449         :param dst_object: (str) destination object path
1450
1451         :param source_version: (str) source object version
1452
1453         :param source_account: (str) account to copy from
1454
1455         :param public: (bool)
1456
1457         :param content_type: (str)
1458
1459         :param delimiter: (str)
1460
1461         :returns: (dict) response headers
1462         """
1463         self._assert_account()
1464         self.container = dst_container
1465         src_path = path4url(src_container, src_object)
1466         r = self.object_put(
1467             dst_object or src_object,
1468             success=201,
1469             copy_from=src_path,
1470             content_length=0,
1471             source_version=source_version,
1472             source_account=source_account,
1473             public=public,
1474             content_type=content_type,
1475             delimiter=delimiter)
1476         return r.headers
1477
1478     def move_object(
1479             self, src_container, src_object, dst_container,
1480             dst_object=False,
1481             source_account=None,
1482             source_version=None,
1483             public=False,
1484             content_type=None,
1485             delimiter=None):
1486         """
1487         :param src_container: (str) source container
1488
1489         :param src_object: (str) source object path
1490
1491         :param dst_container: (str) destination container
1492
1493         :param dst_object: (str) destination object path
1494
1495         :param source_account: (str) account to move from
1496
1497         :param source_version: (str) source object version
1498
1499         :param public: (bool)
1500
1501         :param content_type: (str)
1502
1503         :param delimiter: (str)
1504
1505         :returns: (dict) response headers
1506         """
1507         self._assert_account()
1508         self.container = dst_container
1509         dst_object = dst_object or src_object
1510         src_path = path4url(src_container, src_object)
1511         r = self.object_put(
1512             dst_object,
1513             success=201,
1514             move_from=src_path,
1515             content_length=0,
1516             source_account=source_account,
1517             source_version=source_version,
1518             public=public,
1519             content_type=content_type,
1520             delimiter=delimiter)
1521         return r.headers
1522
1523     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1524         """Get accounts that share with self.account
1525
1526         :param limit: (str)
1527
1528         :param marker: (str)
1529
1530         :returns: (dict)
1531         """
1532         self._assert_account()
1533
1534         self.set_param('format', 'json')
1535         self.set_param('limit', limit, iff=limit is not None)
1536         self.set_param('marker', marker, iff=marker is not None)
1537
1538         path = ''
1539         success = kwargs.pop('success', (200, 204))
1540         r = self.get(path, *args, success=success, **kwargs)
1541         return r.json
1542
1543     def get_object_versionlist(self, obj):
1544         """
1545         :param obj: (str) remote object path
1546
1547         :returns: (list)
1548         """
1549         self._assert_container()
1550         r = self.object_get(obj, format='json', version='list')
1551         return r.json['versions']