Complete astakosclient exposition in astakos.py
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up was called with start < 0'
70     assert end >= start, '_range_up was called with end < start'
71     assert end <= max_value, '_range_up was called with max_value < end'
72     if not a_range:
73         return '%s-%s' % (start, end)
74     selected = []
75     for some_range in a_range.split(','):
76         v0, sep, v1 = some_range.partition('-')
77         if v0:
78             v0 = int(v0)
79             if sep:
80                 v1 = int(v1)
81                 if v1 < start or v0 > end or v1 < v0:
82                     continue
83                 v0 = v0 if v0 > start else start
84                 v1 = v1 if v1 < end else end
85                 selected.append('%s-%s' % (v0, v1))
86             elif v0 < start:
87                 continue
88             else:
89                 v1 = v0 if v0 <= end else end
90                 selected.append('%s-%s' % (start, v1))
91         else:
92             v1 = int(v1)
93             if max_value - v1 > end:
94                 continue
95             v0 = (max_value - v1) if max_value - v1 > start else start
96             selected.append('%s-%s' % (v0, end))
97     return ','.join(selected)
98
99
100 class PithosClient(PithosRestClient):
101     """Synnefo Pithos+ API client"""
102
103     def __init__(self, base_url, token, account=None, container=None):
104         super(PithosClient, self).__init__(base_url, token, account, container)
105
106     def create_container(
107             self,
108             container=None, sizelimit=None, versioning=None, metadata=None,
109             **kwargs):
110         """
111         :param container: (str) if not given, self.container is used instead
112
113         :param sizelimit: (int) container total size limit in bytes
114
115         :param versioning: (str) can be auto or whatever supported by server
116
117         :param metadata: (dict) Custom user-defined metadata of the form
118             { 'name1': 'value1', 'name2': 'value2', ... }
119
120         :returns: (dict) response headers
121         """
122         cnt_back_up = self.container
123         try:
124             self.container = container or cnt_back_up
125             r = self.container_put(
126                 quota=sizelimit, versioning=versioning, metadata=metadata,
127                 **kwargs)
128             return r.headers
129         finally:
130             self.container = cnt_back_up
131
132     def purge_container(self, container=None):
133         """Delete an empty container and destroy associated blocks
134         """
135         cnt_back_up = self.container
136         try:
137             self.container = container or cnt_back_up
138             r = self.container_delete(until=unicode(time()))
139         finally:
140             self.container = cnt_back_up
141         return r.headers
142
143     def upload_object_unchunked(
144             self, obj, f,
145             withHashFile=False,
146             size=None,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param f: open file descriptor
157
158         :param withHashFile: (bool)
159
160         :param size: (int) size of data to upload
161
162         :param etag: (str)
163
164         :param content_encoding: (str)
165
166         :param content_disposition: (str)
167
168         :param content_type: (str)
169
170         :param sharing: {'read':[user and/or grp names],
171             'write':[usr and/or grp names]}
172
173         :param public: (bool)
174
175         :returns: (dict) created object metadata
176         """
177         self._assert_container()
178
179         if withHashFile:
180             data = f.read()
181             try:
182                 import json
183                 data = json.dumps(json.loads(data))
184             except ValueError:
185                 raise ClientError('"%s" is not json-formated' % f.name, 1)
186             except SyntaxError:
187                 msg = '"%s" is not a valid hashmap file' % f.name
188                 raise ClientError(msg, 1)
189             f = StringIO(data)
190         else:
191             data = f.read(size) if size else f.read()
192         r = self.object_put(
193             obj,
194             data=data,
195             etag=etag,
196             content_encoding=content_encoding,
197             content_disposition=content_disposition,
198             content_type=content_type,
199             permissions=sharing,
200             public=public,
201             success=201)
202         return r.headers
203
204     def create_object_by_manifestation(
205             self, obj,
206             etag=None,
207             content_encoding=None,
208             content_disposition=None,
209             content_type=None,
210             sharing=None,
211             public=None):
212         """
213         :param obj: (str) remote object path
214
215         :param etag: (str)
216
217         :param content_encoding: (str)
218
219         :param content_disposition: (str)
220
221         :param content_type: (str)
222
223         :param sharing: {'read':[user and/or grp names],
224             'write':[usr and/or grp names]}
225
226         :param public: (bool)
227
228         :returns: (dict) created object metadata
229         """
230         self._assert_container()
231         r = self.object_put(
232             obj,
233             content_length=0,
234             etag=etag,
235             content_encoding=content_encoding,
236             content_disposition=content_disposition,
237             content_type=content_type,
238             permissions=sharing,
239             public=public,
240             manifest='%s/%s' % (self.container, obj))
241         return r.headers
242
243     # upload_* auxiliary methods
244     def _put_block_async(self, data, hash):
245         event = SilentEvent(method=self._put_block, data=data, hash=hash)
246         event.start()
247         return event
248
249     def _put_block(self, data, hash):
250         r = self.container_post(
251             update=True,
252             content_type='application/octet-stream',
253             content_length=len(data),
254             data=data,
255             format='json')
256         assert r.json[0] == hash, 'Local hash does not match server'
257
258     def _get_file_block_info(self, fileobj, size=None, cache=None):
259         """
260         :param fileobj: (file descriptor) source
261
262         :param size: (int) size of data to upload from source
263
264         :param cache: (dict) if provided, cache container info response to
265         avoid redundant calls
266         """
267         if isinstance(cache, dict):
268             try:
269                 meta = cache[self.container]
270             except KeyError:
271                 meta = self.get_container_info()
272                 cache[self.container] = meta
273         else:
274             meta = self.get_container_info()
275         blocksize = int(meta['x-container-block-size'])
276         blockhash = meta['x-container-block-hash']
277         size = size if size is not None else fstat(fileobj.fileno()).st_size
278         nblocks = 1 + (size - 1) // blocksize
279         return (blocksize, blockhash, size, nblocks)
280
281     def _create_object_or_get_missing_hashes(
282             self, obj, json,
283             size=None,
284             format='json',
285             hashmap=True,
286             content_type=None,
287             if_etag_match=None,
288             if_etag_not_match=None,
289             content_encoding=None,
290             content_disposition=None,
291             permissions=None,
292             public=None,
293             success=(201, 409)):
294         r = self.object_put(
295             obj,
296             format='json',
297             hashmap=True,
298             content_type=content_type,
299             json=json,
300             if_etag_match=if_etag_match,
301             if_etag_not_match=if_etag_not_match,
302             content_encoding=content_encoding,
303             content_disposition=content_disposition,
304             permissions=permissions,
305             public=public,
306             success=success)
307         return (None if r.status_code == 201 else r.json), r.headers
308
309     def _calculate_blocks_for_upload(
310             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
311             hash_cb=None):
312         offset = 0
313         if hash_cb:
314             hash_gen = hash_cb(nblocks)
315             hash_gen.next()
316
317         for i in range(nblocks):
318             block = fileobj.read(min(blocksize, size - offset))
319             bytes = len(block)
320             hash = _pithos_hash(block, blockhash)
321             hashes.append(hash)
322             hmap[hash] = (offset, bytes)
323             offset += bytes
324             if hash_cb:
325                 hash_gen.next()
326         msg = ('Failed to calculate uploaded blocks:'
327                ' Offset and object size do not match')
328         assert offset == size, msg
329
330     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
331         """upload missing blocks asynchronously"""
332
333         self._init_thread_limit()
334
335         flying = []
336         failures = []
337         for hash in missing:
338             offset, bytes = hmap[hash]
339             fileobj.seek(offset)
340             data = fileobj.read(bytes)
341             r = self._put_block_async(data, hash)
342             flying.append(r)
343             unfinished = self._watch_thread_limit(flying)
344             for thread in set(flying).difference(unfinished):
345                 if thread.exception:
346                     failures.append(thread)
347                     if isinstance(
348                             thread.exception,
349                             ClientError) and thread.exception.status == 502:
350                         self.POOLSIZE = self._thread_limit
351                 elif thread.isAlive():
352                     flying.append(thread)
353                 elif upload_gen:
354                     try:
355                         upload_gen.next()
356                     except:
357                         pass
358             flying = unfinished
359
360         for thread in flying:
361             thread.join()
362             if thread.exception:
363                 failures.append(thread)
364             elif upload_gen:
365                 try:
366                     upload_gen.next()
367                 except:
368                     pass
369
370         return [failure.kwargs['hash'] for failure in failures]
371
372     def upload_object(
373             self, obj, f,
374             size=None,
375             hash_cb=None,
376             upload_cb=None,
377             etag=None,
378             if_etag_match=None,
379             if_not_exist=None,
380             content_encoding=None,
381             content_disposition=None,
382             content_type=None,
383             sharing=None,
384             public=None,
385             container_info_cache=None):
386         """Upload an object using multiple connections (threads)
387
388         :param obj: (str) remote object path
389
390         :param f: open file descriptor (rb)
391
392         :param hash_cb: optional progress.bar object for calculating hashes
393
394         :param upload_cb: optional progress.bar object for uploading
395
396         :param etag: (str)
397
398         :param if_etag_match: (str) Push that value to if-match header at file
399             creation
400
401         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
402             it does not exist remotely, otherwise the operation will fail.
403             Involves the case of an object with the same path is created while
404             the object is being uploaded.
405
406         :param content_encoding: (str)
407
408         :param content_disposition: (str)
409
410         :param content_type: (str)
411
412         :param sharing: {'read':[user and/or grp names],
413             'write':[usr and/or grp names]}
414
415         :param public: (bool)
416
417         :param container_info_cache: (dict) if given, avoid redundant calls to
418             server for container info (block size and hash information)
419         """
420         self._assert_container()
421
422         block_info = (
423             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
424                 f, size, container_info_cache)
425         (hashes, hmap, offset) = ([], {}, 0)
426         if not content_type:
427             content_type = 'application/octet-stream'
428
429         self._calculate_blocks_for_upload(
430             *block_info,
431             hashes=hashes,
432             hmap=hmap,
433             fileobj=f,
434             hash_cb=hash_cb)
435
436         hashmap = dict(bytes=size, hashes=hashes)
437         missing, obj_headers = self._create_object_or_get_missing_hashes(
438             obj, hashmap,
439             content_type=content_type,
440             size=size,
441             if_etag_match=if_etag_match,
442             if_etag_not_match='*' if if_not_exist else None,
443             content_encoding=content_encoding,
444             content_disposition=content_disposition,
445             permissions=sharing,
446             public=public)
447
448         if missing is None:
449             return obj_headers
450
451         if upload_cb:
452             upload_gen = upload_cb(len(missing))
453             for i in range(len(missing), len(hashmap['hashes']) + 1):
454                 try:
455                     upload_gen.next()
456                 except:
457                     upload_gen = None
458         else:
459             upload_gen = None
460
461         retries = 7
462         try:
463             while retries:
464                 sendlog.info('%s blocks missing' % len(missing))
465                 num_of_blocks = len(missing)
466                 missing = self._upload_missing_blocks(
467                     missing,
468                     hmap,
469                     f,
470                     upload_gen)
471                 if missing:
472                     if num_of_blocks == len(missing):
473                         retries -= 1
474                     else:
475                         num_of_blocks = len(missing)
476                 else:
477                     break
478             if missing:
479                 try:
480                     details = ['%s' % thread.exception for thread in missing]
481                 except Exception:
482                     details = ['Also, failed to read thread exceptions']
483                 raise ClientError(
484                     '%s blocks failed to upload' % len(missing),
485                     details=details)
486         except KeyboardInterrupt:
487             sendlog.info('- - - wait for threads to finish')
488             for thread in activethreads():
489                 thread.join()
490             raise
491
492         r = self.object_put(
493             obj,
494             format='json',
495             hashmap=True,
496             content_type=content_type,
497             content_encoding=content_encoding,
498             if_etag_match=if_etag_match,
499             if_etag_not_match='*' if if_not_exist else None,
500             etag=etag,
501             json=hashmap,
502             permissions=sharing,
503             public=public,
504             success=201)
505         return r.headers
506
507     def upload_from_string(
508             self, obj, input_str,
509             hash_cb=None,
510             upload_cb=None,
511             etag=None,
512             if_etag_match=None,
513             if_not_exist=None,
514             content_encoding=None,
515             content_disposition=None,
516             content_type=None,
517             sharing=None,
518             public=None,
519             container_info_cache=None):
520         """Upload an object using multiple connections (threads)
521
522         :param obj: (str) remote object path
523
524         :param input_str: (str) upload content
525
526         :param hash_cb: optional progress.bar object for calculating hashes
527
528         :param upload_cb: optional progress.bar object for uploading
529
530         :param etag: (str)
531
532         :param if_etag_match: (str) Push that value to if-match header at file
533             creation
534
535         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
536             it does not exist remotely, otherwise the operation will fail.
537             Involves the case of an object with the same path is created while
538             the object is being uploaded.
539
540         :param content_encoding: (str)
541
542         :param content_disposition: (str)
543
544         :param content_type: (str)
545
546         :param sharing: {'read':[user and/or grp names],
547             'write':[usr and/or grp names]}
548
549         :param public: (bool)
550
551         :param container_info_cache: (dict) if given, avoid redundant calls to
552             server for container info (block size and hash information)
553         """
554         self._assert_container()
555
556         blocksize, blockhash, size, nblocks = self._get_file_block_info(
557                 fileobj=None, size=len(input_str), cache=container_info_cache)
558         (hashes, hmap, offset) = ([], {}, 0)
559         if not content_type:
560             content_type = 'application/octet-stream'
561
562         hashes = []
563         hmap = {}
564         for blockid in range(nblocks):
565             start = blockid * blocksize
566             block = input_str[start: (start + blocksize)]
567             hashes.append(_pithos_hash(block, blockhash))
568             hmap[hashes[blockid]] = (start, block)
569
570         hashmap = dict(bytes=size, hashes=hashes)
571         missing, obj_headers = self._create_object_or_get_missing_hashes(
572             obj, hashmap,
573             content_type=content_type,
574             size=size,
575             if_etag_match=if_etag_match,
576             if_etag_not_match='*' if if_not_exist else None,
577             content_encoding=content_encoding,
578             content_disposition=content_disposition,
579             permissions=sharing,
580             public=public)
581         if missing is None:
582             return obj_headers
583         num_of_missing = len(missing)
584
585         if upload_cb:
586             self.progress_bar_gen = upload_cb(nblocks)
587             for i in range(nblocks + 1 - num_of_missing):
588                 self._cb_next()
589
590         tries = 7
591         old_failures = 0
592         try:
593             while tries and missing:
594                 flying = []
595                 failures = []
596                 for hash in missing:
597                     offset, block = hmap[hash]
598                     bird = self._put_block_async(block, hash)
599                     flying.append(bird)
600                     unfinished = self._watch_thread_limit(flying)
601                     for thread in set(flying).difference(unfinished):
602                         if thread.exception:
603                             failures.append(thread.kwargs['hash'])
604                         if thread.isAlive():
605                             flying.append(thread)
606                         else:
607                             self._cb_next()
608                     flying = unfinished
609                 for thread in flying:
610                     thread.join()
611                     if thread.exception:
612                         failures.append(thread.kwargs['hash'])
613                     self._cb_next()
614                 missing = failures
615                 if missing and len(missing) == old_failures:
616                     tries -= 1
617                 old_failures = len(missing)
618             if missing:
619                 raise ClientError(
620                     '%s blocks failed to upload' % len(missing),
621                     details=['%s' % thread.exception for thread in missing])
622         except KeyboardInterrupt:
623             sendlog.info('- - - wait for threads to finish')
624             for thread in activethreads():
625                 thread.join()
626             raise
627
628         r = self.object_put(
629             obj,
630             format='json',
631             hashmap=True,
632             content_type=content_type,
633             content_encoding=content_encoding,
634             if_etag_match=if_etag_match,
635             if_etag_not_match='*' if if_not_exist else None,
636             etag=etag,
637             json=hashmap,
638             permissions=sharing,
639             public=public,
640             success=201)
641         return r.headers
642
643     # download_* auxiliary methods
644     def _get_remote_blocks_info(self, obj, **restargs):
645         #retrieve object hashmap
646         myrange = restargs.pop('data_range', None)
647         hashmap = self.get_object_hashmap(obj, **restargs)
648         restargs['data_range'] = myrange
649         blocksize = int(hashmap['block_size'])
650         blockhash = hashmap['block_hash']
651         total_size = hashmap['bytes']
652         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
653         map_dict = {}
654         for i, h in enumerate(hashmap['hashes']):
655             #  map_dict[h] = i   CHAGE
656             if h in map_dict:
657                 map_dict[h].append(i)
658             else:
659                 map_dict[h] = [i]
660         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
661
662     def _dump_blocks_sync(
663             self, obj, remote_hashes, blocksize, total_size, dst, crange,
664             **args):
665         for blockid, blockhash in enumerate(remote_hashes):
666             if blockhash:
667                 start = blocksize * blockid
668                 is_last = start + blocksize > total_size
669                 end = (total_size - 1) if is_last else (start + blocksize - 1)
670                 data_range = _range_up(start, end, total_size, crange)
671                 if not data_range:
672                     self._cb_next()
673                     continue
674                 args['data_range'] = 'bytes=%s' % data_range
675                 r = self.object_get(obj, success=(200, 206), **args)
676                 self._cb_next()
677                 dst.write(r.content)
678                 dst.flush()
679
680     def _get_block_async(self, obj, **args):
681         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
682         event.start()
683         return event
684
685     def _hash_from_file(self, fp, start, size, blockhash):
686         fp.seek(start)
687         block = fp.read(size)
688         h = newhashlib(blockhash)
689         h.update(block.strip('\x00'))
690         return hexlify(h.digest())
691
692     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
693         """write the results of a greenleted rest call to a file
694
695         :param offset: the offset of the file up to blocksize
696         - e.g. if the range is 10-100, all blocks will be written to
697         normal_position - 10
698         """
699         for key, g in flying.items():
700             if g.isAlive():
701                 continue
702             if g.exception:
703                 raise g.exception
704             block = g.value.content
705             for block_start in blockids[key]:
706                 local_file.seek(block_start + offset)
707                 local_file.write(block)
708                 self._cb_next()
709             flying.pop(key)
710             blockids.pop(key)
711         local_file.flush()
712
713     def _dump_blocks_async(
714             self, obj, remote_hashes, blocksize, total_size, local_file,
715             blockhash=None, resume=False, filerange=None, **restargs):
716         file_size = fstat(local_file.fileno()).st_size if resume else 0
717         flying = dict()
718         blockid_dict = dict()
719         offset = 0
720
721         self._init_thread_limit()
722         for block_hash, blockids in remote_hashes.items():
723             blockids = [blk * blocksize for blk in blockids]
724             unsaved = [blk for blk in blockids if not (
725                 blk < file_size and block_hash == self._hash_from_file(
726                         local_file, blk, blocksize, blockhash))]
727             self._cb_next(len(blockids) - len(unsaved))
728             if unsaved:
729                 key = unsaved[0]
730                 self._watch_thread_limit(flying.values())
731                 self._thread2file(
732                     flying, blockid_dict, local_file, offset,
733                     **restargs)
734                 end = total_size - 1 if (
735                     key + blocksize > total_size) else key + blocksize - 1
736                 data_range = _range_up(key, end, total_size, filerange)
737                 if not data_range:
738                     self._cb_next()
739                     continue
740                 restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
741                 flying[key] = self._get_block_async(obj, **restargs)
742                 blockid_dict[key] = unsaved
743
744         for thread in flying.values():
745             thread.join()
746         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
747
748     def download_object(
749             self, obj, dst,
750             download_cb=None,
751             version=None,
752             resume=False,
753             range_str=None,
754             if_match=None,
755             if_none_match=None,
756             if_modified_since=None,
757             if_unmodified_since=None):
758         """Download an object (multiple connections, random blocks)
759
760         :param obj: (str) remote object path
761
762         :param dst: open file descriptor (wb+)
763
764         :param download_cb: optional progress.bar object for downloading
765
766         :param version: (str) file version
767
768         :param resume: (bool) if set, preserve already downloaded file parts
769
770         :param range_str: (str) from, to are file positions (int) in bytes
771
772         :param if_match: (str)
773
774         :param if_none_match: (str)
775
776         :param if_modified_since: (str) formated date
777
778         :param if_unmodified_since: (str) formated date"""
779         restargs = dict(
780             version=version,
781             data_range=None if range_str is None else 'bytes=%s' % range_str,
782             if_match=if_match,
783             if_none_match=if_none_match,
784             if_modified_since=if_modified_since,
785             if_unmodified_since=if_unmodified_since)
786
787         (
788             blocksize,
789             blockhash,
790             total_size,
791             hash_list,
792             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
793         assert total_size >= 0
794
795         if download_cb:
796             self.progress_bar_gen = download_cb(len(hash_list))
797             self._cb_next()
798
799         if dst.isatty():
800             self._dump_blocks_sync(
801                 obj,
802                 hash_list,
803                 blocksize,
804                 total_size,
805                 dst,
806                 range_str,
807                 **restargs)
808         else:
809             self._dump_blocks_async(
810                 obj,
811                 remote_hashes,
812                 blocksize,
813                 total_size,
814                 dst,
815                 blockhash,
816                 resume,
817                 range_str,
818                 **restargs)
819             if not range_str:
820                 dst.truncate(total_size)
821
822         self._complete_cb()
823
824     def download_to_string(
825             self, obj,
826             download_cb=None,
827             version=None,
828             range_str=None,
829             if_match=None,
830             if_none_match=None,
831             if_modified_since=None,
832             if_unmodified_since=None):
833         """Download an object to a string (multiple connections). This method
834         uses threads for http requests, but stores all content in memory.
835
836         :param obj: (str) remote object path
837
838         :param download_cb: optional progress.bar object for downloading
839
840         :param version: (str) file version
841
842         :param range_str: (str) from, to are file positions (int) in bytes
843
844         :param if_match: (str)
845
846         :param if_none_match: (str)
847
848         :param if_modified_since: (str) formated date
849
850         :param if_unmodified_since: (str) formated date
851
852         :returns: (str) the whole object contents
853         """
854         restargs = dict(
855             version=version,
856             data_range=None if range_str is None else 'bytes=%s' % range_str,
857             if_match=if_match,
858             if_none_match=if_none_match,
859             if_modified_since=if_modified_since,
860             if_unmodified_since=if_unmodified_since)
861
862         (
863             blocksize,
864             blockhash,
865             total_size,
866             hash_list,
867             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
868         assert total_size >= 0
869
870         if download_cb:
871             self.progress_bar_gen = download_cb(len(hash_list))
872             self._cb_next()
873
874         num_of_blocks = len(remote_hashes)
875         ret = [''] * num_of_blocks
876         self._init_thread_limit()
877         flying = dict()
878         try:
879             for blockid, blockhash in enumerate(remote_hashes):
880                 start = blocksize * blockid
881                 is_last = start + blocksize > total_size
882                 end = (total_size - 1) if is_last else (start + blocksize - 1)
883                 data_range_str = _range_up(start, end, end, range_str)
884                 if data_range_str:
885                     self._watch_thread_limit(flying.values())
886                     restargs['data_range'] = 'bytes=%s' % data_range_str
887                     flying[blockid] = self._get_block_async(obj, **restargs)
888                 for runid, thread in flying.items():
889                     if (blockid + 1) == num_of_blocks:
890                         thread.join()
891                     elif thread.isAlive():
892                         continue
893                     if thread.exception:
894                         raise thread.exception
895                     ret[runid] = thread.value.content
896                     self._cb_next()
897                     flying.pop(runid)
898             return ''.join(ret)
899         except KeyboardInterrupt:
900             sendlog.info('- - - wait for threads to finish')
901             for thread in activethreads():
902                 thread.join()
903
904     #Command Progress Bar method
905     def _cb_next(self, step=1):
906         if hasattr(self, 'progress_bar_gen'):
907             try:
908                 for i in xrange(step):
909                     self.progress_bar_gen.next()
910             except:
911                 pass
912
913     def _complete_cb(self):
914         while True:
915             try:
916                 self.progress_bar_gen.next()
917             except:
918                 break
919
920     def get_object_hashmap(
921             self, obj,
922             version=None,
923             if_match=None,
924             if_none_match=None,
925             if_modified_since=None,
926             if_unmodified_since=None):
927         """
928         :param obj: (str) remote object path
929
930         :param if_match: (str)
931
932         :param if_none_match: (str)
933
934         :param if_modified_since: (str) formated date
935
936         :param if_unmodified_since: (str) formated date
937
938         :returns: (list)
939         """
940         try:
941             r = self.object_get(
942                 obj,
943                 hashmap=True,
944                 version=version,
945                 if_etag_match=if_match,
946                 if_etag_not_match=if_none_match,
947                 if_modified_since=if_modified_since,
948                 if_unmodified_since=if_unmodified_since)
949         except ClientError as err:
950             if err.status == 304 or err.status == 412:
951                 return {}
952             raise
953         return r.json
954
955     def set_account_group(self, group, usernames):
956         """
957         :param group: (str)
958
959         :param usernames: (list)
960         """
961         r = self.account_post(update=True, groups={group: usernames})
962         return r
963
964     def del_account_group(self, group):
965         """
966         :param group: (str)
967         """
968         self.account_post(update=True, groups={group: []})
969
970     def get_account_info(self, until=None):
971         """
972         :param until: (str) formated date
973
974         :returns: (dict)
975         """
976         r = self.account_head(until=until)
977         if r.status_code == 401:
978             raise ClientError("No authorization", status=401)
979         return r.headers
980
981     def get_account_quota(self):
982         """
983         :returns: (dict)
984         """
985         return filter_in(
986             self.get_account_info(),
987             'X-Account-Policy-Quota',
988             exactMatch=True)
989
990     #def get_account_versioning(self):
991     #    """
992     #    :returns: (dict)
993     #    """
994     #    return filter_in(
995     #        self.get_account_info(),
996     #        'X-Account-Policy-Versioning',
997     #        exactMatch=True)
998
999     def get_account_meta(self, until=None):
1000         """
1001         :param until: (str) formated date
1002
1003         :returns: (dict)
1004         """
1005         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1006
1007     def get_account_group(self):
1008         """
1009         :returns: (dict)
1010         """
1011         return filter_in(self.get_account_info(), 'X-Account-Group-')
1012
1013     def set_account_meta(self, metapairs):
1014         """
1015         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1016         """
1017         assert(type(metapairs) is dict)
1018         r = self.account_post(update=True, metadata=metapairs)
1019         return r.headers
1020
1021     def del_account_meta(self, metakey):
1022         """
1023         :param metakey: (str) metadatum key
1024         """
1025         r = self.account_post(update=True, metadata={metakey: ''})
1026         return r.headers
1027
1028     #def set_account_quota(self, quota):
1029     #    """
1030     #    :param quota: (int)
1031     #    """
1032     #    self.account_post(update=True, quota=quota)
1033
1034     #def set_account_versioning(self, versioning):
1035     #    """
1036     #    :param versioning: (str)
1037     #    """
1038     #    r = self.account_post(update=True, versioning=versioning)
1039     #    return r.headers
1040
1041     def list_containers(self):
1042         """
1043         :returns: (dict)
1044         """
1045         r = self.account_get()
1046         return r.json
1047
1048     def del_container(self, until=None, delimiter=None):
1049         """
1050         :param until: (str) formated date
1051
1052         :param delimiter: (str) with / empty container
1053
1054         :raises ClientError: 404 Container does not exist
1055
1056         :raises ClientError: 409 Container is not empty
1057         """
1058         self._assert_container()
1059         r = self.container_delete(
1060             until=until,
1061             delimiter=delimiter,
1062             success=(204, 404, 409))
1063         if r.status_code == 404:
1064             raise ClientError(
1065                 'Container "%s" does not exist' % self.container,
1066                 r.status_code)
1067         elif r.status_code == 409:
1068             raise ClientError(
1069                 'Container "%s" is not empty' % self.container,
1070                 r.status_code)
1071         return r.headers
1072
1073     def get_container_versioning(self, container=None):
1074         """
1075         :param container: (str)
1076
1077         :returns: (dict)
1078         """
1079         cnt_back_up = self.container
1080         try:
1081             self.container = container or cnt_back_up
1082             return filter_in(
1083                 self.get_container_info(),
1084                 'X-Container-Policy-Versioning')
1085         finally:
1086             self.container = cnt_back_up
1087
1088     def get_container_limit(self, container=None):
1089         """
1090         :param container: (str)
1091
1092         :returns: (dict)
1093         """
1094         cnt_back_up = self.container
1095         try:
1096             self.container = container or cnt_back_up
1097             return filter_in(
1098                 self.get_container_info(),
1099                 'X-Container-Policy-Quota')
1100         finally:
1101             self.container = cnt_back_up
1102
1103     def get_container_info(self, until=None):
1104         """
1105         :param until: (str) formated date
1106
1107         :returns: (dict)
1108
1109         :raises ClientError: 404 Container not found
1110         """
1111         try:
1112             r = self.container_head(until=until)
1113         except ClientError as err:
1114             err.details.append('for container %s' % self.container)
1115             raise err
1116         return r.headers
1117
1118     def get_container_meta(self, until=None):
1119         """
1120         :param until: (str) formated date
1121
1122         :returns: (dict)
1123         """
1124         return filter_in(
1125             self.get_container_info(until=until),
1126             'X-Container-Meta')
1127
1128     def get_container_object_meta(self, until=None):
1129         """
1130         :param until: (str) formated date
1131
1132         :returns: (dict)
1133         """
1134         return filter_in(
1135             self.get_container_info(until=until),
1136             'X-Container-Object-Meta')
1137
1138     def set_container_meta(self, metapairs):
1139         """
1140         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1141         """
1142         assert(type(metapairs) is dict)
1143         r = self.container_post(update=True, metadata=metapairs)
1144         return r.headers
1145
1146     def del_container_meta(self, metakey):
1147         """
1148         :param metakey: (str) metadatum key
1149
1150         :returns: (dict) response headers
1151         """
1152         r = self.container_post(update=True, metadata={metakey: ''})
1153         return r.headers
1154
1155     def set_container_limit(self, limit):
1156         """
1157         :param limit: (int)
1158         """
1159         r = self.container_post(update=True, quota=limit)
1160         return r.headers
1161
1162     def set_container_versioning(self, versioning):
1163         """
1164         :param versioning: (str)
1165         """
1166         r = self.container_post(update=True, versioning=versioning)
1167         return r.headers
1168
1169     def del_object(self, obj, until=None, delimiter=None):
1170         """
1171         :param obj: (str) remote object path
1172
1173         :param until: (str) formated date
1174
1175         :param delimiter: (str)
1176         """
1177         self._assert_container()
1178         r = self.object_delete(obj, until=until, delimiter=delimiter)
1179         return r.headers
1180
1181     def set_object_meta(self, obj, metapairs):
1182         """
1183         :param obj: (str) remote object path
1184
1185         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1186         """
1187         assert(type(metapairs) is dict)
1188         r = self.object_post(obj, update=True, metadata=metapairs)
1189         return r.headers
1190
1191     def del_object_meta(self, obj, metakey):
1192         """
1193         :param obj: (str) remote object path
1194
1195         :param metakey: (str) metadatum key
1196         """
1197         r = self.object_post(obj, update=True, metadata={metakey: ''})
1198         return r.headers
1199
1200     def publish_object(self, obj):
1201         """
1202         :param obj: (str) remote object path
1203
1204         :returns: (str) access url
1205         """
1206         self.object_post(obj, update=True, public=True)
1207         info = self.get_object_info(obj)
1208         return info['x-object-public']
1209         pref, sep, rest = self.base_url.partition('//')
1210         base = rest.split('/')[0]
1211         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1212
1213     def unpublish_object(self, obj):
1214         """
1215         :param obj: (str) remote object path
1216         """
1217         r = self.object_post(obj, update=True, public=False)
1218         return r.headers
1219
1220     def get_object_info(self, obj, version=None):
1221         """
1222         :param obj: (str) remote object path
1223
1224         :param version: (str)
1225
1226         :returns: (dict)
1227         """
1228         try:
1229             r = self.object_head(obj, version=version)
1230             return r.headers
1231         except ClientError as ce:
1232             if ce.status == 404:
1233                 raise ClientError('Object %s not found' % obj, status=404)
1234             raise
1235
1236     def get_object_meta(self, obj, version=None):
1237         """
1238         :param obj: (str) remote object path
1239
1240         :param version: (str)
1241
1242         :returns: (dict)
1243         """
1244         return filter_in(
1245             self.get_object_info(obj, version=version),
1246             'X-Object-Meta')
1247
1248     def get_object_sharing(self, obj):
1249         """
1250         :param obj: (str) remote object path
1251
1252         :returns: (dict)
1253         """
1254         r = filter_in(
1255             self.get_object_info(obj),
1256             'X-Object-Sharing',
1257             exactMatch=True)
1258         reply = {}
1259         if len(r) > 0:
1260             perms = r['x-object-sharing'].split(';')
1261             for perm in perms:
1262                 try:
1263                     perm.index('=')
1264                 except ValueError:
1265                     raise ClientError('Incorrect reply format')
1266                 (key, val) = perm.strip().split('=')
1267                 reply[key] = val
1268         return reply
1269
1270     def set_object_sharing(
1271             self, obj,
1272             read_permission=False, write_permission=False):
1273         """Give read/write permisions to an object.
1274
1275         :param obj: (str) remote object path
1276
1277         :param read_permission: (list - bool) users and user groups that get
1278             read permission for this object - False means all previous read
1279             permissions will be removed
1280
1281         :param write_permission: (list - bool) of users and user groups to get
1282            write permission for this object - False means all previous write
1283            permissions will be removed
1284
1285         :returns: (dict) response headers
1286         """
1287
1288         perms = dict(read=read_permission or '', write=write_permission or '')
1289         r = self.object_post(obj, update=True, permissions=perms)
1290         return r.headers
1291
1292     def del_object_sharing(self, obj):
1293         """
1294         :param obj: (str) remote object path
1295         """
1296         return self.set_object_sharing(obj)
1297
1298     def append_object(self, obj, source_file, upload_cb=None):
1299         """
1300         :param obj: (str) remote object path
1301
1302         :param source_file: open file descriptor
1303
1304         :param upload_db: progress.bar for uploading
1305         """
1306         self._assert_container()
1307         meta = self.get_container_info()
1308         blocksize = int(meta['x-container-block-size'])
1309         filesize = fstat(source_file.fileno()).st_size
1310         nblocks = 1 + (filesize - 1) // blocksize
1311         offset = 0
1312         headers = {}
1313         if upload_cb:
1314             self.progress_bar_gen = upload_cb(nblocks)
1315             self._cb_next()
1316         flying = {}
1317         self._init_thread_limit()
1318         try:
1319             for i in range(nblocks):
1320                 block = source_file.read(min(blocksize, filesize - offset))
1321                 offset += len(block)
1322
1323                 self._watch_thread_limit(flying.values())
1324                 unfinished = {}
1325                 flying[i] = SilentEvent(
1326                     method=self.object_post,
1327                     obj=obj,
1328                     update=True,
1329                     content_range='bytes */*',
1330                     content_type='application/octet-stream',
1331                     content_length=len(block),
1332                     data=block)
1333                 flying[i].start()
1334
1335                 for key, thread in flying.items():
1336                     if thread.isAlive():
1337                         if i < nblocks:
1338                             unfinished[key] = thread
1339                             continue
1340                         thread.join()
1341                     if thread.exception:
1342                         raise thread.exception
1343                     headers[key] = thread.value.headers
1344                     self._cb_next()
1345                 flying = unfinished
1346         except KeyboardInterrupt:
1347             sendlog.info('- - - wait for threads to finish')
1348             for thread in activethreads():
1349                 thread.join()
1350         finally:
1351             from time import sleep
1352             sleep(2 * len(activethreads()))
1353         return headers.values()
1354
1355     def truncate_object(self, obj, upto_bytes):
1356         """
1357         :param obj: (str) remote object path
1358
1359         :param upto_bytes: max number of bytes to leave on file
1360
1361         :returns: (dict) response headers
1362         """
1363         r = self.object_post(
1364             obj,
1365             update=True,
1366             content_range='bytes 0-%s/*' % upto_bytes,
1367             content_type='application/octet-stream',
1368             object_bytes=upto_bytes,
1369             source_object=path4url(self.container, obj))
1370         return r.headers
1371
1372     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1373         """Overwrite a part of an object from local source file
1374
1375         :param obj: (str) remote object path
1376
1377         :param start: (int) position in bytes to start overwriting from
1378
1379         :param end: (int) position in bytes to stop overwriting at
1380
1381         :param source_file: open file descriptor
1382
1383         :param upload_db: progress.bar for uploading
1384         """
1385
1386         r = self.get_object_info(obj)
1387         rf_size = int(r['content-length'])
1388         if rf_size < int(start):
1389             raise ClientError(
1390                 'Range start exceeds file size',
1391                 status=416)
1392         elif rf_size < int(end):
1393             raise ClientError(
1394                 'Range end exceeds file size',
1395                 status=416)
1396         self._assert_container()
1397         meta = self.get_container_info()
1398         blocksize = int(meta['x-container-block-size'])
1399         filesize = fstat(source_file.fileno()).st_size
1400         datasize = int(end) - int(start) + 1
1401         nblocks = 1 + (datasize - 1) // blocksize
1402         offset = 0
1403         if upload_cb:
1404             self.progress_bar_gen = upload_cb(nblocks)
1405             self._cb_next()
1406         headers = []
1407         for i in range(nblocks):
1408             read_size = min(blocksize, filesize - offset, datasize - offset)
1409             block = source_file.read(read_size)
1410             r = self.object_post(
1411                 obj,
1412                 update=True,
1413                 content_type='application/octet-stream',
1414                 content_length=len(block),
1415                 content_range='bytes %s-%s/*' % (
1416                     start + offset,
1417                     start + offset + len(block) - 1),
1418                 data=block)
1419             headers.append(dict(r.headers))
1420             offset += len(block)
1421
1422             self._cb_next
1423         return headers
1424
1425     def copy_object(
1426             self, src_container, src_object, dst_container,
1427             dst_object=None,
1428             source_version=None,
1429             source_account=None,
1430             public=False,
1431             content_type=None,
1432             delimiter=None):
1433         """
1434         :param src_container: (str) source container
1435
1436         :param src_object: (str) source object path
1437
1438         :param dst_container: (str) destination container
1439
1440         :param dst_object: (str) destination object path
1441
1442         :param source_version: (str) source object version
1443
1444         :param source_account: (str) account to copy from
1445
1446         :param public: (bool)
1447
1448         :param content_type: (str)
1449
1450         :param delimiter: (str)
1451
1452         :returns: (dict) response headers
1453         """
1454         self._assert_account()
1455         self.container = dst_container
1456         src_path = path4url(src_container, src_object)
1457         r = self.object_put(
1458             dst_object or src_object,
1459             success=201,
1460             copy_from=src_path,
1461             content_length=0,
1462             source_version=source_version,
1463             source_account=source_account,
1464             public=public,
1465             content_type=content_type,
1466             delimiter=delimiter)
1467         return r.headers
1468
1469     def move_object(
1470             self, src_container, src_object, dst_container,
1471             dst_object=False,
1472             source_account=None,
1473             source_version=None,
1474             public=False,
1475             content_type=None,
1476             delimiter=None):
1477         """
1478         :param src_container: (str) source container
1479
1480         :param src_object: (str) source object path
1481
1482         :param dst_container: (str) destination container
1483
1484         :param dst_object: (str) destination object path
1485
1486         :param source_account: (str) account to move from
1487
1488         :param source_version: (str) source object version
1489
1490         :param public: (bool)
1491
1492         :param content_type: (str)
1493
1494         :param delimiter: (str)
1495
1496         :returns: (dict) response headers
1497         """
1498         self._assert_account()
1499         self.container = dst_container
1500         dst_object = dst_object or src_object
1501         src_path = path4url(src_container, src_object)
1502         r = self.object_put(
1503             dst_object,
1504             success=201,
1505             move_from=src_path,
1506             content_length=0,
1507             source_account=source_account,
1508             source_version=source_version,
1509             public=public,
1510             content_type=content_type,
1511             delimiter=delimiter)
1512         return r.headers
1513
1514     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1515         """Get accounts that share with self.account
1516
1517         :param limit: (str)
1518
1519         :param marker: (str)
1520
1521         :returns: (dict)
1522         """
1523         self._assert_account()
1524
1525         self.set_param('format', 'json')
1526         self.set_param('limit', limit, iff=limit is not None)
1527         self.set_param('marker', marker, iff=marker is not None)
1528
1529         path = ''
1530         success = kwargs.pop('success', (200, 204))
1531         r = self.get(path, *args, success=success, **kwargs)
1532         return r.json
1533
1534     def get_object_versionlist(self, obj):
1535         """
1536         :param obj: (str) remote object path
1537
1538         :returns: (list)
1539         """
1540         self._assert_container()
1541         r = self.object_get(obj, format='json', version='list')
1542         return r.json['versions']