Implement an optional json output 4 outputing cmds
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def create_container(
75             self,
76             container=None, sizelimit=None, versioning=None, metadata=None):
77         """
78         :param container: (str) if not given, self.container is used instead
79
80         :param sizelimit: (int) container total size limit in bytes
81
82         :param versioning: (str) can be auto or whatever supported by server
83
84         :param metadata: (dict) Custom user-defined metadata of the form
85             { 'name1': 'value1', 'name2': 'value2', ... }
86
87         :returns: (dict) response headers
88         """
89         cnt_back_up = self.container
90         try:
91             self.container = container or cnt_back_up
92             r = self.container_put(
93                 quota=sizelimit, versioning=versioning, metadata=metadata)
94             return r.headers
95         finally:
96             self.container = cnt_back_up
97
98     def purge_container(self, container=None):
99         """Delete an empty container and destroy associated blocks
100         """
101         cnt_back_up = self.container
102         try:
103             self.container = container or cnt_back_up
104             r = self.container_delete(until=unicode(time()))
105         finally:
106             self.container = cnt_back_up
107         return r.headers
108
109     def upload_object_unchunked(
110             self, obj, f,
111             withHashFile=False,
112             size=None,
113             etag=None,
114             content_encoding=None,
115             content_disposition=None,
116             content_type=None,
117             sharing=None,
118             public=None):
119         """
120         :param obj: (str) remote object path
121
122         :param f: open file descriptor
123
124         :param withHashFile: (bool)
125
126         :param size: (int) size of data to upload
127
128         :param etag: (str)
129
130         :param content_encoding: (str)
131
132         :param content_disposition: (str)
133
134         :param content_type: (str)
135
136         :param sharing: {'read':[user and/or grp names],
137             'write':[usr and/or grp names]}
138
139         :param public: (bool)
140
141         :returns: (dict) created object metadata
142         """
143         self._assert_container()
144
145         if withHashFile:
146             data = f.read()
147             try:
148                 import json
149                 data = json.dumps(json.loads(data))
150             except ValueError:
151                 raise ClientError('"%s" is not json-formated' % f.name, 1)
152             except SyntaxError:
153                 msg = '"%s" is not a valid hashmap file' % f.name
154                 raise ClientError(msg, 1)
155             f = StringIO(data)
156         else:
157             data = f.read(size) if size else f.read()
158         r = self.object_put(
159             obj,
160             data=data,
161             etag=etag,
162             content_encoding=content_encoding,
163             content_disposition=content_disposition,
164             content_type=content_type,
165             permissions=sharing,
166             public=public,
167             success=201)
168         return r.headers
169
170     def create_object_by_manifestation(
171             self, obj,
172             etag=None,
173             content_encoding=None,
174             content_disposition=None,
175             content_type=None,
176             sharing=None,
177             public=None):
178         """
179         :param obj: (str) remote object path
180
181         :param etag: (str)
182
183         :param content_encoding: (str)
184
185         :param content_disposition: (str)
186
187         :param content_type: (str)
188
189         :param sharing: {'read':[user and/or grp names],
190             'write':[usr and/or grp names]}
191
192         :param public: (bool)
193
194         :returns: (dict) created object metadata
195         """
196         self._assert_container()
197         r = self.object_put(
198             obj,
199             content_length=0,
200             etag=etag,
201             content_encoding=content_encoding,
202             content_disposition=content_disposition,
203             content_type=content_type,
204             permissions=sharing,
205             public=public,
206             manifest='%s/%s' % (self.container, obj))
207         return r.headers
208
209     # upload_* auxiliary methods
210     def _put_block_async(self, data, hash):
211         event = SilentEvent(method=self._put_block, data=data, hash=hash)
212         event.start()
213         return event
214
215     def _put_block(self, data, hash):
216         r = self.container_post(
217             update=True,
218             content_type='application/octet-stream',
219             content_length=len(data),
220             data=data,
221             format='json')
222         assert r.json[0] == hash, 'Local hash does not match server'
223
224     def _get_file_block_info(self, fileobj, size=None, cache=None):
225         """
226         :param fileobj: (file descriptor) source
227
228         :param size: (int) size of data to upload from source
229
230         :param cache: (dict) if provided, cache container info response to
231         avoid redundant calls
232         """
233         if isinstance(cache, dict):
234             try:
235                 meta = cache[self.container]
236             except KeyError:
237                 meta = self.get_container_info()
238                 cache[self.container] = meta
239         else:
240             meta = self.get_container_info()
241         blocksize = int(meta['x-container-block-size'])
242         blockhash = meta['x-container-block-hash']
243         size = size if size is not None else fstat(fileobj.fileno()).st_size
244         nblocks = 1 + (size - 1) // blocksize
245         return (blocksize, blockhash, size, nblocks)
246
247     def _create_object_or_get_missing_hashes(
248             self, obj, json,
249             size=None,
250             format='json',
251             hashmap=True,
252             content_type=None,
253             if_etag_match=None,
254             if_etag_not_match=None,
255             content_encoding=None,
256             content_disposition=None,
257             permissions=None,
258             public=None,
259             success=(201, 409)):
260         r = self.object_put(
261             obj,
262             format='json',
263             hashmap=True,
264             content_type=content_type,
265             json=json,
266             if_etag_match=if_etag_match,
267             if_etag_not_match=if_etag_not_match,
268             content_encoding=content_encoding,
269             content_disposition=content_disposition,
270             permissions=permissions,
271             public=public,
272             success=success)
273         return (None if r.status_code == 201 else r.json), r.headers
274
275     def _calculate_blocks_for_upload(
276             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
277             hash_cb=None):
278         offset = 0
279         if hash_cb:
280             hash_gen = hash_cb(nblocks)
281             hash_gen.next()
282
283         for i in range(nblocks):
284             block = fileobj.read(min(blocksize, size - offset))
285             bytes = len(block)
286             hash = _pithos_hash(block, blockhash)
287             hashes.append(hash)
288             hmap[hash] = (offset, bytes)
289             offset += bytes
290             if hash_cb:
291                 hash_gen.next()
292         msg = 'Failed to calculate uploaded blocks:'
293         ' Offset and object size do not match'
294         assert offset == size, msg
295
296     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297         """upload missing blocks asynchronously"""
298
299         self._init_thread_limit()
300
301         flying = []
302         failures = []
303         for hash in missing:
304             offset, bytes = hmap[hash]
305             fileobj.seek(offset)
306             data = fileobj.read(bytes)
307             r = self._put_block_async(data, hash)
308             flying.append(r)
309             unfinished = self._watch_thread_limit(flying)
310             for thread in set(flying).difference(unfinished):
311                 if thread.exception:
312                     failures.append(thread)
313                     if isinstance(
314                             thread.exception,
315                             ClientError) and thread.exception.status == 502:
316                         self.POOLSIZE = self._thread_limit
317                 elif thread.isAlive():
318                     flying.append(thread)
319                 elif upload_gen:
320                     try:
321                         upload_gen.next()
322                     except:
323                         pass
324             flying = unfinished
325
326         for thread in flying:
327             thread.join()
328             if thread.exception:
329                 failures.append(thread)
330             elif upload_gen:
331                 try:
332                     upload_gen.next()
333                 except:
334                     pass
335
336         return [failure.kwargs['hash'] for failure in failures]
337
338     def upload_object(
339             self, obj, f,
340             size=None,
341             hash_cb=None,
342             upload_cb=None,
343             etag=None,
344             if_etag_match=None,
345             if_not_exist=None,
346             content_encoding=None,
347             content_disposition=None,
348             content_type=None,
349             sharing=None,
350             public=None,
351             container_info_cache=None):
352         """Upload an object using multiple connections (threads)
353
354         :param obj: (str) remote object path
355
356         :param f: open file descriptor (rb)
357
358         :param hash_cb: optional progress.bar object for calculating hashes
359
360         :param upload_cb: optional progress.bar object for uploading
361
362         :param etag: (str)
363
364         :param if_etag_match: (str) Push that value to if-match header at file
365             creation
366
367         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368             it does not exist remotely, otherwise the operation will fail.
369             Involves the case of an object with the same path is created while
370             the object is being uploaded.
371
372         :param content_encoding: (str)
373
374         :param content_disposition: (str)
375
376         :param content_type: (str)
377
378         :param sharing: {'read':[user and/or grp names],
379             'write':[usr and/or grp names]}
380
381         :param public: (bool)
382
383         :param container_info_cache: (dict) if given, avoid redundant calls to
384         server for container info (block size and hash information)
385         """
386         self._assert_container()
387
388         block_info = (
389             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390                 f, size, container_info_cache)
391         (hashes, hmap, offset) = ([], {}, 0)
392         if not content_type:
393             content_type = 'application/octet-stream'
394
395         self._calculate_blocks_for_upload(
396             *block_info,
397             hashes=hashes,
398             hmap=hmap,
399             fileobj=f,
400             hash_cb=hash_cb)
401
402         hashmap = dict(bytes=size, hashes=hashes)
403         missing, obj_headers = self._create_object_or_get_missing_hashes(
404             obj, hashmap,
405             content_type=content_type,
406             size=size,
407             if_etag_match=if_etag_match,
408             if_etag_not_match='*' if if_not_exist else None,
409             content_encoding=content_encoding,
410             content_disposition=content_disposition,
411             permissions=sharing,
412             public=public)
413
414         if missing is None:
415             return obj_headers
416
417         if upload_cb:
418             upload_gen = upload_cb(len(missing))
419             for i in range(len(missing), len(hashmap['hashes']) + 1):
420                 try:
421                     upload_gen.next()
422                 except:
423                     upload_gen = None
424         else:
425             upload_gen = None
426
427         retries = 7
428         try:
429             while retries:
430                 sendlog.info('%s blocks missing' % len(missing))
431                 num_of_blocks = len(missing)
432                 missing = self._upload_missing_blocks(
433                     missing,
434                     hmap,
435                     f,
436                     upload_gen)
437                 if missing:
438                     if num_of_blocks == len(missing):
439                         retries -= 1
440                     else:
441                         num_of_blocks = len(missing)
442                 else:
443                     break
444             if missing:
445                 raise ClientError(
446                     '%s blocks failed to upload' % len(missing),
447                     details=['%s' % thread.exception for thread in missing])
448         except KeyboardInterrupt:
449             sendlog.info('- - - wait for threads to finish')
450             for thread in activethreads():
451                 thread.join()
452             raise
453
454         r = self.object_put(
455             obj,
456             format='json',
457             hashmap=True,
458             content_type=content_type,
459             if_etag_match=if_etag_match,
460             if_etag_not_match='*' if if_not_exist else None,
461             etag=etag,
462             json=hashmap,
463             permissions=sharing,
464             public=public,
465             success=201)
466         return r.headers
467
468     def upload_from_string(
469             self, obj, input_str,
470             hash_cb=None,
471             upload_cb=None,
472             etag=None,
473             if_etag_match=None,
474             if_not_exist=None,
475             content_encoding=None,
476             content_disposition=None,
477             content_type=None,
478             sharing=None,
479             public=None,
480             container_info_cache=None):
481         """Upload an object using multiple connections (threads)
482
483         :param obj: (str) remote object path
484
485         :param input_str: (str) upload content
486
487         :param hash_cb: optional progress.bar object for calculating hashes
488
489         :param upload_cb: optional progress.bar object for uploading
490
491         :param etag: (str)
492
493         :param if_etag_match: (str) Push that value to if-match header at file
494             creation
495
496         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497             it does not exist remotely, otherwise the operation will fail.
498             Involves the case of an object with the same path is created while
499             the object is being uploaded.
500
501         :param content_encoding: (str)
502
503         :param content_disposition: (str)
504
505         :param content_type: (str)
506
507         :param sharing: {'read':[user and/or grp names],
508             'write':[usr and/or grp names]}
509
510         :param public: (bool)
511
512         :param container_info_cache: (dict) if given, avoid redundant calls to
513         server for container info (block size and hash information)
514         """
515         self._assert_container()
516
517         blocksize, blockhash, size, nblocks = self._get_file_block_info(
518                 fileobj=None, size=len(input_str), cache=container_info_cache)
519         (hashes, hmap, offset) = ([], {}, 0)
520         if not content_type:
521             content_type = 'application/octet-stream'
522
523         num_of_blocks, blockmod = size / blocksize, size % blocksize
524         num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
525
526         hashes = []
527         hmap = {}
528         for blockid in range(num_of_blocks):
529             start = blockid * blocksize
530             block = input_str[start: (start + blocksize)]
531             hashes.append(_pithos_hash(block, blockhash))
532             hmap[hashes[blockid]] = (start, block)
533
534         hashmap = dict(bytes=size, hashes=hashes)
535         missing, obj_headers = self._create_object_or_get_missing_hashes(
536             obj, hashmap,
537             content_type=content_type,
538             size=size,
539             if_etag_match=if_etag_match,
540             if_etag_not_match='*' if if_not_exist else None,
541             content_encoding=content_encoding,
542             content_disposition=content_disposition,
543             permissions=sharing,
544             public=public)
545         if missing is None:
546             return obj_headers
547         num_of_missing = len(missing)
548
549         if upload_cb:
550             self.progress_bar_gen = upload_cb(num_of_blocks)
551             for i in range(num_of_blocks + 1 - num_of_missing):
552                 self._cb_next()
553
554         tries = 7
555         old_failures = 0
556         try:
557             while tries and missing:
558                 flying = []
559                 failures = []
560                 for hash in missing:
561                     offset, block = hmap[hash]
562                     bird = self._put_block_async(block, hash)
563                     flying.append(bird)
564                     unfinished = self._watch_thread_limit(flying)
565                     for thread in set(flying).difference(unfinished):
566                         if thread.exception:
567                             failures.append(thread.kwargs['hash'])
568                         if thread.isAlive():
569                             flying.append(thread)
570                         else:
571                             self._cb_next()
572                     flying = unfinished
573                 for thread in flying:
574                     thread.join()
575                     if thread.exception:
576                         failures.append(thread.kwargs['hash'])
577                     self._cb_next()
578                 missing = failures
579                 if missing and len(missing) == old_failures:
580                     tries -= 1
581                 old_failures = len(missing)
582             if missing:
583                 raise ClientError(
584                     '%s blocks failed to upload' % len(missing),
585                     details=['%s' % thread.exception for thread in missing])
586         except KeyboardInterrupt:
587             sendlog.info('- - - wait for threads to finish')
588             for thread in activethreads():
589                 thread.join()
590             raise
591
592         r = self.object_put(
593             obj,
594             format='json',
595             hashmap=True,
596             content_type=content_type,
597             if_etag_match=if_etag_match,
598             if_etag_not_match='*' if if_not_exist else None,
599             etag=etag,
600             json=hashmap,
601             permissions=sharing,
602             public=public,
603             success=201)
604         return r.headers
605
606     # download_* auxiliary methods
607     def _get_remote_blocks_info(self, obj, **restargs):
608         #retrieve object hashmap
609         myrange = restargs.pop('data_range', None)
610         hashmap = self.get_object_hashmap(obj, **restargs)
611         restargs['data_range'] = myrange
612         blocksize = int(hashmap['block_size'])
613         blockhash = hashmap['block_hash']
614         total_size = hashmap['bytes']
615         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
616         map_dict = {}
617         for i, h in enumerate(hashmap['hashes']):
618             #  map_dict[h] = i   CHAGE
619             if h in map_dict:
620                 map_dict[h].append(i)
621             else:
622                 map_dict[h] = [i]
623         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
624
625     def _dump_blocks_sync(
626             self, obj, remote_hashes, blocksize, total_size, dst, range,
627             **args):
628         for blockid, blockhash in enumerate(remote_hashes):
629             if blockhash:
630                 start = blocksize * blockid
631                 is_last = start + blocksize > total_size
632                 end = (total_size - 1) if is_last else (start + blocksize - 1)
633                 (start, end) = _range_up(start, end, range)
634                 args['data_range'] = 'bytes=%s-%s' % (start, end)
635                 r = self.object_get(obj, success=(200, 206), **args)
636                 self._cb_next()
637                 dst.write(r.content)
638                 dst.flush()
639
640     def _get_block_async(self, obj, **args):
641         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
642         event.start()
643         return event
644
645     def _hash_from_file(self, fp, start, size, blockhash):
646         fp.seek(start)
647         block = fp.read(size)
648         h = newhashlib(blockhash)
649         h.update(block.strip('\x00'))
650         return hexlify(h.digest())
651
652     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
653         """write the results of a greenleted rest call to a file
654
655         :param offset: the offset of the file up to blocksize
656         - e.g. if the range is 10-100, all blocks will be written to
657         normal_position - 10
658         """
659         for key, g in flying.items():
660             if g.isAlive():
661                 continue
662             if g.exception:
663                 raise g.exception
664             block = g.value.content
665             for block_start in blockids[key]:
666                 local_file.seek(block_start + offset)
667                 local_file.write(block)
668                 self._cb_next()
669             flying.pop(key)
670             blockids.pop(key)
671         local_file.flush()
672
673     def _dump_blocks_async(
674             self, obj, remote_hashes, blocksize, total_size, local_file,
675             blockhash=None, resume=False, filerange=None, **restargs):
676         file_size = fstat(local_file.fileno()).st_size if resume else 0
677         flying = dict()
678         blockid_dict = dict()
679         offset = 0
680         if filerange is not None:
681             rstart = int(filerange.split('-')[0])
682             offset = rstart if blocksize > rstart else rstart % blocksize
683
684         self._init_thread_limit()
685         for block_hash, blockids in remote_hashes.items():
686             blockids = [blk * blocksize for blk in blockids]
687             unsaved = [blk for blk in blockids if not (
688                 blk < file_size and block_hash == self._hash_from_file(
689                         local_file, blk, blocksize, blockhash))]
690             self._cb_next(len(blockids) - len(unsaved))
691             if unsaved:
692                 key = unsaved[0]
693                 self._watch_thread_limit(flying.values())
694                 self._thread2file(
695                     flying, blockid_dict, local_file, offset,
696                     **restargs)
697                 end = total_size - 1 if (
698                     key + blocksize > total_size) else key + blocksize - 1
699                 start, end = _range_up(key, end, filerange)
700                 if start == end:
701                     self._cb_next()
702                     continue
703                 restargs['async_headers'] = {
704                     'Range': 'bytes=%s-%s' % (start, end)}
705                 flying[key] = self._get_block_async(obj, **restargs)
706                 blockid_dict[key] = unsaved
707
708         for thread in flying.values():
709             thread.join()
710         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
711
712     def download_object(
713             self, obj, dst,
714             download_cb=None,
715             version=None,
716             resume=False,
717             range_str=None,
718             if_match=None,
719             if_none_match=None,
720             if_modified_since=None,
721             if_unmodified_since=None):
722         """Download an object (multiple connections, random blocks)
723
724         :param obj: (str) remote object path
725
726         :param dst: open file descriptor (wb+)
727
728         :param download_cb: optional progress.bar object for downloading
729
730         :param version: (str) file version
731
732         :param resume: (bool) if set, preserve already downloaded file parts
733
734         :param range_str: (str) from, to are file positions (int) in bytes
735
736         :param if_match: (str)
737
738         :param if_none_match: (str)
739
740         :param if_modified_since: (str) formated date
741
742         :param if_unmodified_since: (str) formated date"""
743         restargs = dict(
744             version=version,
745             data_range=None if range_str is None else 'bytes=%s' % range_str,
746             if_match=if_match,
747             if_none_match=if_none_match,
748             if_modified_since=if_modified_since,
749             if_unmodified_since=if_unmodified_since)
750
751         (
752             blocksize,
753             blockhash,
754             total_size,
755             hash_list,
756             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
757         assert total_size >= 0
758
759         if download_cb:
760             self.progress_bar_gen = download_cb(len(hash_list))
761             self._cb_next()
762
763         if dst.isatty():
764             self._dump_blocks_sync(
765                 obj,
766                 hash_list,
767                 blocksize,
768                 total_size,
769                 dst,
770                 range_str,
771                 **restargs)
772         else:
773             self._dump_blocks_async(
774                 obj,
775                 remote_hashes,
776                 blocksize,
777                 total_size,
778                 dst,
779                 blockhash,
780                 resume,
781                 range_str,
782                 **restargs)
783             if not range_str:
784                 dst.truncate(total_size)
785
786         self._complete_cb()
787
788     def download_to_string(
789             self, obj,
790             download_cb=None,
791             version=None,
792             range_str=None,
793             if_match=None,
794             if_none_match=None,
795             if_modified_since=None,
796             if_unmodified_since=None):
797         """Download an object to a string (multiple connections). This method
798         uses threads for http requests, but stores all content in memory.
799
800         :param obj: (str) remote object path
801
802         :param download_cb: optional progress.bar object for downloading
803
804         :param version: (str) file version
805
806         :param range_str: (str) from, to are file positions (int) in bytes
807
808         :param if_match: (str)
809
810         :param if_none_match: (str)
811
812         :param if_modified_since: (str) formated date
813
814         :param if_unmodified_since: (str) formated date
815
816         :returns: (str) the whole object contents
817         """
818         restargs = dict(
819             version=version,
820             data_range=None if range_str is None else 'bytes=%s' % range_str,
821             if_match=if_match,
822             if_none_match=if_none_match,
823             if_modified_since=if_modified_since,
824             if_unmodified_since=if_unmodified_since)
825
826         (
827             blocksize,
828             blockhash,
829             total_size,
830             hash_list,
831             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
832         assert total_size >= 0
833
834         if download_cb:
835             self.progress_bar_gen = download_cb(len(hash_list))
836             self._cb_next()
837
838         num_of_blocks = len(remote_hashes)
839         ret = [''] * num_of_blocks
840         self._init_thread_limit()
841         flying = dict()
842         try:
843             for blockid, blockhash in enumerate(remote_hashes):
844                 start = blocksize * blockid
845                 is_last = start + blocksize > total_size
846                 end = (total_size - 1) if is_last else (start + blocksize - 1)
847                 (start, end) = _range_up(start, end, range_str)
848                 if start < end:
849                     self._watch_thread_limit(flying.values())
850                     flying[blockid] = self._get_block_async(obj, **restargs)
851                 for runid, thread in flying.items():
852                     if (blockid + 1) == num_of_blocks:
853                         thread.join()
854                     elif thread.isAlive():
855                         continue
856                     if thread.exception:
857                         raise thread.exception
858                     ret[runid] = thread.value.content
859                     self._cb_next()
860                     flying.pop(runid)
861             return ''.join(ret)
862         except KeyboardInterrupt:
863             sendlog.info('- - - wait for threads to finish')
864             for thread in activethreads():
865                 thread.join()
866
867     #Command Progress Bar method
868     def _cb_next(self, step=1):
869         if hasattr(self, 'progress_bar_gen'):
870             try:
871                 for i in xrange(step):
872                     self.progress_bar_gen.next()
873             except:
874                 pass
875
876     def _complete_cb(self):
877         while True:
878             try:
879                 self.progress_bar_gen.next()
880             except:
881                 break
882
883     def get_object_hashmap(
884             self, obj,
885             version=None,
886             if_match=None,
887             if_none_match=None,
888             if_modified_since=None,
889             if_unmodified_since=None,
890             data_range=None):
891         """
892         :param obj: (str) remote object path
893
894         :param if_match: (str)
895
896         :param if_none_match: (str)
897
898         :param if_modified_since: (str) formated date
899
900         :param if_unmodified_since: (str) formated date
901
902         :param data_range: (str) from-to where from and to are integers
903             denoting file positions in bytes
904
905         :returns: (list)
906         """
907         try:
908             r = self.object_get(
909                 obj,
910                 hashmap=True,
911                 version=version,
912                 if_etag_match=if_match,
913                 if_etag_not_match=if_none_match,
914                 if_modified_since=if_modified_since,
915                 if_unmodified_since=if_unmodified_since,
916                 data_range=data_range)
917         except ClientError as err:
918             if err.status == 304 or err.status == 412:
919                 return {}
920             raise
921         return r.json
922
923     def set_account_group(self, group, usernames):
924         """
925         :param group: (str)
926
927         :param usernames: (list)
928         """
929         r = self.account_post(update=True, groups={group: usernames})
930         return r
931
932     def del_account_group(self, group):
933         """
934         :param group: (str)
935         """
936         self.account_post(update=True, groups={group: []})
937
938     def get_account_info(self, until=None):
939         """
940         :param until: (str) formated date
941
942         :returns: (dict)
943         """
944         r = self.account_head(until=until)
945         if r.status_code == 401:
946             raise ClientError("No authorization", status=401)
947         return r.headers
948
949     def get_account_quota(self):
950         """
951         :returns: (dict)
952         """
953         return filter_in(
954             self.get_account_info(),
955             'X-Account-Policy-Quota',
956             exactMatch=True)
957
958     def get_account_versioning(self):
959         """
960         :returns: (dict)
961         """
962         return filter_in(
963             self.get_account_info(),
964             'X-Account-Policy-Versioning',
965             exactMatch=True)
966
967     def get_account_meta(self, until=None):
968         """
969         :meta until: (str) formated date
970
971         :returns: (dict)
972         """
973         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
974
975     def get_account_group(self):
976         """
977         :returns: (dict)
978         """
979         return filter_in(self.get_account_info(), 'X-Account-Group-')
980
981     def set_account_meta(self, metapairs):
982         """
983         :param metapairs: (dict) {key1:val1, key2:val2, ...}
984         """
985         assert(type(metapairs) is dict)
986         r = self.account_post(update=True, metadata=metapairs)
987         return r.headers
988
989     def del_account_meta(self, metakey):
990         """
991         :param metakey: (str) metadatum key
992         """
993         r = self.account_post(update=True, metadata={metakey: ''})
994         return r.headers
995
996     """
997     def set_account_quota(self, quota):
998         ""
999         :param quota: (int)
1000         ""
1001         self.account_post(update=True, quota=quota)
1002     """
1003
1004     def set_account_versioning(self, versioning):
1005         """
1006         "param versioning: (str)
1007         """
1008         r = self.account_post(update=True, versioning=versioning)
1009         return r.headers
1010
1011     def list_containers(self):
1012         """
1013         :returns: (dict)
1014         """
1015         r = self.account_get()
1016         return r.json
1017
1018     def del_container(self, until=None, delimiter=None):
1019         """
1020         :param until: (str) formated date
1021
1022         :param delimiter: (str) with / empty container
1023
1024         :raises ClientError: 404 Container does not exist
1025
1026         :raises ClientError: 409 Container is not empty
1027         """
1028         self._assert_container()
1029         r = self.container_delete(
1030             until=until,
1031             delimiter=delimiter,
1032             success=(204, 404, 409))
1033         if r.status_code == 404:
1034             raise ClientError(
1035                 'Container "%s" does not exist' % self.container,
1036                 r.status_code)
1037         elif r.status_code == 409:
1038             raise ClientError(
1039                 'Container "%s" is not empty' % self.container,
1040                 r.status_code)
1041         return r.headers
1042
1043     def get_container_versioning(self, container=None):
1044         """
1045         :param container: (str)
1046
1047         :returns: (dict)
1048         """
1049         cnt_back_up = self.container
1050         try:
1051             self.container = container or cnt_back_up
1052             return filter_in(
1053                 self.get_container_info(),
1054                 'X-Container-Policy-Versioning')
1055         finally:
1056             self.container = cnt_back_up
1057
1058     def get_container_limit(self, container=None):
1059         """
1060         :param container: (str)
1061
1062         :returns: (dict)
1063         """
1064         cnt_back_up = self.container
1065         try:
1066             self.container = container or cnt_back_up
1067             return filter_in(
1068                 self.get_container_info(),
1069                 'X-Container-Policy-Quota')
1070         finally:
1071             self.container = cnt_back_up
1072
1073     def get_container_info(self, until=None):
1074         """
1075         :param until: (str) formated date
1076
1077         :returns: (dict)
1078
1079         :raises ClientError: 404 Container not found
1080         """
1081         try:
1082             r = self.container_head(until=until)
1083         except ClientError as err:
1084             err.details.append('for container %s' % self.container)
1085             raise err
1086         return r.headers
1087
1088     def get_container_meta(self, until=None):
1089         """
1090         :param until: (str) formated date
1091
1092         :returns: (dict)
1093         """
1094         return filter_in(
1095             self.get_container_info(until=until),
1096             'X-Container-Meta')
1097
1098     def get_container_object_meta(self, until=None):
1099         """
1100         :param until: (str) formated date
1101
1102         :returns: (dict)
1103         """
1104         return filter_in(
1105             self.get_container_info(until=until),
1106             'X-Container-Object-Meta')
1107
1108     def set_container_meta(self, metapairs):
1109         """
1110         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1111         """
1112         assert(type(metapairs) is dict)
1113         r = self.container_post(update=True, metadata=metapairs)
1114         return r.headers
1115
1116     def del_container_meta(self, metakey):
1117         """
1118         :param metakey: (str) metadatum key
1119
1120         :returns: (dict) response headers
1121         """
1122         r = self.container_post(update=True, metadata={metakey: ''})
1123         return r.headers
1124
1125     def set_container_limit(self, limit):
1126         """
1127         :param limit: (int)
1128         """
1129         r = self.container_post(update=True, quota=limit)
1130         return r.headers
1131
1132     def set_container_versioning(self, versioning):
1133         """
1134         :param versioning: (str)
1135         """
1136         r = self.container_post(update=True, versioning=versioning)
1137         return r.headers
1138
1139     def del_object(self, obj, until=None, delimiter=None):
1140         """
1141         :param obj: (str) remote object path
1142
1143         :param until: (str) formated date
1144
1145         :param delimiter: (str)
1146         """
1147         self._assert_container()
1148         r = self.object_delete(obj, until=until, delimiter=delimiter)
1149         return r.headers
1150
1151     def set_object_meta(self, obj, metapairs):
1152         """
1153         :param obj: (str) remote object path
1154
1155         :param metapairs: (dict) {key1:val1, key2:val2, ...}
1156         """
1157         assert(type(metapairs) is dict)
1158         r = self.object_post(obj, update=True, metadata=metapairs)
1159         return r.headers
1160
1161     def del_object_meta(self, obj, metakey):
1162         """
1163         :param obj: (str) remote object path
1164
1165         :param metakey: (str) metadatum key
1166         """
1167         r = self.object_post(obj, update=True, metadata={metakey: ''})
1168         return r.headers
1169
1170     def publish_object(self, obj):
1171         """
1172         :param obj: (str) remote object path
1173
1174         :returns: (str) access url
1175         """
1176         self.object_post(obj, update=True, public=True)
1177         info = self.get_object_info(obj)
1178         pref, sep, rest = self.base_url.partition('//')
1179         base = rest.split('/')[0]
1180         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1181
1182     def unpublish_object(self, obj):
1183         """
1184         :param obj: (str) remote object path
1185         """
1186         r = self.object_post(obj, update=True, public=False)
1187         return r.headers
1188
1189     def get_object_info(self, obj, version=None):
1190         """
1191         :param obj: (str) remote object path
1192
1193         :param version: (str)
1194
1195         :returns: (dict)
1196         """
1197         try:
1198             r = self.object_head(obj, version=version)
1199             return r.headers
1200         except ClientError as ce:
1201             if ce.status == 404:
1202                 raise ClientError('Object %s not found' % obj, status=404)
1203             raise
1204
1205     def get_object_meta(self, obj, version=None):
1206         """
1207         :param obj: (str) remote object path
1208
1209         :param version: (str)
1210
1211         :returns: (dict)
1212         """
1213         return filter_in(
1214             self.get_object_info(obj, version=version),
1215             'X-Object-Meta')
1216
1217     def get_object_sharing(self, obj):
1218         """
1219         :param obj: (str) remote object path
1220
1221         :returns: (dict)
1222         """
1223         r = filter_in(
1224             self.get_object_info(obj),
1225             'X-Object-Sharing',
1226             exactMatch=True)
1227         reply = {}
1228         if len(r) > 0:
1229             perms = r['x-object-sharing'].split(';')
1230             for perm in perms:
1231                 try:
1232                     perm.index('=')
1233                 except ValueError:
1234                     raise ClientError('Incorrect reply format')
1235                 (key, val) = perm.strip().split('=')
1236                 reply[key] = val
1237         return reply
1238
1239     def set_object_sharing(
1240             self, obj,
1241             read_permition=False, write_permition=False):
1242         """Give read/write permisions to an object.
1243
1244         :param obj: (str) remote object path
1245
1246         :param read_permition: (list - bool) users and user groups that get
1247             read permition for this object - False means all previous read
1248             permissions will be removed
1249
1250         :param write_perimition: (list - bool) of users and user groups to get
1251            write permition for this object - False means all previous write
1252            permissions will be removed
1253
1254         :returns: (dict) response headers
1255         """
1256
1257         perms = dict(read=read_permition or '', write=write_permition or '')
1258         r = self.object_post(obj, update=True, permissions=perms)
1259         return r.headers
1260
1261     def del_object_sharing(self, obj):
1262         """
1263         :param obj: (str) remote object path
1264         """
1265         return self.set_object_sharing(obj)
1266
1267     def append_object(self, obj, source_file, upload_cb=None):
1268         """
1269         :param obj: (str) remote object path
1270
1271         :param source_file: open file descriptor
1272
1273         :param upload_db: progress.bar for uploading
1274         """
1275
1276         self._assert_container()
1277         meta = self.get_container_info()
1278         blocksize = int(meta['x-container-block-size'])
1279         filesize = fstat(source_file.fileno()).st_size
1280         nblocks = 1 + (filesize - 1) // blocksize
1281         offset = 0
1282         headers = {}
1283         if upload_cb:
1284             self.progress_bar_gen = upload_cb(nblocks)
1285             self._cb_next()
1286         flying = {}
1287         self._init_thread_limit()
1288         try:
1289             for i in range(nblocks):
1290                 block = source_file.read(min(blocksize, filesize - offset))
1291                 offset += len(block)
1292
1293                 self._watch_thread_limit(flying.values())
1294                 unfinished = {}
1295                 flying[i] = SilentEvent(
1296                     method=self.object_post,
1297                     obj=obj,
1298                     update=True,
1299                     content_range='bytes */*',
1300                     content_type='application/octet-stream',
1301                     content_length=len(block),
1302                     data=block)
1303                 flying[i].start()
1304
1305                 for key, thread in flying.items():
1306                     if thread.isAlive():
1307                         if i < nblocks:
1308                             unfinished[key] = thread
1309                             continue
1310                         thread.join()
1311                     if thread.exception:
1312                         raise thread.exception
1313                     headers[key] = thread.value.headers
1314                     self._cb_next()
1315                 flying = unfinished
1316         except KeyboardInterrupt:
1317             sendlog.info('- - - wait for threads to finish')
1318             for thread in activethreads():
1319                 thread.join()
1320         return headers.values()
1321
1322     def truncate_object(self, obj, upto_bytes):
1323         """
1324         :param obj: (str) remote object path
1325
1326         :param upto_bytes: max number of bytes to leave on file
1327
1328         :returns: (dict) response headers
1329         """
1330         r = self.object_post(
1331             obj,
1332             update=True,
1333             content_range='bytes 0-%s/*' % upto_bytes,
1334             content_type='application/octet-stream',
1335             object_bytes=upto_bytes,
1336             source_object=path4url(self.container, obj))
1337         return r.headers
1338
1339     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1340         """Overwrite a part of an object from local source file
1341
1342         :param obj: (str) remote object path
1343
1344         :param start: (int) position in bytes to start overwriting from
1345
1346         :param end: (int) position in bytes to stop overwriting at
1347
1348         :param source_file: open file descriptor
1349
1350         :param upload_db: progress.bar for uploading
1351         """
1352
1353         r = self.get_object_info(obj)
1354         rf_size = int(r['content-length'])
1355         if rf_size < int(start):
1356             raise ClientError(
1357                 'Range start exceeds file size',
1358                 status=416)
1359         elif rf_size < int(end):
1360             raise ClientError(
1361                 'Range end exceeds file size',
1362                 status=416)
1363         self._assert_container()
1364         meta = self.get_container_info()
1365         blocksize = int(meta['x-container-block-size'])
1366         filesize = fstat(source_file.fileno()).st_size
1367         datasize = int(end) - int(start) + 1
1368         nblocks = 1 + (datasize - 1) // blocksize
1369         offset = 0
1370         if upload_cb:
1371             self.progress_bar_gen = upload_cb(nblocks)
1372             self._cb_next()
1373         headers = []
1374         for i in range(nblocks):
1375             read_size = min(blocksize, filesize - offset, datasize - offset)
1376             block = source_file.read(read_size)
1377             r = self.object_post(
1378                 obj,
1379                 update=True,
1380                 content_type='application/octet-stream',
1381                 content_length=len(block),
1382                 content_range='bytes %s-%s/*' % (
1383                     start + offset,
1384                     start + offset + len(block) - 1),
1385                 data=block)
1386             headers.append(dict(r.headers))
1387             offset += len(block)
1388
1389             self._cb_next
1390         return headers
1391
1392     def copy_object(
1393             self, src_container, src_object, dst_container,
1394             dst_object=None,
1395             source_version=None,
1396             source_account=None,
1397             public=False,
1398             content_type=None,
1399             delimiter=None):
1400         """
1401         :param src_container: (str) source container
1402
1403         :param src_object: (str) source object path
1404
1405         :param dst_container: (str) destination container
1406
1407         :param dst_object: (str) destination object path
1408
1409         :param source_version: (str) source object version
1410
1411         :param source_account: (str) account to copy from
1412
1413         :param public: (bool)
1414
1415         :param content_type: (str)
1416
1417         :param delimiter: (str)
1418
1419         :returns: (dict) response headers
1420         """
1421         self._assert_account()
1422         self.container = dst_container
1423         src_path = path4url(src_container, src_object)
1424         r = self.object_put(
1425             dst_object or src_object,
1426             success=201,
1427             copy_from=src_path,
1428             content_length=0,
1429             source_version=source_version,
1430             source_account=source_account,
1431             public=public,
1432             content_type=content_type,
1433             delimiter=delimiter)
1434         return r.headers
1435
1436     def move_object(
1437             self, src_container, src_object, dst_container,
1438             dst_object=False,
1439             source_account=None,
1440             source_version=None,
1441             public=False,
1442             content_type=None,
1443             delimiter=None):
1444         """
1445         :param src_container: (str) source container
1446
1447         :param src_object: (str) source object path
1448
1449         :param dst_container: (str) destination container
1450
1451         :param dst_object: (str) destination object path
1452
1453         :param source_account: (str) account to move from
1454
1455         :param source_version: (str) source object version
1456
1457         :param public: (bool)
1458
1459         :param content_type: (str)
1460
1461         :param delimiter: (str)
1462
1463         :returns: (dict) response headers
1464         """
1465         self._assert_account()
1466         self.container = dst_container
1467         dst_object = dst_object or src_object
1468         src_path = path4url(src_container, src_object)
1469         r = self.object_put(
1470             dst_object,
1471             success=201,
1472             move_from=src_path,
1473             content_length=0,
1474             source_account=source_account,
1475             source_version=source_version,
1476             public=public,
1477             content_type=content_type,
1478             delimiter=delimiter)
1479         return r.headers
1480
1481     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1482         """Get accounts that share with self.account
1483
1484         :param limit: (str)
1485
1486         :param marker: (str)
1487
1488         :returns: (dict)
1489         """
1490         self._assert_account()
1491
1492         self.set_param('format', 'json')
1493         self.set_param('limit', limit, iff=limit is not None)
1494         self.set_param('marker', marker, iff=marker is not None)
1495
1496         path = ''
1497         success = kwargs.pop('success', (200, 204))
1498         r = self.get(path, *args, success=success, **kwargs)
1499         return r.json
1500
1501     def get_object_versionlist(self, obj):
1502         """
1503         :param obj: (str) remote object path
1504
1505         :returns: (list)
1506         """
1507         self._assert_container()
1508         r = self.object_get(obj, format='json', version='list')
1509         return r.json['versions']