Keep objects content type by default
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, max_value, a_range):
56     """
57     :param start: (int) the window bottom
58
59     :param end: (int) the window top
60
61     :param max_value: (int) maximum accepted value
62
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
65
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
68     """
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71         end, start)
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73         max_value, end)
74     if not a_range:
75         return '%s-%s' % (start, end)
76     selected = []
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
79         if v0:
80             v0 = int(v0)
81             if sep:
82                 v1 = int(v1)
83                 if v1 < start or v0 > end or v1 < v0:
84                     continue
85                 v0 = v0 if v0 > start else start
86                 v1 = v1 if v1 < end else end
87                 selected.append('%s-%s' % (v0, v1))
88             elif v0 < start:
89                 continue
90             else:
91                 v1 = v0 if v0 <= end else end
92                 selected.append('%s-%s' % (start, v1))
93         else:
94             v1 = int(v1)
95             if max_value - v1 > end:
96                 continue
97             v0 = (max_value - v1) if max_value - v1 > start else start
98             selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
100
101
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
104
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
107
108     def create_container(
109             self,
110             container=None, sizelimit=None, versioning=None, metadata=None,
111             **kwargs):
112         """
113         :param container: (str) if not given, self.container is used instead
114
115         :param sizelimit: (int) container total size limit in bytes
116
117         :param versioning: (str) can be auto or whatever supported by server
118
119         :param metadata: (dict) Custom user-defined metadata of the form
120             { 'name1': 'value1', 'name2': 'value2', ... }
121
122         :returns: (dict) response headers
123         """
124         cnt_back_up = self.container
125         try:
126             self.container = container or cnt_back_up
127             r = self.container_put(
128                 quota=sizelimit, versioning=versioning, metadata=metadata,
129                 **kwargs)
130             return r.headers
131         finally:
132             self.container = cnt_back_up
133
134     def purge_container(self, container=None):
135         """Delete an empty container and destroy associated blocks"""
136         cnt_back_up = self.container
137         try:
138             self.container = container or cnt_back_up
139             r = self.container_delete(until=unicode(time()))
140         finally:
141             self.container = cnt_back_up
142         return r.headers
143
144     def upload_object_unchunked(
145             self, obj, f,
146             withHashFile=False,
147             size=None,
148             etag=None,
149             content_encoding=None,
150             content_disposition=None,
151             content_type=None,
152             sharing=None,
153             public=None):
154         """
155         :param obj: (str) remote object path
156
157         :param f: open file descriptor
158
159         :param withHashFile: (bool)
160
161         :param size: (int) size of data to upload
162
163         :param etag: (str)
164
165         :param content_encoding: (str)
166
167         :param content_disposition: (str)
168
169         :param content_type: (str)
170
171         :param sharing: {'read':[user and/or grp names],
172             'write':[usr and/or grp names]}
173
174         :param public: (bool)
175
176         :returns: (dict) created object metadata
177         """
178         self._assert_container()
179
180         if withHashFile:
181             data = f.read()
182             try:
183                 import json
184                 data = json.dumps(json.loads(data))
185             except ValueError:
186                 raise ClientError('"%s" is not json-formated' % f.name, 1)
187             except SyntaxError:
188                 msg = '"%s" is not a valid hashmap file' % f.name
189                 raise ClientError(msg, 1)
190             f = StringIO(data)
191         else:
192             data = readall(f, size) if size else f.read()
193         r = self.object_put(
194             obj,
195             data=data,
196             etag=etag,
197             content_encoding=content_encoding,
198             content_disposition=content_disposition,
199             content_type=content_type,
200             permissions=sharing,
201             public=public,
202             success=201)
203         return r.headers
204
205     def create_object_by_manifestation(
206             self, obj,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             content_type=None,
211             sharing=None,
212             public=None):
213         """
214         :param obj: (str) remote object path
215
216         :param etag: (str)
217
218         :param content_encoding: (str)
219
220         :param content_disposition: (str)
221
222         :param content_type: (str)
223
224         :param sharing: {'read':[user and/or grp names],
225             'write':[usr and/or grp names]}
226
227         :param public: (bool)
228
229         :returns: (dict) created object metadata
230         """
231         self._assert_container()
232         r = self.object_put(
233             obj,
234             content_length=0,
235             etag=etag,
236             content_encoding=content_encoding,
237             content_disposition=content_disposition,
238             content_type=content_type,
239             permissions=sharing,
240             public=public,
241             manifest='%s/%s' % (self.container, obj))
242         return r.headers
243
244     # upload_* auxiliary methods
245     def _put_block_async(self, data, hash):
246         event = SilentEvent(method=self._put_block, data=data, hash=hash)
247         event.start()
248         return event
249
250     def _put_block(self, data, hash):
251         r = self.container_post(
252             update=True,
253             content_type='application/octet-stream',
254             content_length=len(data),
255             data=data,
256             format='json')
257         assert r.json[0] == hash, 'Local hash does not match server'
258
259     def _get_file_block_info(self, fileobj, size=None, cache=None):
260         """
261         :param fileobj: (file descriptor) source
262
263         :param size: (int) size of data to upload from source
264
265         :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
267         """
268         if isinstance(cache, dict):
269             try:
270                 meta = cache[self.container]
271             except KeyError:
272                 meta = self.get_container_info()
273                 cache[self.container] = meta
274         else:
275             meta = self.get_container_info()
276         blocksize = int(meta['x-container-block-size'])
277         blockhash = meta['x-container-block-hash']
278         size = size if size is not None else fstat(fileobj.fileno()).st_size
279         nblocks = 1 + (size - 1) // blocksize
280         return (blocksize, blockhash, size, nblocks)
281
282     def _create_object_or_get_missing_hashes(
283             self, obj, json,
284             size=None,
285             format='json',
286             hashmap=True,
287             content_type=None,
288             if_etag_match=None,
289             if_etag_not_match=None,
290             content_encoding=None,
291             content_disposition=None,
292             permissions=None,
293             public=None,
294             success=(201, 409)):
295         r = self.object_put(
296             obj,
297             format='json',
298             hashmap=True,
299             content_type=content_type,
300             json=json,
301             if_etag_match=if_etag_match,
302             if_etag_not_match=if_etag_not_match,
303             content_encoding=content_encoding,
304             content_disposition=content_disposition,
305             permissions=permissions,
306             public=public,
307             success=success)
308         return (None if r.status_code == 201 else r.json), r.headers
309
310     def _calculate_blocks_for_upload(
311             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
312             hash_cb=None):
313         offset = 0
314         if hash_cb:
315             hash_gen = hash_cb(nblocks)
316             hash_gen.next()
317
318         for i in xrange(nblocks):
319             block = readall(fileobj, min(blocksize, size - offset))
320             bytes = len(block)
321             if bytes <= 0:
322                 break
323             hash = _pithos_hash(block, blockhash)
324             hashes.append(hash)
325             hmap[hash] = (offset, bytes)
326             offset += bytes
327             if hash_cb:
328                 hash_gen.next()
329         msg = ('Failed to calculate uploading blocks: '
330                'read bytes(%s) != requested size (%s)' % (offset, size))
331         assert offset == size, msg
332
333     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334         """upload missing blocks asynchronously"""
335
336         self._init_thread_limit()
337
338         flying = []
339         failures = []
340         for hash in missing:
341             offset, bytes = hmap[hash]
342             fileobj.seek(offset)
343             data = readall(fileobj, bytes)
344             r = self._put_block_async(data, hash)
345             flying.append(r)
346             unfinished = self._watch_thread_limit(flying)
347             for thread in set(flying).difference(unfinished):
348                 if thread.exception:
349                     failures.append(thread)
350                     if isinstance(
351                             thread.exception,
352                             ClientError) and thread.exception.status == 502:
353                         self.POOLSIZE = self._thread_limit
354                 elif thread.isAlive():
355                     flying.append(thread)
356                 elif upload_gen:
357                     try:
358                         upload_gen.next()
359                     except:
360                         pass
361             flying = unfinished
362
363         for thread in flying:
364             thread.join()
365             if thread.exception:
366                 failures.append(thread)
367             elif upload_gen:
368                 try:
369                     upload_gen.next()
370                 except:
371                     pass
372
373         return [failure.kwargs['hash'] for failure in failures]
374
375     def upload_object(
376             self, obj, f,
377             size=None,
378             hash_cb=None,
379             upload_cb=None,
380             etag=None,
381             if_etag_match=None,
382             if_not_exist=None,
383             content_encoding=None,
384             content_disposition=None,
385             content_type=None,
386             sharing=None,
387             public=None,
388             container_info_cache=None):
389         """Upload an object using multiple connections (threads)
390
391         :param obj: (str) remote object path
392
393         :param f: open file descriptor (rb)
394
395         :param hash_cb: optional progress.bar object for calculating hashes
396
397         :param upload_cb: optional progress.bar object for uploading
398
399         :param etag: (str)
400
401         :param if_etag_match: (str) Push that value to if-match header at file
402             creation
403
404         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405             it does not exist remotely, otherwise the operation will fail.
406             Involves the case of an object with the same path is created while
407             the object is being uploaded.
408
409         :param content_encoding: (str)
410
411         :param content_disposition: (str)
412
413         :param content_type: (str)
414
415         :param sharing: {'read':[user and/or grp names],
416             'write':[usr and/or grp names]}
417
418         :param public: (bool)
419
420         :param container_info_cache: (dict) if given, avoid redundant calls to
421             server for container info (block size and hash information)
422         """
423         self._assert_container()
424
425         block_info = (
426             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427                 f, size, container_info_cache)
428         (hashes, hmap, offset) = ([], {}, 0)
429         if not content_type:
430             content_type = 'application/octet-stream'
431
432         self._calculate_blocks_for_upload(
433             *block_info,
434             hashes=hashes,
435             hmap=hmap,
436             fileobj=f,
437             hash_cb=hash_cb)
438
439         hashmap = dict(bytes=size, hashes=hashes)
440         missing, obj_headers = self._create_object_or_get_missing_hashes(
441             obj, hashmap,
442             content_type=content_type,
443             size=size,
444             if_etag_match=if_etag_match,
445             if_etag_not_match='*' if if_not_exist else None,
446             content_encoding=content_encoding,
447             content_disposition=content_disposition,
448             permissions=sharing,
449             public=public)
450
451         if missing is None:
452             return obj_headers
453
454         if upload_cb:
455             upload_gen = upload_cb(len(missing))
456             for i in range(len(missing), len(hashmap['hashes']) + 1):
457                 try:
458                     upload_gen.next()
459                 except:
460                     upload_gen = None
461         else:
462             upload_gen = None
463
464         retries = 7
465         try:
466             while retries:
467                 sendlog.info('%s blocks missing' % len(missing))
468                 num_of_blocks = len(missing)
469                 missing = self._upload_missing_blocks(
470                     missing,
471                     hmap,
472                     f,
473                     upload_gen)
474                 if missing:
475                     if num_of_blocks == len(missing):
476                         retries -= 1
477                     else:
478                         num_of_blocks = len(missing)
479                 else:
480                     break
481             if missing:
482                 try:
483                     details = ['%s' % thread.exception for thread in missing]
484                 except Exception:
485                     details = ['Also, failed to read thread exceptions']
486                 raise ClientError(
487                     '%s blocks failed to upload' % len(missing),
488                     details=details)
489         except KeyboardInterrupt:
490             sendlog.info('- - - wait for threads to finish')
491             for thread in activethreads():
492                 thread.join()
493             raise
494
495         r = self.object_put(
496             obj,
497             format='json',
498             hashmap=True,
499             content_type=content_type,
500             content_encoding=content_encoding,
501             if_etag_match=if_etag_match,
502             if_etag_not_match='*' if if_not_exist else None,
503             etag=etag,
504             json=hashmap,
505             permissions=sharing,
506             public=public,
507             success=201)
508         return r.headers
509
510     def upload_from_string(
511             self, obj, input_str,
512             hash_cb=None,
513             upload_cb=None,
514             etag=None,
515             if_etag_match=None,
516             if_not_exist=None,
517             content_encoding=None,
518             content_disposition=None,
519             content_type=None,
520             sharing=None,
521             public=None,
522             container_info_cache=None):
523         """Upload an object using multiple connections (threads)
524
525         :param obj: (str) remote object path
526
527         :param input_str: (str) upload content
528
529         :param hash_cb: optional progress.bar object for calculating hashes
530
531         :param upload_cb: optional progress.bar object for uploading
532
533         :param etag: (str)
534
535         :param if_etag_match: (str) Push that value to if-match header at file
536             creation
537
538         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
539             it does not exist remotely, otherwise the operation will fail.
540             Involves the case of an object with the same path is created while
541             the object is being uploaded.
542
543         :param content_encoding: (str)
544
545         :param content_disposition: (str)
546
547         :param content_type: (str)
548
549         :param sharing: {'read':[user and/or grp names],
550             'write':[usr and/or grp names]}
551
552         :param public: (bool)
553
554         :param container_info_cache: (dict) if given, avoid redundant calls to
555             server for container info (block size and hash information)
556         """
557         self._assert_container()
558
559         blocksize, blockhash, size, nblocks = self._get_file_block_info(
560                 fileobj=None, size=len(input_str), cache=container_info_cache)
561         (hashes, hmap, offset) = ([], {}, 0)
562         if not content_type:
563             content_type = 'application/octet-stream'
564
565         hashes = []
566         hmap = {}
567         for blockid in range(nblocks):
568             start = blockid * blocksize
569             block = input_str[start: (start + blocksize)]
570             hashes.append(_pithos_hash(block, blockhash))
571             hmap[hashes[blockid]] = (start, block)
572
573         hashmap = dict(bytes=size, hashes=hashes)
574         missing, obj_headers = self._create_object_or_get_missing_hashes(
575             obj, hashmap,
576             content_type=content_type,
577             size=size,
578             if_etag_match=if_etag_match,
579             if_etag_not_match='*' if if_not_exist else None,
580             content_encoding=content_encoding,
581             content_disposition=content_disposition,
582             permissions=sharing,
583             public=public)
584         if missing is None:
585             return obj_headers
586         num_of_missing = len(missing)
587
588         if upload_cb:
589             self.progress_bar_gen = upload_cb(nblocks)
590             for i in range(nblocks + 1 - num_of_missing):
591                 self._cb_next()
592
593         tries = 7
594         old_failures = 0
595         try:
596             while tries and missing:
597                 flying = []
598                 failures = []
599                 for hash in missing:
600                     offset, block = hmap[hash]
601                     bird = self._put_block_async(block, hash)
602                     flying.append(bird)
603                     unfinished = self._watch_thread_limit(flying)
604                     for thread in set(flying).difference(unfinished):
605                         if thread.exception:
606                             failures.append(thread.kwargs['hash'])
607                         if thread.isAlive():
608                             flying.append(thread)
609                         else:
610                             self._cb_next()
611                     flying = unfinished
612                 for thread in flying:
613                     thread.join()
614                     if thread.exception:
615                         failures.append(thread.kwargs['hash'])
616                     self._cb_next()
617                 missing = failures
618                 if missing and len(missing) == old_failures:
619                     tries -= 1
620                 old_failures = len(missing)
621             if missing:
622                 raise ClientError(
623                     '%s blocks failed to upload' % len(missing),
624                     details=['%s' % thread.exception for thread in missing])
625         except KeyboardInterrupt:
626             sendlog.info('- - - wait for threads to finish')
627             for thread in activethreads():
628                 thread.join()
629             raise
630
631         r = self.object_put(
632             obj,
633             format='json',
634             hashmap=True,
635             content_type=content_type,
636             content_encoding=content_encoding,
637             if_etag_match=if_etag_match,
638             if_etag_not_match='*' if if_not_exist else None,
639             etag=etag,
640             json=hashmap,
641             permissions=sharing,
642             public=public,
643             success=201)
644         return r.headers
645
646     # download_* auxiliary methods
647     def _get_remote_blocks_info(self, obj, **restargs):
648         #retrieve object hashmap
649         myrange = restargs.pop('data_range', None)
650         hashmap = self.get_object_hashmap(obj, **restargs)
651         restargs['data_range'] = myrange
652         blocksize = int(hashmap['block_size'])
653         blockhash = hashmap['block_hash']
654         total_size = hashmap['bytes']
655         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
656         map_dict = {}
657         for i, h in enumerate(hashmap['hashes']):
658             #  map_dict[h] = i   CHAGE
659             if h in map_dict:
660                 map_dict[h].append(i)
661             else:
662                 map_dict[h] = [i]
663         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
664
665     def _dump_blocks_sync(
666             self, obj, remote_hashes, blocksize, total_size, dst, crange,
667             **args):
668         if not total_size:
669             return
670         for blockid, blockhash in enumerate(remote_hashes):
671             if blockhash:
672                 start = blocksize * blockid
673                 is_last = start + blocksize > total_size
674                 end = (total_size - 1) if is_last else (start + blocksize - 1)
675                 data_range = _range_up(start, end, total_size, crange)
676                 if not data_range:
677                     self._cb_next()
678                     continue
679                 args['data_range'] = 'bytes=%s' % data_range
680                 r = self.object_get(obj, success=(200, 206), **args)
681                 self._cb_next()
682                 dst.write(r.content)
683                 dst.flush()
684
685     def _get_block_async(self, obj, **args):
686         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
687         event.start()
688         return event
689
690     def _hash_from_file(self, fp, start, size, blockhash):
691         fp.seek(start)
692         block = readall(fp, size)
693         h = newhashlib(blockhash)
694         h.update(block.strip('\x00'))
695         return hexlify(h.digest())
696
697     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
698         """write the results of a greenleted rest call to a file
699
700         :param offset: the offset of the file up to blocksize
701         - e.g. if the range is 10-100, all blocks will be written to
702         normal_position - 10
703         """
704         for key, g in flying.items():
705             if g.isAlive():
706                 continue
707             if g.exception:
708                 raise g.exception
709             block = g.value.content
710             for block_start in blockids[key]:
711                 local_file.seek(block_start + offset)
712                 local_file.write(block)
713                 self._cb_next()
714             flying.pop(key)
715             blockids.pop(key)
716         local_file.flush()
717
718     def _dump_blocks_async(
719             self, obj, remote_hashes, blocksize, total_size, local_file,
720             blockhash=None, resume=False, filerange=None, **restargs):
721         file_size = fstat(local_file.fileno()).st_size if resume else 0
722         flying = dict()
723         blockid_dict = dict()
724         offset = 0
725
726         self._init_thread_limit()
727         for block_hash, blockids in remote_hashes.items():
728             blockids = [blk * blocksize for blk in blockids]
729             unsaved = [blk for blk in blockids if not (
730                 blk < file_size and block_hash == self._hash_from_file(
731                         local_file, blk, blocksize, blockhash))]
732             self._cb_next(len(blockids) - len(unsaved))
733             if unsaved:
734                 key = unsaved[0]
735                 self._watch_thread_limit(flying.values())
736                 self._thread2file(
737                     flying, blockid_dict, local_file, offset,
738                     **restargs)
739                 end = total_size - 1 if (
740                     key + blocksize > total_size) else key + blocksize - 1
741                 if end < key:
742                     self._cb_next()
743                     continue
744                 data_range = _range_up(key, end, total_size, filerange)
745                 if not data_range:
746                     self._cb_next()
747                     continue
748                 restargs[
749                     'async_headers'] = {'Range': 'bytes=%s' % data_range}
750                 flying[key] = self._get_block_async(obj, **restargs)
751                 blockid_dict[key] = unsaved
752
753         for thread in flying.values():
754             thread.join()
755         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
756
757     def download_object(
758             self, obj, dst,
759             download_cb=None,
760             version=None,
761             resume=False,
762             range_str=None,
763             if_match=None,
764             if_none_match=None,
765             if_modified_since=None,
766             if_unmodified_since=None):
767         """Download an object (multiple connections, random blocks)
768
769         :param obj: (str) remote object path
770
771         :param dst: open file descriptor (wb+)
772
773         :param download_cb: optional progress.bar object for downloading
774
775         :param version: (str) file version
776
777         :param resume: (bool) if set, preserve already downloaded file parts
778
779         :param range_str: (str) from, to are file positions (int) in bytes
780
781         :param if_match: (str)
782
783         :param if_none_match: (str)
784
785         :param if_modified_since: (str) formated date
786
787         :param if_unmodified_since: (str) formated date"""
788         restargs = dict(
789             version=version,
790             data_range=None if range_str is None else 'bytes=%s' % range_str,
791             if_match=if_match,
792             if_none_match=if_none_match,
793             if_modified_since=if_modified_since,
794             if_unmodified_since=if_unmodified_since)
795
796         (
797             blocksize,
798             blockhash,
799             total_size,
800             hash_list,
801             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
802         assert total_size >= 0
803
804         if download_cb:
805             self.progress_bar_gen = download_cb(len(hash_list))
806             self._cb_next()
807
808         if dst.isatty():
809             self._dump_blocks_sync(
810                 obj,
811                 hash_list,
812                 blocksize,
813                 total_size,
814                 dst,
815                 range_str,
816                 **restargs)
817         else:
818             self._dump_blocks_async(
819                 obj,
820                 remote_hashes,
821                 blocksize,
822                 total_size,
823                 dst,
824                 blockhash,
825                 resume,
826                 range_str,
827                 **restargs)
828             if not range_str:
829                 dst.truncate(total_size)
830
831         self._complete_cb()
832
833     def download_to_string(
834             self, obj,
835             download_cb=None,
836             version=None,
837             range_str=None,
838             if_match=None,
839             if_none_match=None,
840             if_modified_since=None,
841             if_unmodified_since=None):
842         """Download an object to a string (multiple connections). This method
843         uses threads for http requests, but stores all content in memory.
844
845         :param obj: (str) remote object path
846
847         :param download_cb: optional progress.bar object for downloading
848
849         :param version: (str) file version
850
851         :param range_str: (str) from, to are file positions (int) in bytes
852
853         :param if_match: (str)
854
855         :param if_none_match: (str)
856
857         :param if_modified_since: (str) formated date
858
859         :param if_unmodified_since: (str) formated date
860
861         :returns: (str) the whole object contents
862         """
863         restargs = dict(
864             version=version,
865             data_range=None if range_str is None else 'bytes=%s' % range_str,
866             if_match=if_match,
867             if_none_match=if_none_match,
868             if_modified_since=if_modified_since,
869             if_unmodified_since=if_unmodified_since)
870
871         (
872             blocksize,
873             blockhash,
874             total_size,
875             hash_list,
876             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
877         assert total_size >= 0
878
879         if download_cb:
880             self.progress_bar_gen = download_cb(len(hash_list))
881             self._cb_next()
882
883         num_of_blocks = len(remote_hashes)
884         ret = [''] * num_of_blocks
885         self._init_thread_limit()
886         flying = dict()
887         try:
888             for blockid, blockhash in enumerate(remote_hashes):
889                 start = blocksize * blockid
890                 is_last = start + blocksize > total_size
891                 end = (total_size - 1) if is_last else (start + blocksize - 1)
892                 data_range_str = _range_up(start, end, end, range_str)
893                 if data_range_str:
894                     self._watch_thread_limit(flying.values())
895                     restargs['data_range'] = 'bytes=%s' % data_range_str
896                     flying[blockid] = self._get_block_async(obj, **restargs)
897                 for runid, thread in flying.items():
898                     if (blockid + 1) == num_of_blocks:
899                         thread.join()
900                     elif thread.isAlive():
901                         continue
902                     if thread.exception:
903                         raise thread.exception
904                     ret[runid] = thread.value.content
905                     self._cb_next()
906                     flying.pop(runid)
907             return ''.join(ret)
908         except KeyboardInterrupt:
909             sendlog.info('- - - wait for threads to finish')
910             for thread in activethreads():
911                 thread.join()
912
913     #Command Progress Bar method
914     def _cb_next(self, step=1):
915         if hasattr(self, 'progress_bar_gen'):
916             try:
917                 for i in xrange(step):
918                     self.progress_bar_gen.next()
919             except:
920                 pass
921
922     def _complete_cb(self):
923         while True:
924             try:
925                 self.progress_bar_gen.next()
926             except:
927                 break
928
929     def get_object_hashmap(
930             self, obj,
931             version=None,
932             if_match=None,
933             if_none_match=None,
934             if_modified_since=None,
935             if_unmodified_since=None):
936         """
937         :param obj: (str) remote object path
938
939         :param if_match: (str)
940
941         :param if_none_match: (str)
942
943         :param if_modified_since: (str) formated date
944
945         :param if_unmodified_since: (str) formated date
946
947         :returns: (list)
948         """
949         try:
950             r = self.object_get(
951                 obj,
952                 hashmap=True,
953                 version=version,
954                 if_etag_match=if_match,
955                 if_etag_not_match=if_none_match,
956                 if_modified_since=if_modified_since,
957                 if_unmodified_since=if_unmodified_since)
958         except ClientError as err:
959             if err.status == 304 or err.status == 412:
960                 return {}
961             raise
962         return r.json
963
964     def set_account_group(self, group, usernames):
965         """
966         :param group: (str)
967
968         :param usernames: (list)
969         """
970         r = self.account_post(update=True, groups={group: usernames})
971         return r
972
973     def del_account_group(self, group):
974         """
975         :param group: (str)
976         """
977         self.account_post(update=True, groups={group: []})
978
979     def get_account_info(self, until=None):
980         """
981         :param until: (str) formated date
982
983         :returns: (dict)
984         """
985         r = self.account_head(until=until)
986         if r.status_code == 401:
987             raise ClientError("No authorization", status=401)
988         return r.headers
989
990     def get_account_quota(self):
991         """
992         :returns: (dict)
993         """
994         return filter_in(
995             self.get_account_info(),
996             'X-Account-Policy-Quota',
997             exactMatch=True)
998
999     #def get_account_versioning(self):
1000     #    """
1001     #    :returns: (dict)
1002     #    """
1003     #    return filter_in(
1004     #        self.get_account_info(),
1005     #        'X-Account-Policy-Versioning',
1006     #        exactMatch=True)
1007
1008     def get_account_meta(self, until=None):
1009         """
1010         :param until: (str) formated date
1011
1012         :returns: (dict)
1013         """
1014         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1015
1016     def get_account_group(self):
1017         """
1018         :returns: (dict)
1019         """
1020         return filter_in(self.get_account_info(), 'X-Account-Group-')
1021
1022     def set_account_meta(self, metapairs):
1023         """
1024         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1025         """
1026         assert(type(metapairs) is dict)
1027         r = self.account_post(update=True, metadata=metapairs)
1028         return r.headers
1029
1030     def del_account_meta(self, metakey):
1031         """
1032         :param metakey: (str) metadatum key
1033         """
1034         r = self.account_post(update=True, metadata={metakey: ''})
1035         return r.headers
1036
1037     #def set_account_quota(self, quota):
1038     #    """
1039     #    :param quota: (int)
1040     #    """
1041     #    self.account_post(update=True, quota=quota)
1042
1043     #def set_account_versioning(self, versioning):
1044     #    """
1045     #    :param versioning: (str)
1046     #    """
1047     #    r = self.account_post(update=True, versioning=versioning)
1048     #    return r.headers
1049
1050     def list_containers(self):
1051         """
1052         :returns: (dict)
1053         """
1054         r = self.account_get()
1055         return r.json
1056
1057     def del_container(self, until=None, delimiter=None):
1058         """
1059         :param until: (str) formated date
1060
1061         :param delimiter: (str) with / empty container
1062
1063         :raises ClientError: 404 Container does not exist
1064
1065         :raises ClientError: 409 Container is not empty
1066         """
1067         self._assert_container()
1068         r = self.container_delete(
1069             until=until,
1070             delimiter=delimiter,
1071             success=(204, 404, 409))
1072         if r.status_code == 404:
1073             raise ClientError(
1074                 'Container "%s" does not exist' % self.container,
1075                 r.status_code)
1076         elif r.status_code == 409:
1077             raise ClientError(
1078                 'Container "%s" is not empty' % self.container,
1079                 r.status_code)
1080         return r.headers
1081
1082     def get_container_versioning(self, container=None):
1083         """
1084         :param container: (str)
1085
1086         :returns: (dict)
1087         """
1088         cnt_back_up = self.container
1089         try:
1090             self.container = container or cnt_back_up
1091             return filter_in(
1092                 self.get_container_info(),
1093                 'X-Container-Policy-Versioning')
1094         finally:
1095             self.container = cnt_back_up
1096
1097     def get_container_limit(self, container=None):
1098         """
1099         :param container: (str)
1100
1101         :returns: (dict)
1102         """
1103         cnt_back_up = self.container
1104         try:
1105             self.container = container or cnt_back_up
1106             return filter_in(
1107                 self.get_container_info(),
1108                 'X-Container-Policy-Quota')
1109         finally:
1110             self.container = cnt_back_up
1111
1112     def get_container_info(self, container=None, until=None):
1113         """
1114         :param until: (str) formated date
1115
1116         :returns: (dict)
1117
1118         :raises ClientError: 404 Container not found
1119         """
1120         bck_cont = self.container
1121         try:
1122             self.container = container or bck_cont
1123             self._assert_container()
1124             r = self.container_head(until=until)
1125         except ClientError as err:
1126             err.details.append('for container %s' % self.container)
1127             raise err
1128         finally:
1129             self.container = bck_cont
1130         return r.headers
1131
1132     def get_container_meta(self, until=None):
1133         """
1134         :param until: (str) formated date
1135
1136         :returns: (dict)
1137         """
1138         return filter_in(
1139             self.get_container_info(until=until), 'X-Container-Meta')
1140
1141     def get_container_object_meta(self, until=None):
1142         """
1143         :param until: (str) formated date
1144
1145         :returns: (dict)
1146         """
1147         return filter_in(
1148             self.get_container_info(until=until), 'X-Container-Object-Meta')
1149
1150     def set_container_meta(self, metapairs):
1151         """
1152         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1153         """
1154         assert(type(metapairs) is dict)
1155         r = self.container_post(update=True, metadata=metapairs)
1156         return r.headers
1157
1158     def del_container_meta(self, metakey):
1159         """
1160         :param metakey: (str) metadatum key
1161
1162         :returns: (dict) response headers
1163         """
1164         r = self.container_post(update=True, metadata={metakey: ''})
1165         return r.headers
1166
1167     def set_container_limit(self, limit):
1168         """
1169         :param limit: (int)
1170         """
1171         r = self.container_post(update=True, quota=limit)
1172         return r.headers
1173
1174     def set_container_versioning(self, versioning):
1175         """
1176         :param versioning: (str)
1177         """
1178         r = self.container_post(update=True, versioning=versioning)
1179         return r.headers
1180
1181     def del_object(self, obj, until=None, delimiter=None):
1182         """
1183         :param obj: (str) remote object path
1184
1185         :param until: (str) formated date
1186
1187         :param delimiter: (str)
1188         """
1189         self._assert_container()
1190         r = self.object_delete(obj, until=until, delimiter=delimiter)
1191         return r.headers
1192
1193     def set_object_meta(self, obj, metapairs):
1194         """
1195         :param obj: (str) remote object path
1196
1197         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1198         """
1199         assert(type(metapairs) is dict)
1200         r = self.object_post(obj, update=True, metadata=metapairs)
1201         return r.headers
1202
1203     def del_object_meta(self, obj, metakey):
1204         """
1205         :param obj: (str) remote object path
1206
1207         :param metakey: (str) metadatum key
1208         """
1209         r = self.object_post(obj, update=True, metadata={metakey: ''})
1210         return r.headers
1211
1212     def publish_object(self, obj):
1213         """
1214         :param obj: (str) remote object path
1215
1216         :returns: (str) access url
1217         """
1218         self.object_post(obj, update=True, public=True)
1219         info = self.get_object_info(obj)
1220         return info['x-object-public']
1221         pref, sep, rest = self.base_url.partition('//')
1222         base = rest.split('/')[0]
1223         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1224
1225     def unpublish_object(self, obj):
1226         """
1227         :param obj: (str) remote object path
1228         """
1229         r = self.object_post(obj, update=True, public=False)
1230         return r.headers
1231
1232     def get_object_info(self, obj, version=None):
1233         """
1234         :param obj: (str) remote object path
1235
1236         :param version: (str)
1237
1238         :returns: (dict)
1239         """
1240         try:
1241             r = self.object_head(obj, version=version)
1242             return r.headers
1243         except ClientError as ce:
1244             if ce.status == 404:
1245                 raise ClientError('Object %s not found' % obj, status=404)
1246             raise
1247
1248     def get_object_meta(self, obj, version=None):
1249         """
1250         :param obj: (str) remote object path
1251
1252         :param version: (str)
1253
1254         :returns: (dict)
1255         """
1256         return filter_in(
1257             self.get_object_info(obj, version=version),
1258             'X-Object-Meta')
1259
1260     def get_object_sharing(self, obj):
1261         """
1262         :param obj: (str) remote object path
1263
1264         :returns: (dict)
1265         """
1266         r = filter_in(
1267             self.get_object_info(obj),
1268             'X-Object-Sharing',
1269             exactMatch=True)
1270         reply = {}
1271         if len(r) > 0:
1272             perms = r['x-object-sharing'].split(';')
1273             for perm in perms:
1274                 try:
1275                     perm.index('=')
1276                 except ValueError:
1277                     raise ClientError('Incorrect reply format')
1278                 (key, val) = perm.strip().split('=')
1279                 reply[key] = val
1280         return reply
1281
1282     def set_object_sharing(
1283             self, obj,
1284             read_permission=False, write_permission=False):
1285         """Give read/write permisions to an object.
1286
1287         :param obj: (str) remote object path
1288
1289         :param read_permission: (list - bool) users and user groups that get
1290             read permission for this object - False means all previous read
1291             permissions will be removed
1292
1293         :param write_permission: (list - bool) of users and user groups to get
1294            write permission for this object - False means all previous write
1295            permissions will be removed
1296
1297         :returns: (dict) response headers
1298         """
1299
1300         perms = dict(read=read_permission or '', write=write_permission or '')
1301         r = self.object_post(obj, update=True, permissions=perms)
1302         return r.headers
1303
1304     def del_object_sharing(self, obj):
1305         """
1306         :param obj: (str) remote object path
1307         """
1308         return self.set_object_sharing(obj)
1309
1310     def append_object(self, obj, source_file, upload_cb=None):
1311         """
1312         :param obj: (str) remote object path
1313
1314         :param source_file: open file descriptor
1315
1316         :param upload_db: progress.bar for uploading
1317         """
1318         self._assert_container()
1319         meta = self.get_container_info()
1320         blocksize = int(meta['x-container-block-size'])
1321         filesize = fstat(source_file.fileno()).st_size
1322         nblocks = 1 + (filesize - 1) // blocksize
1323         offset = 0
1324         headers = {}
1325         if upload_cb:
1326             self.progress_bar_gen = upload_cb(nblocks)
1327             self._cb_next()
1328         flying = {}
1329         self._init_thread_limit()
1330         try:
1331             for i in range(nblocks):
1332                 block = source_file.read(min(blocksize, filesize - offset))
1333                 offset += len(block)
1334
1335                 self._watch_thread_limit(flying.values())
1336                 unfinished = {}
1337                 flying[i] = SilentEvent(
1338                     method=self.object_post,
1339                     obj=obj,
1340                     update=True,
1341                     content_range='bytes */*',
1342                     content_type='application/octet-stream',
1343                     content_length=len(block),
1344                     data=block)
1345                 flying[i].start()
1346
1347                 for key, thread in flying.items():
1348                     if thread.isAlive():
1349                         if i < nblocks:
1350                             unfinished[key] = thread
1351                             continue
1352                         thread.join()
1353                     if thread.exception:
1354                         raise thread.exception
1355                     headers[key] = thread.value.headers
1356                     self._cb_next()
1357                 flying = unfinished
1358         except KeyboardInterrupt:
1359             sendlog.info('- - - wait for threads to finish')
1360             for thread in activethreads():
1361                 thread.join()
1362         finally:
1363             from time import sleep
1364             sleep(2 * len(activethreads()))
1365         return headers.values()
1366
1367     def truncate_object(self, obj, upto_bytes):
1368         """
1369         :param obj: (str) remote object path
1370
1371         :param upto_bytes: max number of bytes to leave on file
1372
1373         :returns: (dict) response headers
1374         """
1375         ctype = self.get_object_info(obj)['content-type']
1376         r = self.object_post(
1377             obj,
1378             update=True,
1379             content_range='bytes 0-%s/*' % upto_bytes,
1380             content_type=ctype,
1381             object_bytes=upto_bytes,
1382             source_object=path4url(self.container, obj))
1383         return r.headers
1384
1385     def overwrite_object(
1386             self, obj, start, end, source_file,
1387             content_type=None, upload_cb=None):
1388         """Overwrite a part of an object from local source file
1389
1390         :param obj: (str) remote object path
1391
1392         :param start: (int) position in bytes to start overwriting from
1393
1394         :param end: (int) position in bytes to stop overwriting at
1395
1396         :param source_file: open file descriptor
1397
1398         :param content_type: (str) default: application/octet-stream
1399
1400         :param upload_db: progress.bar for uploading
1401         """
1402
1403         r = self.get_object_info(obj)
1404         rf_size = int(r['content-length'])
1405         if rf_size < int(start):
1406             raise ClientError(
1407                 'Range start exceeds file size',
1408                 status=416)
1409         elif rf_size < int(end):
1410             raise ClientError(
1411                 'Range end exceeds file size',
1412                 status=416)
1413         self._assert_container()
1414         meta = self.get_container_info()
1415         blocksize = int(meta['x-container-block-size'])
1416         filesize = fstat(source_file.fileno()).st_size
1417         datasize = int(end) - int(start) + 1
1418         nblocks = 1 + (datasize - 1) // blocksize
1419         offset = 0
1420         if upload_cb:
1421             self.progress_bar_gen = upload_cb(nblocks)
1422             self._cb_next()
1423         headers = []
1424         for i in range(nblocks):
1425             read_size = min(blocksize, filesize - offset, datasize - offset)
1426             block = source_file.read(read_size)
1427             r = self.object_post(
1428                 obj,
1429                 update=True,
1430                 content_type=content_type or 'application/octet-stream',
1431                 content_length=len(block),
1432                 content_range='bytes %s-%s/*' % (
1433                     start + offset,
1434                     start + offset + len(block) - 1),
1435                 data=block)
1436             headers.append(dict(r.headers))
1437             offset += len(block)
1438
1439             self._cb_next
1440         return headers
1441
1442     def copy_object(
1443             self, src_container, src_object, dst_container,
1444             dst_object=None,
1445             source_version=None,
1446             source_account=None,
1447             public=False,
1448             content_type=None,
1449             delimiter=None):
1450         """
1451         :param src_container: (str) source container
1452
1453         :param src_object: (str) source object path
1454
1455         :param dst_container: (str) destination container
1456
1457         :param dst_object: (str) destination object path
1458
1459         :param source_version: (str) source object version
1460
1461         :param source_account: (str) account to copy from
1462
1463         :param public: (bool)
1464
1465         :param content_type: (str)
1466
1467         :param delimiter: (str)
1468
1469         :returns: (dict) response headers
1470         """
1471         self._assert_account()
1472         self.container = dst_container
1473         src_path = path4url(src_container, src_object)
1474         r = self.object_put(
1475             dst_object or src_object,
1476             success=201,
1477             copy_from=src_path,
1478             content_length=0,
1479             source_version=source_version,
1480             source_account=source_account,
1481             public=public,
1482             content_type=content_type,
1483             delimiter=delimiter)
1484         return r.headers
1485
1486     def move_object(
1487             self, src_container, src_object, dst_container,
1488             dst_object=False,
1489             source_account=None,
1490             source_version=None,
1491             public=False,
1492             content_type=None,
1493             delimiter=None):
1494         """
1495         :param src_container: (str) source container
1496
1497         :param src_object: (str) source object path
1498
1499         :param dst_container: (str) destination container
1500
1501         :param dst_object: (str) destination object path
1502
1503         :param source_account: (str) account to move from
1504
1505         :param source_version: (str) source object version
1506
1507         :param public: (bool)
1508
1509         :param content_type: (str)
1510
1511         :param delimiter: (str)
1512
1513         :returns: (dict) response headers
1514         """
1515         self._assert_account()
1516         self.container = dst_container
1517         dst_object = dst_object or src_object
1518         src_path = path4url(src_container, src_object)
1519         r = self.object_put(
1520             dst_object,
1521             success=201,
1522             move_from=src_path,
1523             content_length=0,
1524             source_account=source_account,
1525             source_version=source_version,
1526             public=public,
1527             content_type=content_type,
1528             delimiter=delimiter)
1529         return r.headers
1530
1531     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1532         """Get accounts that share with self.account
1533
1534         :param limit: (str)
1535
1536         :param marker: (str)
1537
1538         :returns: (dict)
1539         """
1540         self._assert_account()
1541
1542         self.set_param('format', 'json')
1543         self.set_param('limit', limit, iff=limit is not None)
1544         self.set_param('marker', marker, iff=marker is not None)
1545
1546         path = ''
1547         success = kwargs.pop('success', (200, 204))
1548         r = self.get(path, *args, success=success, **kwargs)
1549         return r.json
1550
1551     def get_object_versionlist(self, obj):
1552         """
1553         :param obj: (str) remote object path
1554
1555         :returns: (list)
1556         """
1557         self._assert_container()
1558         r = self.object_get(obj, format='json', version='list')
1559         return r.json['versions']