1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestAPI):
69 """GRNet Pithos API client"""
71 _thread_exceptions = []
73 def __init__(self, base_url, token, account=None, container=None):
74 super(PithosClient, self).__init__(base_url, token, account, container)
76 def purge_container(self):
77 """Delete an empty container and destroy associated blocks
79 r = self.container_delete(until=unicode(time()))
82 def upload_object_unchunked(
87 content_encoding=None,
88 content_disposition=None,
93 :param obj: (str) remote object path
95 :param f: open file descriptor
97 :param withHashFile: (bool)
99 :param size: (int) size of data to upload
103 :param content_encoding: (str)
105 :param content_disposition: (str)
107 :param content_type: (str)
109 :param sharing: {'read':[user and/or grp names],
110 'write':[usr and/or grp names]}
112 :param public: (bool)
114 self._assert_container()
120 data = json.dumps(json.loads(data))
122 raise ClientError('"%s" is not json-formated' % f.name, 1)
124 msg = '"%s" is not a valid hashmap file' % f.name
125 raise ClientError(msg, 1)
127 data = f.read(size) if size is not None else f.read()
132 content_encoding=content_encoding,
133 content_disposition=content_disposition,
134 content_type=content_type,
140 def create_object_by_manifestation(
143 content_encoding=None,
144 content_disposition=None,
149 :param obj: (str) remote object path
153 :param content_encoding: (str)
155 :param content_disposition: (str)
157 :param content_type: (str)
159 :param sharing: {'read':[user and/or grp names],
160 'write':[usr and/or grp names]}
162 :param public: (bool)
164 self._assert_container()
169 content_encoding=content_encoding,
170 content_disposition=content_disposition,
171 content_type=content_type,
174 manifest='%s/%s' % (self.container, obj))
177 # upload_* auxiliary methods
178 def _put_block_async(self, data, hash, upload_gen=None):
179 event = SilentEvent(method=self._put_block, data=data, hash=hash)
183 def _put_block(self, data, hash):
# Synchronously upload one data block to the current container and verify
# the server-computed hash against the locally computed one.
# WARNING(review): the three lines below are leftover fault-injection /
# debug code -- they randomly raise a 503 ClientError on roughly 1 in 8
# calls. They must be removed before this code is used in production.
184 from random import randint
185 if not randint(0, 7):
186 raise ClientError('BAD GATEWAY STUFF', 503)
# POST the raw block bytes; the server replies with a JSON list whose
# first element is the hash it computed for the uploaded block.
187 r = self.container_post(
189 content_type='application/octet-stream',
190 content_length=len(data),
# NOTE(review): input validation via assert is stripped under `python -O`;
# a ClientError would be a safer way to report a hash mismatch.
193 assert r.json[0] == hash, 'Local hash does not match server'
195 def _get_file_block_info(self, fileobj, size=None):
196 meta = self.get_container_info()
197 blocksize = int(meta['x-container-block-size'])
198 blockhash = meta['x-container-block-hash']
199 size = size if size is not None else fstat(fileobj.fileno()).st_size
200 nblocks = 1 + (size - 1) // blocksize
201 return (blocksize, blockhash, size, nblocks)
203 def _get_missing_hashes(
210 content_encoding=None,
211 content_disposition=None,
219 content_type=content_type,
222 content_encoding=content_encoding,
223 content_disposition=content_disposition,
224 permissions=permissions,
227 if r.status_code == 201:
232 def _caclulate_uploaded_blocks(
233 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
237 hash_gen = hash_cb(nblocks)
240 for i in range(nblocks):
241 block = fileobj.read(min(blocksize, size - offset))
243 hash = _pithos_hash(block, blockhash)
245 hmap[hash] = (offset, bytes)
249 msg = 'Failed to calculate uploaded blocks:'
250 msg += ' Offset and object size do not match'
251 assert offset == size, msg
253 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
254 """upload missing blocks asynchronously"""
256 self._init_thread_limit()
261 offset, bytes = hmap[hash]
263 data = fileobj.read(bytes)
264 r = self._put_block_async(data, hash, upload_gen)
266 unfinished = self._watch_thread_limit(flying)
267 for thread in set(flying).difference(unfinished):
269 failures.append(thread)
272 ClientError) and thread.exception.status == 502:
273 self.POOLSIZE = self._thread_limit
274 elif thread.isAlive():
275 flying.append(thread)
283 for thread in flying:
286 failures.append(thread)
293 return [failure.kwargs['hash'] for failure in failures]
301 content_encoding=None,
302 content_disposition=None,
306 """Upload an object using multiple connections (threads)
308 :param obj: (str) remote object path
310 :param f: open file descriptor (rb)
312 :param hash_cb: optional progress.bar object for calculating hashes
314 :param upload_cb: optional progress.bar object for uploading
318 :param content_encoding: (str)
320 :param content_disposition: (str)
322 :param content_type: (str)
324 :param sharing: {'read':[user and/or grp names],
325 'write':[usr and/or grp names]}
327 :param public: (bool)
329 self._assert_container()
332 block_info = (blocksize, blockhash, size, nblocks) =\
333 self._get_file_block_info(f, size)
334 (hashes, hmap, offset) = ([], {}, 0)
335 if content_type is None:
336 content_type = 'application/octet-stream'
338 self._caclulate_uploaded_blocks(
345 hashmap = dict(bytes=size, hashes=hashes)
346 missing = self._get_missing_hashes(
348 content_type=content_type,
351 content_encoding=content_encoding,
352 content_disposition=content_disposition,
360 upload_gen = upload_cb(len(missing))
361 for i in range(len(missing), len(hashmap['hashes']) + 1):
372 sendlog.info('%s blocks missing' % len(missing))
373 num_of_blocks = len(missing)
374 missing = self._upload_missing_blocks(
380 if num_of_blocks == len(missing):
383 num_of_blocks = len(missing)
388 '%s blocks failed to upload' % len(missing),
390 except KeyboardInterrupt:
391 sendlog.info('- - - wait for threads to finish')
392 for thread in activethreads():
400 content_type=content_type,
405 # download_* auxiliary methods
406 def _get_remote_blocks_info(self, obj, **restargs):
407 #retrieve object hashmap
408 myrange = restargs.pop('data_range', None)
409 hashmap = self.get_object_hashmap(obj, **restargs)
410 restargs['data_range'] = myrange
411 blocksize = int(hashmap['block_size'])
412 blockhash = hashmap['block_hash']
413 total_size = hashmap['bytes']
414 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
416 for i, h in enumerate(hashmap['hashes']):
418 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
420 def _dump_blocks_sync(
421 self, obj, remote_hashes, blocksize, total_size, dst, range,
423 for blockid, blockhash in enumerate(remote_hashes):
425 start = blocksize * blockid
426 is_last = start + blocksize > total_size
427 end = (total_size - 1) if is_last else (start + blocksize - 1)
428 (start, end) = _range_up(start, end, range)
429 args['data_range'] = 'bytes=%s-%s' % (start, end)
430 r = self.object_get(obj, success=(200, 206), **args)
435 def _get_block_async(self, obj, **args):
436 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
440 def _hash_from_file(self, fp, start, size, blockhash):
442 block = fp.read(size)
443 h = newhashlib(blockhash)
444 h.update(block.strip('\x00'))
445 return hexlify(h.digest())
447 def _thread2file(self, flying, local_file, offset=0, **restargs):
448 """write the results of a greenleted rest call to a file
450 :param offset: the offset of the file up to blocksize
451 - e.g. if the range is 10-100, all blocks will be written to
455 for i, (start, g) in enumerate(flying.items()):
459 block = g.value.content
460 local_file.seek(start - offset)
461 local_file.write(block)
463 finished.append(flying.pop(start))
467 def _dump_blocks_async(
468 self, obj, remote_hashes, blocksize, total_size, local_file,
469 blockhash=None, resume=False, filerange=None, **restargs):
470 file_size = fstat(local_file.fileno()).st_size if resume else 0
474 if filerange is not None:
475 rstart = int(filerange.split('-')[0])
476 offset = rstart if blocksize > rstart else rstart % blocksize
478 self._init_thread_limit()
479 for block_hash, blockid in remote_hashes.items():
480 start = blocksize * blockid
481 if start < file_size and block_hash == self._hash_from_file(
482 local_file, start, blocksize, blockhash):
485 self._watch_thread_limit(flying.values())
486 finished += self._thread2file(
491 end = total_size - 1 if start + blocksize > total_size\
492 else start + blocksize - 1
493 (start, end) = _range_up(start, end, filerange)
497 restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
498 flying[start] = self._get_block_async(obj, **restargs)
500 for thread in flying.values():
502 finished += self._thread2file(flying, local_file, offset, **restargs)
512 if_modified_since=None,
513 if_unmodified_since=None):
514 """Download an object (multiple connections, random blocks)
516 :param obj: (str) remote object path
518 :param dst: open file descriptor (wb+)
520 :param download_cb: optional progress.bar object for downloading
522 :param version: (str) file version
524 :param resume: (bool) if set, preserve already downloaded file parts
526 :param range_str: (str) from, to are file positions (int) in bytes
528 :param if_match: (str)
530 :param if_none_match: (str)
532 :param if_modified_since: (str) formated date
534 :param if_unmodified_since: (str) formated date"""
537 data_range=None if range_str is None else 'bytes=%s' % range_str,
539 if_none_match=if_none_match,
540 if_modified_since=if_modified_since,
541 if_unmodified_since=if_unmodified_since)
548 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
549 assert total_size >= 0
552 self.progress_bar_gen = download_cb(len(remote_hashes))
556 self._dump_blocks_sync(
565 self._dump_blocks_async(
576 dst.truncate(total_size)
580 #Command Progress Bar method
582 if hasattr(self, 'progress_bar_gen'):
584 self.progress_bar_gen.next()
588 def _complete_cb(self):
591 self.progress_bar_gen.next()
595 def get_object_hashmap(
600 if_modified_since=None,
601 if_unmodified_since=None,
604 :param obj: (str) remote object path
606 :param if_match: (str)
608 :param if_none_match: (str)
610 :param if_modified_since: (str) formated date
612 :param if_unmodified_since: (str) formated date
614 :param data_range: (str) from-to where from and to are integers
615 denoting file positions in bytes
624 if_etag_match=if_match,
625 if_etag_not_match=if_none_match,
626 if_modified_since=if_modified_since,
627 if_unmodified_since=if_unmodified_since,
628 data_range=data_range)
629 except ClientError as err:
630 if err.status == 304 or err.status == 412:
635 def set_account_group(self, group, usernames):
639 :param usernames: (list)
641 r = self.account_post(update=True, groups={group: usernames})
644 def del_account_group(self, group):
648 r = self.account_post(update=True, groups={group: []})
651 def get_account_info(self, until=None):
653 :param until: (str) formated date
657 r = self.account_head(until=until)
658 if r.status_code == 401:
659 raise ClientError("No authorization")
662 def get_account_quota(self):
667 self.get_account_info(),
668 'X-Account-Policy-Quota',
671 def get_account_versioning(self):
676 self.get_account_info(),
677 'X-Account-Policy-Versioning',
680 def get_account_meta(self, until=None):
682 :meta until: (str) formated date
686 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
688 def get_account_group(self):
692 return filter_in(self.get_account_info(), 'X-Account-Group-')
694 def set_account_meta(self, metapairs):
696 :param metapairs: (dict) {key1:val1, key2:val2, ...}
698 assert(type(metapairs) is dict)
699 r = self.account_post(update=True, metadata=metapairs)
702 def del_account_meta(self, metakey):
704 :param metakey: (str) metadatum key
706 r = self.account_post(update=True, metadata={metakey: ''})
709 def set_account_quota(self, quota):
713 r = self.account_post(update=True, quota=quota)
716 def set_account_versioning(self, versioning):
718 "param versioning: (str)
720 r = self.account_post(update=True, versioning=versioning)
723 def list_containers(self):
727 r = self.account_get()
730 def del_container(self, until=None, delimiter=None):
732 :param until: (str) formated date
734 :param delimiter: (str) with / empty container
736 :raises ClientError: 404 Container does not exist
738 :raises ClientError: 409 Container is not empty
740 self._assert_container()
741 r = self.container_delete(
744 success=(204, 404, 409))
746 if r.status_code == 404:
748 'Container "%s" does not exist' % self.container,
750 elif r.status_code == 409:
752 'Container "%s" is not empty' % self.container,
755 def get_container_versioning(self, container):
757 :param container: (str)
761 self.container = container
763 self.get_container_info(),
764 'X-Container-Policy-Versioning')
766 def get_container_quota(self, container):
768 :param container: (str)
772 self.container = container
773 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
775 def get_container_info(self, until=None):
777 :param until: (str) formated date
781 :raises ClientError: 404 Container not found
784 r = self.container_head(until=until)
785 except ClientError as err:
786 err.details.append('for container %s' % self.container)
790 def get_container_meta(self, until=None):
792 :param until: (str) formated date
797 self.get_container_info(until=until),
800 def get_container_object_meta(self, until=None):
802 :param until: (str) formated date
807 self.get_container_info(until=until),
808 'X-Container-Object-Meta')
810 def set_container_meta(self, metapairs):
812 :param metapairs: (dict) {key1:val1, key2:val2, ...}
814 assert(type(metapairs) is dict)
815 r = self.container_post(update=True, metadata=metapairs)
818 def del_container_meta(self, metakey):
820 :param metakey: (str) metadatum key
822 r = self.container_post(update=True, metadata={metakey: ''})
825 def set_container_quota(self, quota):
829 r = self.container_post(update=True, quota=quota)
832 def set_container_versioning(self, versioning):
834 :param versioning: (str)
836 r = self.container_post(update=True, versioning=versioning)
839 def del_object(self, obj, until=None, delimiter=None):
841 :param obj: (str) remote object path
843 :param until: (str) formated date
845 :param delimiter: (str)
847 self._assert_container()
848 r = self.object_delete(obj, until=until, delimiter=delimiter)
851 def set_object_meta(self, obj, metapairs):
853 :param obj: (str) remote object path
855 :param metapairs: (dict) {key1:val1, key2:val2, ...}
857 assert(type(metapairs) is dict)
858 r = self.object_post(obj, update=True, metadata=metapairs)
861 def del_object_meta(self, obj, metakey):
863 :param obj: (str) remote object path
865 :param metakey: (str) metadatum key
867 r = self.object_post(obj, update=True, metadata={metakey: ''})
870 def publish_object(self, obj):
872 :param obj: (str) remote object path
874 :returns: (str) access url
876 r = self.object_post(obj, update=True, public=True)
878 info = self.get_object_info(obj)
879 pref, sep, rest = self.base_url.partition('//')
880 base = rest.split('/')[0]
882 '%s%s%s' % (pref, sep, base),
883 info['x-object-public'])
886 def unpublish_object(self, obj):
888 :param obj: (str) remote object path
890 r = self.object_post(obj, update=True, public=False)
893 def get_object_info(self, obj, version=None):
895 :param obj: (str) remote object path
897 :param version: (str)
902 r = self.object_head(obj, version=version)
904 except ClientError as ce:
906 raise ClientError('Object not found', status=404)
909 def get_object_meta(self, obj, version=None):
911 :param obj: (str) remote object path
913 :param version: (str)
918 self.get_object_info(obj, version=version),
921 def get_object_sharing(self, obj):
923 :param obj: (str) remote object path
928 self.get_object_info(obj),
933 perms = r['x-object-sharing'].split(';')
938 raise ClientError('Incorrect reply format')
939 (key, val) = perm.strip().split('=')
943 def set_object_sharing(
945 read_permition=False, write_permition=False):
946 """Give read/write permisions to an object.
948 :param obj: (str) remote object path
950 :param read_permition: (list - bool) users and user groups that get
951 read permition for this object - False means all previous read
952 permissions will be removed
954 :param write_perimition: (list - bool) of users and user groups to get
955 write permition for this object - False means all previous write
956 permissions will be removed
960 read='' if not read_permition else read_permition,
961 write='' if not write_permition else write_permition)
962 r = self.object_post(obj, update=True, permissions=perms)
965 def del_object_sharing(self, obj):
967 :param obj: (str) remote object path
969 self.set_object_sharing(obj)
971 def append_object(self, obj, source_file, upload_cb=None):
973 :param obj: (str) remote object path
975 :param source_file: open file descriptor
977 :param upload_db: progress.bar for uploading
980 self._assert_container()
981 meta = self.get_container_info()
982 blocksize = int(meta['x-container-block-size'])
983 filesize = fstat(source_file.fileno()).st_size
984 nblocks = 1 + (filesize - 1) // blocksize
987 upload_gen = upload_cb(nblocks)
989 for i in range(nblocks):
990 block = source_file.read(min(blocksize, filesize - offset))
992 r = self.object_post(
995 content_range='bytes */*',
996 content_type='application/octet-stream',
997 content_length=len(block),
1004 def truncate_object(self, obj, upto_bytes):
1006 :param obj: (str) remote object path
1008 :param upto_bytes: max number of bytes to leave on file
1010 r = self.object_post(
1013 content_range='bytes 0-%s/*' % upto_bytes,
1014 content_type='application/octet-stream',
1015 object_bytes=upto_bytes,
1016 source_object=path4url(self.container, obj))
1019 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1020 """Overwrite a part of an object from local source file
1022 :param obj: (str) remote object path
1024 :param start: (int) position in bytes to start overwriting from
1026 :param end: (int) position in bytes to stop overwriting at
1028 :param source_file: open file descriptor
1030 :param upload_db: progress.bar for uploading
1033 r = self.get_object_info(obj)
1034 rf_size = int(r['content-length'])
1035 if rf_size < int(start):
1037 'Range start exceeds file size',
1039 elif rf_size < int(end):
1041 'Range end exceeds file size',
1043 self._assert_container()
1044 meta = self.get_container_info()
1045 blocksize = int(meta['x-container-block-size'])
1046 filesize = fstat(source_file.fileno()).st_size
1047 datasize = int(end) - int(start) + 1
1048 nblocks = 1 + (datasize - 1) // blocksize
1051 upload_gen = upload_cb(nblocks)
1053 for i in range(nblocks):
1054 read_size = min(blocksize, filesize - offset, datasize - offset)
1055 block = source_file.read(read_size)
1056 r = self.object_post(
1059 content_type='application/octet-stream',
1060 content_length=len(block),
1061 content_range='bytes %s-%s/*' % (
1063 start + offset + len(block) - 1),
1065 offset += len(block)
1072 self, src_container, src_object, dst_container,
1074 source_version=None,
1079 :param src_container: (str) source container
1081 :param src_object: (str) source object path
1083 :param dst_container: (str) destination container
1085 :param dst_object: (str) destination object path
1087 :param source_version: (str) source object version
1089 :param public: (bool)
1091 :param content_type: (str)
1093 :param delimiter: (str)
1095 self._assert_account()
1096 self.container = dst_container
1097 dst_object = dst_object or src_object
1098 src_path = path4url(src_container, src_object)
1099 r = self.object_put(
1104 source_version=source_version,
1106 content_type=content_type,
1107 delimiter=delimiter)
1111 self, src_container, src_object, dst_container,
1113 source_version=None,
1118 :param src_container: (str) source container
1120 :param src_object: (str) source object path
1122 :param dst_container: (str) destination container
1124 :param dst_object: (str) destination object path
1126 :param source_version: (str) source object version
1128 :param public: (bool)
1130 :param content_type: (str)
1132 :param delimiter: (str)
1134 self._assert_account()
1135 self.container = dst_container
1136 dst_object = dst_object or src_object
1137 src_path = path4url(src_container, src_object)
1138 r = self.object_put(
1143 source_version=source_version,
1145 content_type=content_type,
1146 delimiter=delimiter)
1149 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1150 """Get accounts that share with self.account
1154 :param marker: (str)
1158 self._assert_account()
1160 self.set_param('format', 'json')
1161 self.set_param('limit', limit, iff=limit is not None)
1162 self.set_param('marker', marker, iff=marker is not None)
1165 success = kwargs.pop('success', (200, 204))
1166 r = self.get(path, *args, success=success, **kwargs)
1169 def get_object_versionlist(self, obj):
1171 :param obj: (str) remote object path
1175 self._assert_container()
1176 r = self.object_get(obj, format='json', version='list')
1177 return r.json['versions']