Allow upload_from_string to use content-encoding
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384             server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 try:
446                     details = ['%s' % thread.exception for thread in missing]
447                 except Exception:
448                     details = ['Also, failed to read thread exceptions']
449                 raise ClientError(
450                     '%s blocks failed to upload' % len(missing),
451                     details=details)
452         except KeyboardInterrupt:
453             sendlog.info('- - - wait for threads to finish')
454             for thread in activethreads():
455                 thread.join()
456             raise
457
458         r = self.object_put(
459             obj,
460             format='json',
461             hashmap=True,
462             content_type=content_type,
463             content_encoding=content_encoding,
464             if_etag_match=if_etag_match,
465             if_etag_not_match='*' if if_not_exist else None,
466             etag=etag,
467             json=hashmap,
468             permissions=sharing,
469             public=public,
470             success=201)
471         return r.headers
472
473     def upload_from_string(
474             self, obj, input_str,
475             hash_cb=None,
476             upload_cb=None,
477             etag=None,
478             if_etag_match=None,
479             if_not_exist=None,
480             content_encoding=None,
481             content_disposition=None,
482             content_type=None,
483             sharing=None,
484             public=None,
485             container_info_cache=None):
486         """Upload an object using multiple connections (threads)
487
488         :param obj: (str) remote object path
489
490         :param input_str: (str) upload content
491
492         :param hash_cb: optional progress.bar object for calculating hashes
493
494         :param upload_cb: optional progress.bar object for uploading
495
496         :param etag: (str)
497
498         :param if_etag_match: (str) Push that value to if-match header at file
499             creation
500
501         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
502             it does not exist remotely, otherwise the operation will fail.
503             Involves the case of an object with the same path is created while
504             the object is being uploaded.
505
506         :param content_encoding: (str)
507
508         :param content_disposition: (str)
509
510         :param content_type: (str)
511
512         :param sharing: {'read':[user and/or grp names],
513             'write':[usr and/or grp names]}
514
515         :param public: (bool)
516
517         :param container_info_cache: (dict) if given, avoid redundant calls to
518             server for container info (block size and hash information)
519         """
520         self._assert_container()
521
522         blocksize, blockhash, size, nblocks = self._get_file_block_info(
523                 fileobj=None, size=len(input_str), cache=container_info_cache)
524         (hashes, hmap, offset) = ([], {}, 0)
525         if not content_type:
526             content_type = 'application/octet-stream'
527
528         hashes = []
529         hmap = {}
530         for blockid in range(nblocks):
531             start = blockid * blocksize
532             block = input_str[start: (start + blocksize)]
533             hashes.append(_pithos_hash(block, blockhash))
534             hmap[hashes[blockid]] = (start, block)
535
536         hashmap = dict(bytes=size, hashes=hashes)
537         missing, obj_headers = self._create_object_or_get_missing_hashes(
538             obj, hashmap,
539             content_type=content_type,
540             size=size,
541             if_etag_match=if_etag_match,
542             if_etag_not_match='*' if if_not_exist else None,
543             content_encoding=content_encoding,
544             content_disposition=content_disposition,
545             permissions=sharing,
546             public=public)
547         if missing is None:
548             return obj_headers
549         num_of_missing = len(missing)
550
551         if upload_cb:
552             self.progress_bar_gen = upload_cb(nblocks)
553             for i in range(nblocks + 1 - num_of_missing):
554                 self._cb_next()
555
556         tries = 7
557         old_failures = 0
558         try:
559             while tries and missing:
560                 flying = []
561                 failures = []
562                 for hash in missing:
563                     offset, block = hmap[hash]
564                     bird = self._put_block_async(block, hash)
565                     flying.append(bird)
566                     unfinished = self._watch_thread_limit(flying)
567                     for thread in set(flying).difference(unfinished):
568                         if thread.exception:
569                             failures.append(thread.kwargs['hash'])
570                         if thread.isAlive():
571                             flying.append(thread)
572                         else:
573                             self._cb_next()
574                     flying = unfinished
575                 for thread in flying:
576                     thread.join()
577                     if thread.exception:
578                         failures.append(thread.kwargs['hash'])
579                     self._cb_next()
580                 missing = failures
581                 if missing and len(missing) == old_failures:
582                     tries -= 1
583                 old_failures = len(missing)
584             if missing:
585                 raise ClientError(
586                     '%s blocks failed to upload' % len(missing),
587                     details=['%s' % thread.exception for thread in missing])
588         except KeyboardInterrupt:
589             sendlog.info('- - - wait for threads to finish')
590             for thread in activethreads():
591                 thread.join()
592             raise
593
594         r = self.object_put(
595             obj,
596             format='json',
597             hashmap=True,
598             content_type=content_type,
599             content_encoding=content_encoding,
600             if_etag_match=if_etag_match,
601             if_etag_not_match='*' if if_not_exist else None,
602             etag=etag,
603             json=hashmap,
604             permissions=sharing,
605             public=public,
606             success=201)
607         return r.headers
608
609     # download_* auxiliary methods
610     def _get_remote_blocks_info(self, obj, **restargs):
611         #retrieve object hashmap
612         myrange = restargs.pop('data_range', None)
613         hashmap = self.get_object_hashmap(obj, **restargs)
614         restargs['data_range'] = myrange
615         blocksize = int(hashmap['block_size'])
616         blockhash = hashmap['block_hash']
617         total_size = hashmap['bytes']
618         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
619         map_dict = {}
620         for i, h in enumerate(hashmap['hashes']):
621             #  map_dict[h] = i   CHAGE
622             if h in map_dict:
623                 map_dict[h].append(i)
624             else:
625                 map_dict[h] = [i]
626         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
627
628     def _dump_blocks_sync(
629             self, obj, remote_hashes, blocksize, total_size, dst, range,
630             **args):
631         for blockid, blockhash in enumerate(remote_hashes):
632             if blockhash:
633                 start = blocksize * blockid
634                 is_last = start + blocksize > total_size
635                 end = (total_size - 1) if is_last else (start + blocksize - 1)
636                 (start, end) = _range_up(start, end, range)
637                 args['data_range'] = 'bytes=%s-%s' % (start, end)
638                 r = self.object_get(obj, success=(200, 206), **args)
639                 self._cb_next()
640                 dst.write(r.content)
641                 dst.flush()
642
643     def _get_block_async(self, obj, **args):
644         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
645         event.start()
646         return event
647
648     def _hash_from_file(self, fp, start, size, blockhash):
649         fp.seek(start)
650         block = fp.read(size)
651         h = newhashlib(blockhash)
652         h.update(block.strip('\x00'))
653         return hexlify(h.digest())
654
655     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
656         """write the results of a greenleted rest call to a file
657
658         :param offset: the offset of the file up to blocksize
659         - e.g. if the range is 10-100, all blocks will be written to
660         normal_position - 10
661         """
662         for key, g in flying.items():
663             if g.isAlive():
664                 continue
665             if g.exception:
666                 raise g.exception
667             block = g.value.content
668             for block_start in blockids[key]:
669                 local_file.seek(block_start + offset)
670                 local_file.write(block)
671                 self._cb_next()
672             flying.pop(key)
673             blockids.pop(key)
674         local_file.flush()
675
676     def _dump_blocks_async(
677             self, obj, remote_hashes, blocksize, total_size, local_file,
678             blockhash=None, resume=False, filerange=None, **restargs):
679         file_size = fstat(local_file.fileno()).st_size if resume else 0
680         flying = dict()
681         blockid_dict = dict()
682         offset = 0
683         if filerange is not None:
684             rstart = int(filerange.split('-')[0])
685             offset = rstart if blocksize > rstart else rstart % blocksize
686
687         self._init_thread_limit()
688         for block_hash, blockids in remote_hashes.items():
689             blockids = [blk * blocksize for blk in blockids]
690             unsaved = [blk for blk in blockids if not (
691                 blk < file_size and block_hash == self._hash_from_file(
692                         local_file, blk, blocksize, blockhash))]
693             self._cb_next(len(blockids) - len(unsaved))
694             if unsaved:
695                 key = unsaved[0]
696                 self._watch_thread_limit(flying.values())
697                 self._thread2file(
698                     flying, blockid_dict, local_file, offset,
699                     **restargs)
700                 end = total_size - 1 if (
701                     key + blocksize > total_size) else key + blocksize - 1
702                 start, end = _range_up(key, end, filerange)
703                 if start == end:
704                     self._cb_next()
705                     continue
706                 restargs['async_headers'] = {
707                     'Range': 'bytes=%s-%s' % (start, end)}
708                 flying[key] = self._get_block_async(obj, **restargs)
709                 blockid_dict[key] = unsaved
710
711         for thread in flying.values():
712             thread.join()
713         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
714
715     def download_object(
716             self, obj, dst,
717             download_cb=None,
718             version=None,
719             resume=False,
720             range_str=None,
721             if_match=None,
722             if_none_match=None,
723             if_modified_since=None,
724             if_unmodified_since=None):
725         """Download an object (multiple connections, random blocks)
726
727         :param obj: (str) remote object path
728
729         :param dst: open file descriptor (wb+)
730
731         :param download_cb: optional progress.bar object for downloading
732
733         :param version: (str) file version
734
735         :param resume: (bool) if set, preserve already downloaded file parts
736
737         :param range_str: (str) from, to are file positions (int) in bytes
738
739         :param if_match: (str)
740
741         :param if_none_match: (str)
742
743         :param if_modified_since: (str) formated date
744
745         :param if_unmodified_since: (str) formated date"""
746         restargs = dict(
747             version=version,
748             data_range=None if range_str is None else 'bytes=%s' % range_str,
749             if_match=if_match,
750             if_none_match=if_none_match,
751             if_modified_since=if_modified_since,
752             if_unmodified_since=if_unmodified_since)
753
754         (
755             blocksize,
756             blockhash,
757             total_size,
758             hash_list,
759             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
760         assert total_size >= 0
761
762         if download_cb:
763             self.progress_bar_gen = download_cb(len(hash_list))
764             self._cb_next()
765
766         if dst.isatty():
767             self._dump_blocks_sync(
768                 obj,
769                 hash_list,
770                 blocksize,
771                 total_size,
772                 dst,
773                 range_str,
774                 **restargs)
775         else:
776             self._dump_blocks_async(
777                 obj,
778                 remote_hashes,
779                 blocksize,
780                 total_size,
781                 dst,
782                 blockhash,
783                 resume,
784                 range_str,
785                 **restargs)
786             if not range_str:
787                 dst.truncate(total_size)
788
789         self._complete_cb()
790
791     def download_to_string(
792             self, obj,
793             download_cb=None,
794             version=None,
795             range_str=None,
796             if_match=None,
797             if_none_match=None,
798             if_modified_since=None,
799             if_unmodified_since=None):
800         """Download an object to a string (multiple connections). This method
801         uses threads for http requests, but stores all content in memory.
802
803         :param obj: (str) remote object path
804
805         :param download_cb: optional progress.bar object for downloading
806
807         :param version: (str) file version
808
809         :param range_str: (str) from, to are file positions (int) in bytes
810
811         :param if_match: (str)
812
813         :param if_none_match: (str)
814
815         :param if_modified_since: (str) formated date
816
817         :param if_unmodified_since: (str) formated date
818
819         :returns: (str) the whole object contents
820         """
821         restargs = dict(
822             version=version,
823             data_range=None if range_str is None else 'bytes=%s' % range_str,
824             if_match=if_match,
825             if_none_match=if_none_match,
826             if_modified_since=if_modified_since,
827             if_unmodified_since=if_unmodified_since)
828
829         (
830             blocksize,
831             blockhash,
832             total_size,
833             hash_list,
834             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
835         assert total_size >= 0
836
837         if download_cb:
838             self.progress_bar_gen = download_cb(len(hash_list))
839             self._cb_next()
840
841         num_of_blocks = len(remote_hashes)
842         ret = [''] * num_of_blocks
843         self._init_thread_limit()
844         flying = dict()
845         try:
846             for blockid, blockhash in enumerate(remote_hashes):
847                 start = blocksize * blockid
848                 is_last = start + blocksize > total_size
849                 end = (total_size - 1) if is_last else (start + blocksize - 1)
850                 (start, end) = _range_up(start, end, range_str)
851                 if start < end:
852                     self._watch_thread_limit(flying.values())
853                     flying[blockid] = self._get_block_async(obj, **restargs)
854                 for runid, thread in flying.items():
855                     if (blockid + 1) == num_of_blocks:
856                         thread.join()
857                     elif thread.isAlive():
858                         continue
859                     if thread.exception:
860                         raise thread.exception
861                     ret[runid] = thread.value.content
862                     self._cb_next()
863                     flying.pop(runid)
864             return ''.join(ret)
865         except KeyboardInterrupt:
866             sendlog.info('- - - wait for threads to finish')
867             for thread in activethreads():
868                 thread.join()
869
870     #Command Progress Bar method
871     def _cb_next(self, step=1):
872         if hasattr(self, 'progress_bar_gen'):
873             try:
874                 for i in xrange(step):
875                     self.progress_bar_gen.next()
876             except:
877                 pass
878
879     def _complete_cb(self):
880         while True:
881             try:
882                 self.progress_bar_gen.next()
883             except:
884                 break
885
886     def get_object_hashmap(
887             self, obj,
888             version=None,
889             if_match=None,
890             if_none_match=None,
891             if_modified_since=None,
892             if_unmodified_since=None,
893             data_range=None):
894         """
895         :param obj: (str) remote object path
896
897         :param if_match: (str)
898
899         :param if_none_match: (str)
900
901         :param if_modified_since: (str) formated date
902
903         :param if_unmodified_since: (str) formated date
904
905         :param data_range: (str) from-to where from and to are integers
906             denoting file positions in bytes
907
908         :returns: (list)
909         """
910         try:
911             r = self.object_get(
912                 obj,
913                 hashmap=True,
914                 version=version,
915                 if_etag_match=if_match,
916                 if_etag_not_match=if_none_match,
917                 if_modified_since=if_modified_since,
918                 if_unmodified_since=if_unmodified_since,
919                 data_range=data_range)
920         except ClientError as err:
921             if err.status == 304 or err.status == 412:
922                 return {}
923             raise
924         return r.json
925
926     def set_account_group(self, group, usernames):
927         """
928         :param group: (str)
929
930         :param usernames: (list)
931         """
932         r = self.account_post(update=True, groups={group: usernames})
933         return r
934
935     def del_account_group(self, group):
936         """
937         :param group: (str)
938         """
939         self.account_post(update=True, groups={group: []})
940
941     def get_account_info(self, until=None):
942         """
943         :param until: (str) formated date
944
945         :returns: (dict)
946         """
947         r = self.account_head(until=until)
948         if r.status_code == 401:
949             raise ClientError("No authorization", status=401)
950         return r.headers
951
952     def get_account_quota(self):
953         """
954         :returns: (dict)
955         """
956         return filter_in(
957             self.get_account_info(),
958             'X-Account-Policy-Quota',
959             exactMatch=True)
960
961     #def get_account_versioning(self):
962     #    """
963     #    :returns: (dict)
964     #    """
965     #    return filter_in(
966     #        self.get_account_info(),
967     #        'X-Account-Policy-Versioning',
968     #        exactMatch=True)
969
970     def get_account_meta(self, until=None):
971         """
972         :param until: (str) formated date
973
974         :returns: (dict)
975         """
976         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
977
978     def get_account_group(self):
979         """
980         :returns: (dict)
981         """
982         return filter_in(self.get_account_info(), 'X-Account-Group-')
983
984     def set_account_meta(self, metapairs):
985         """
986         :param metapairs: (dict) {key1:val1, key2:val2, ...}
987         """
988         assert(type(metapairs) is dict)
989         r = self.account_post(update=True, metadata=metapairs)
990         return r.headers
991
992     def del_account_meta(self, metakey):
993         """
994         :param metakey: (str) metadatum key
995         """
996         r = self.account_post(update=True, metadata={metakey: ''})
997         return r.headers
998
999     #def set_account_quota(self, quota):
1000     #    """
1001     #    :param quota: (int)
1002     #    """
1003     #    self.account_post(update=True, quota=quota)
1004
1005     #def set_account_versioning(self, versioning):
1006     #    """
1007     #    :param versioning: (str)
1008     #    """
1009     #    r = self.account_post(update=True, versioning=versioning)
1010     #    return r.headers
1011
1012     def list_containers(self):
1013         """
1014         :returns: (dict)
1015         """
1016         r = self.account_get()
1017         return r.json
1018
1019     def del_container(self, until=None, delimiter=None):
1020         """
1021         :param until: (str) formated date
1022
1023         :param delimiter: (str) with / empty container
1024
1025         :raises ClientError: 404 Container does not exist
1026
1027         :raises ClientError: 409 Container is not empty
1028         """
1029         self._assert_container()
1030         r = self.container_delete(
1031             until=until,
1032             delimiter=delimiter,
1033             success=(204, 404, 409))
1034         if r.status_code == 404:
1035             raise ClientError(
1036                 'Container "%s" does not exist' % self.container,
1037                 r.status_code)
1038         elif r.status_code == 409:
1039             raise ClientError(
1040                 'Container "%s" is not empty' % self.container,
1041                 r.status_code)
1042         return r.headers
1043
1044     def get_container_versioning(self, container=None):
1045         """
1046         :param container: (str)
1047
1048         :returns: (dict)
1049         """
1050         cnt_back_up = self.container
1051         try:
1052             self.container = container or cnt_back_up
1053             return filter_in(
1054                 self.get_container_info(),
1055                 'X-Container-Policy-Versioning')
1056         finally:
1057             self.container = cnt_back_up
1058
1059     def get_container_limit(self, container=None):
1060         """
1061         :param container: (str)
1062
1063         :returns: (dict)
1064         """
1065         cnt_back_up = self.container
1066         try:
1067             self.container = container or cnt_back_up
1068             return filter_in(
1069                 self.get_container_info(),
1070                 'X-Container-Policy-Quota')
1071         finally:
1072             self.container = cnt_back_up
1073
1074     def get_container_info(self, until=None):
1075         """
1076         :param until: (str) formated date
1077
1078         :returns: (dict)
1079
1080         :raises ClientError: 404 Container not found
1081         """
1082         try:
1083             r = self.container_head(until=until)
1084         except ClientError as err:
1085             err.details.append('for container %s' % self.container)
1086             raise err
1087         return r.headers
1088
1089     def get_container_meta(self, until=None):
1090         """
1091         :param until: (str) formated date
1092
1093         :returns: (dict)
1094         """
1095         return filter_in(
1096             self.get_container_info(until=until),
1097             'X-Container-Meta')
1098
1099     def get_container_object_meta(self, until=None):
1100         """
1101         :param until: (str) formated date
1102
1103         :returns: (dict)
1104         """
1105         return filter_in(
1106             self.get_container_info(until=until),
1107             'X-Container-Object-Meta')
1108
1109     def set_container_meta(self, metapairs):
1110         """
1111         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1112         """
1113         assert(type(metapairs) is dict)
1114         r = self.container_post(update=True, metadata=metapairs)
1115         return r.headers
1116
1117     def del_container_meta(self, metakey):
1118         """
1119         :param metakey: (str) metadatum key
1120
1121         :returns: (dict) response headers
1122         """
1123         r = self.container_post(update=True, metadata={metakey: ''})
1124         return r.headers
1125
1126     def set_container_limit(self, limit):
1127         """
1128         :param limit: (int)
1129         """
1130         r = self.container_post(update=True, quota=limit)
1131         return r.headers
1132
1133     def set_container_versioning(self, versioning):
1134         """
1135         :param versioning: (str)
1136         """
1137         r = self.container_post(update=True, versioning=versioning)
1138         return r.headers
1139
1140     def del_object(self, obj, until=None, delimiter=None):
1141         """
1142         :param obj: (str) remote object path
1143
1144         :param until: (str) formated date
1145
1146         :param delimiter: (str)
1147         """
1148         self._assert_container()
1149         r = self.object_delete(obj, until=until, delimiter=delimiter)
1150         return r.headers
1151
1152     def set_object_meta(self, obj, metapairs):
1153         """
1154         :param obj: (str) remote object path
1155
1156         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1157         """
1158         assert(type(metapairs) is dict)
1159         r = self.object_post(obj, update=True, metadata=metapairs)
1160         return r.headers
1161
1162     def del_object_meta(self, obj, metakey):
1163         """
1164         :param obj: (str) remote object path
1165
1166         :param metakey: (str) metadatum key
1167         """
1168         r = self.object_post(obj, update=True, metadata={metakey: ''})
1169         return r.headers
1170
1171     def publish_object(self, obj):
1172         """
1173         :param obj: (str) remote object path
1174
1175         :returns: (str) access url
1176         """
1177         self.object_post(obj, update=True, public=True)
1178         info = self.get_object_info(obj)
1179         return info['x-object-public']
1180         pref, sep, rest = self.base_url.partition('//')
1181         base = rest.split('/')[0]
1182         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1183
1184     def unpublish_object(self, obj):
1185         """
1186         :param obj: (str) remote object path
1187         """
1188         r = self.object_post(obj, update=True, public=False)
1189         return r.headers
1190
1191     def get_object_info(self, obj, version=None):
1192         """
1193         :param obj: (str) remote object path
1194
1195         :param version: (str)
1196
1197         :returns: (dict)
1198         """
1199         try:
1200             r = self.object_head(obj, version=version)
1201             return r.headers
1202         except ClientError as ce:
1203             if ce.status == 404:
1204                 raise ClientError('Object %s not found' % obj, status=404)
1205             raise
1206
1207     def get_object_meta(self, obj, version=None):
1208         """
1209         :param obj: (str) remote object path
1210
1211         :param version: (str)
1212
1213         :returns: (dict)
1214         """
1215         return filter_in(
1216             self.get_object_info(obj, version=version),
1217             'X-Object-Meta')
1218
1219     def get_object_sharing(self, obj):
1220         """
1221         :param obj: (str) remote object path
1222
1223         :returns: (dict)
1224         """
1225         r = filter_in(
1226             self.get_object_info(obj),
1227             'X-Object-Sharing',
1228             exactMatch=True)
1229         reply = {}
1230         if len(r) > 0:
1231             perms = r['x-object-sharing'].split(';')
1232             for perm in perms:
1233                 try:
1234                     perm.index('=')
1235                 except ValueError:
1236                     raise ClientError('Incorrect reply format')
1237                 (key, val) = perm.strip().split('=')
1238                 reply[key] = val
1239         return reply
1240
1241     def set_object_sharing(
1242             self, obj,
1243             read_permission=False, write_permission=False):
1244         """Give read/write permisions to an object.
1245
1246         :param obj: (str) remote object path
1247
1248         :param read_permission: (list - bool) users and user groups that get
1249             read permission for this object - False means all previous read
1250             permissions will be removed
1251
1252         :param write_permission: (list - bool) of users and user groups to get
1253            write permission for this object - False means all previous write
1254            permissions will be removed
1255
1256         :returns: (dict) response headers
1257         """
1258
1259         perms = dict(read=read_permission or '', write=write_permission or '')
1260         r = self.object_post(obj, update=True, permissions=perms)
1261         return r.headers
1262
1263     def del_object_sharing(self, obj):
1264         """
1265         :param obj: (str) remote object path
1266         """
1267         return self.set_object_sharing(obj)
1268
1269     def append_object(self, obj, source_file, upload_cb=None):
1270         """
1271         :param obj: (str) remote object path
1272
1273         :param source_file: open file descriptor
1274
1275         :param upload_db: progress.bar for uploading
1276         """
1277         self._assert_container()
1278         meta = self.get_container_info()
1279         blocksize = int(meta['x-container-block-size'])
1280         filesize = fstat(source_file.fileno()).st_size
1281         nblocks = 1 + (filesize - 1) // blocksize
1282         offset = 0
1283         headers = {}
1284         if upload_cb:
1285             self.progress_bar_gen = upload_cb(nblocks)
1286             self._cb_next()
1287         flying = {}
1288         self._init_thread_limit()
1289         try:
1290             for i in range(nblocks):
1291                 block = source_file.read(min(blocksize, filesize - offset))
1292                 offset += len(block)
1293
1294                 self._watch_thread_limit(flying.values())
1295                 unfinished = {}
1296                 flying[i] = SilentEvent(
1297                     method=self.object_post,
1298                     obj=obj,
1299                     update=True,
1300                     content_range='bytes */*',
1301                     content_type='application/octet-stream',
1302                     content_length=len(block),
1303                     data=block)
1304                 flying[i].start()
1305
1306                 for key, thread in flying.items():
1307                     if thread.isAlive():
1308                         if i < nblocks:
1309                             unfinished[key] = thread
1310                             continue
1311                         thread.join()
1312                     if thread.exception:
1313                         raise thread.exception
1314                     headers[key] = thread.value.headers
1315                     self._cb_next()
1316                 flying = unfinished
1317         except KeyboardInterrupt:
1318             sendlog.info('- - - wait for threads to finish')
1319             for thread in activethreads():
1320                 thread.join()
1321         finally:
1322             from time import sleep
1323             sleep(2 * len(activethreads()))
1324         return headers.values()
1325
1326     def truncate_object(self, obj, upto_bytes):
1327         """
1328         :param obj: (str) remote object path
1329
1330         :param upto_bytes: max number of bytes to leave on file
1331
1332         :returns: (dict) response headers
1333         """
1334         r = self.object_post(
1335             obj,
1336             update=True,
1337             content_range='bytes 0-%s/*' % upto_bytes,
1338             content_type='application/octet-stream',
1339             object_bytes=upto_bytes,
1340             source_object=path4url(self.container, obj))
1341         return r.headers
1342
1343     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1344         """Overwrite a part of an object from local source file
1345
1346         :param obj: (str) remote object path
1347
1348         :param start: (int) position in bytes to start overwriting from
1349
1350         :param end: (int) position in bytes to stop overwriting at
1351
1352         :param source_file: open file descriptor
1353
1354         :param upload_db: progress.bar for uploading
1355         """
1356
1357         r = self.get_object_info(obj)
1358         rf_size = int(r['content-length'])
1359         if rf_size < int(start):
1360             raise ClientError(
1361                 'Range start exceeds file size',
1362                 status=416)
1363         elif rf_size < int(end):
1364             raise ClientError(
1365                 'Range end exceeds file size',
1366                 status=416)
1367         self._assert_container()
1368         meta = self.get_container_info()
1369         blocksize = int(meta['x-container-block-size'])
1370         filesize = fstat(source_file.fileno()).st_size
1371         datasize = int(end) - int(start) + 1
1372         nblocks = 1 + (datasize - 1) // blocksize
1373         offset = 0
1374         if upload_cb:
1375             self.progress_bar_gen = upload_cb(nblocks)
1376             self._cb_next()
1377         headers = []
1378         for i in range(nblocks):
1379             read_size = min(blocksize, filesize - offset, datasize - offset)
1380             block = source_file.read(read_size)
1381             r = self.object_post(
1382                 obj,
1383                 update=True,
1384                 content_type='application/octet-stream',
1385                 content_length=len(block),
1386                 content_range='bytes %s-%s/*' % (
1387                     start + offset,
1388                     start + offset + len(block) - 1),
1389                 data=block)
1390             headers.append(dict(r.headers))
1391             offset += len(block)
1392
1393             self._cb_next
1394         return headers
1395
1396     def copy_object(
1397             self, src_container, src_object, dst_container,
1398             dst_object=None,
1399             source_version=None,
1400             source_account=None,
1401             public=False,
1402             content_type=None,
1403             delimiter=None):
1404         """
1405         :param src_container: (str) source container
1406
1407         :param src_object: (str) source object path
1408
1409         :param dst_container: (str) destination container
1410
1411         :param dst_object: (str) destination object path
1412
1413         :param source_version: (str) source object version
1414
1415         :param source_account: (str) account to copy from
1416
1417         :param public: (bool)
1418
1419         :param content_type: (str)
1420
1421         :param delimiter: (str)
1422
1423         :returns: (dict) response headers
1424         """
1425         self._assert_account()
1426         self.container = dst_container
1427         src_path = path4url(src_container, src_object)
1428         r = self.object_put(
1429             dst_object or src_object,
1430             success=201,
1431             copy_from=src_path,
1432             content_length=0,
1433             source_version=source_version,
1434             source_account=source_account,
1435             public=public,
1436             content_type=content_type,
1437             delimiter=delimiter)
1438         return r.headers
1439
1440     def move_object(
1441             self, src_container, src_object, dst_container,
1442             dst_object=False,
1443             source_account=None,
1444             source_version=None,
1445             public=False,
1446             content_type=None,
1447             delimiter=None):
1448         """
1449         :param src_container: (str) source container
1450
1451         :param src_object: (str) source object path
1452
1453         :param dst_container: (str) destination container
1454
1455         :param dst_object: (str) destination object path
1456
1457         :param source_account: (str) account to move from
1458
1459         :param source_version: (str) source object version
1460
1461         :param public: (bool)
1462
1463         :param content_type: (str)
1464
1465         :param delimiter: (str)
1466
1467         :returns: (dict) response headers
1468         """
1469         self._assert_account()
1470         self.container = dst_container
1471         dst_object = dst_object or src_object
1472         src_path = path4url(src_container, src_object)
1473         r = self.object_put(
1474             dst_object,
1475             success=201,
1476             move_from=src_path,
1477             content_length=0,
1478             source_account=source_account,
1479             source_version=source_version,
1480             public=public,
1481             content_type=content_type,
1482             delimiter=delimiter)
1483         return r.headers
1484
1485     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1486         """Get accounts that share with self.account
1487
1488         :param limit: (str)
1489
1490         :param marker: (str)
1491
1492         :returns: (dict)
1493         """
1494         self._assert_account()
1495
1496         self.set_param('format', 'json')
1497         self.set_param('limit', limit, iff=limit is not None)
1498         self.set_param('marker', marker, iff=marker is not None)
1499
1500         path = ''
1501         success = kwargs.pop('success', (200, 204))
1502         r = self.get(path, *args, success=success, **kwargs)
1503         return r.json
1504
1505     def get_object_versionlist(self, obj):
1506         """
1507         :param obj: (str) remote object path
1508
1509         :returns: (list)
1510         """
1511         self._assert_container()
1512         r = self.object_get(obj, format='json', version='list')
1513         return r.json['versions']