1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """GRNet Pithos API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos client by delegating to PithosRestClient.

        :param base_url: (str) Pithos API endpoint
        :param token: (str) user authentication token
        :param account: (str) account this client operates on
        :param container: (str) default container
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
74 def purge_container(self, container=None):
75 """Delete an empty container and destroy associated blocks
77 cnt_back_up = self.container
79 self.container = container or cnt_back_up
80 self.container_delete(until=unicode(time()))
82 self.container = cnt_back_up
84 def upload_object_unchunked(
89 content_encoding=None,
90 content_disposition=None,
95 :param obj: (str) remote object path
97 :param f: open file descriptor
99 :param withHashFile: (bool)
101 :param size: (int) size of data to upload
105 :param content_encoding: (str)
107 :param content_disposition: (str)
109 :param content_type: (str)
111 :param sharing: {'read':[user and/or grp names],
112 'write':[usr and/or grp names]}
114 :param public: (bool)
116 self._assert_container()
122 data = json.dumps(json.loads(data))
124 raise ClientError('"%s" is not json-formated' % f.name, 1)
126 msg = '"%s" is not a valid hashmap file' % f.name
127 raise ClientError(msg, 1)
130 data = f.read(size) if size else f.read()
135 content_encoding=content_encoding,
136 content_disposition=content_disposition,
137 content_type=content_type,
142 def create_object_by_manifestation(
145 content_encoding=None,
146 content_disposition=None,
151 :param obj: (str) remote object path
155 :param content_encoding: (str)
157 :param content_disposition: (str)
159 :param content_type: (str)
161 :param sharing: {'read':[user and/or grp names],
162 'write':[usr and/or grp names]}
164 :param public: (bool)
166 self._assert_container()
171 content_encoding=content_encoding,
172 content_disposition=content_disposition,
173 content_type=content_type,
176 manifest='%s/%s' % (self.container, obj))
178 # upload_* auxiliary methods
179 def _put_block_async(self, data, hash, upload_gen=None):
180 event = SilentEvent(method=self._put_block, data=data, hash=hash)
184 def _put_block(self, data, hash):
185 r = self.container_post(
187 content_type='application/octet-stream',
188 content_length=len(data),
191 assert r.json[0] == hash, 'Local hash does not match server'
193 def _get_file_block_info(self, fileobj, size=None):
194 meta = self.get_container_info()
195 blocksize = int(meta['x-container-block-size'])
196 blockhash = meta['x-container-block-hash']
197 size = size if size is not None else fstat(fileobj.fileno()).st_size
198 nblocks = 1 + (size - 1) // blocksize
199 return (blocksize, blockhash, size, nblocks)
201 def _get_missing_hashes(
207 content_encoding=None,
208 content_disposition=None,
216 content_type=content_type,
218 content_encoding=content_encoding,
219 content_disposition=content_disposition,
220 permissions=permissions,
223 return None if r.status_code == 201 else r.json
225 def _culculate_blocks_for_upload(
226 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
230 hash_gen = hash_cb(nblocks)
233 for i in range(nblocks):
234 block = fileobj.read(min(blocksize, size - offset))
236 hash = _pithos_hash(block, blockhash)
238 hmap[hash] = (offset, bytes)
242 msg = 'Failed to calculate uploaded blocks:'
243 ' Offset and object size do not match'
244 assert offset == size, msg
246 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
247 """upload missing blocks asynchronously"""
249 self._init_thread_limit()
254 offset, bytes = hmap[hash]
256 data = fileobj.read(bytes)
257 r = self._put_block_async(data, hash, upload_gen)
259 unfinished = self._watch_thread_limit(flying)
260 for thread in set(flying).difference(unfinished):
262 failures.append(thread)
265 ClientError) and thread.exception.status == 502:
266 self.POOLSIZE = self._thread_limit
267 elif thread.isAlive():
268 flying.append(thread)
276 for thread in flying:
279 failures.append(thread)
286 return [failure.kwargs['hash'] for failure in failures]
296 content_encoding=None,
297 content_disposition=None,
301 """Upload an object using multiple connections (threads)
303 :param obj: (str) remote object path
305 :param f: open file descriptor (rb)
307 :param hash_cb: optional progress.bar object for calculating hashes
309 :param upload_cb: optional progress.bar object for uploading
313 :param if_etag_match: (str) Push that value to if-match header at file
316 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
317 it does not exist remotely, otherwise the operation will fail.
318 Involves the case of an object with the same path is created while
319 the object is being uploaded.
321 :param content_encoding: (str)
323 :param content_disposition: (str)
325 :param content_type: (str)
327 :param sharing: {'read':[user and/or grp names],
328 'write':[usr and/or grp names]}
330 :param public: (bool)
332 self._assert_container()
335 block_info = (blocksize, blockhash, size, nblocks) =\
336 self._get_file_block_info(f, size)
337 (hashes, hmap, offset) = ([], {}, 0)
339 content_type = 'application/octet-stream'
341 self._culculate_blocks_for_upload(
348 hashmap = dict(bytes=size, hashes=hashes)
349 missing = self._get_missing_hashes(
351 content_type=content_type,
353 content_encoding=content_encoding,
354 content_disposition=content_disposition,
362 upload_gen = upload_cb(len(missing))
363 for i in range(len(missing), len(hashmap['hashes']) + 1):
374 sendlog.info('%s blocks missing' % len(missing))
375 num_of_blocks = len(missing)
376 missing = self._upload_missing_blocks(
382 if num_of_blocks == len(missing):
385 num_of_blocks = len(missing)
390 '%s blocks failed to upload' % len(missing),
392 except KeyboardInterrupt:
393 sendlog.info('- - - wait for threads to finish')
394 for thread in activethreads():
402 content_type=content_type,
403 if_etag_match=if_etag_match,
404 if_etag_not_match='*' if if_not_exist else None,
411 # download_* auxiliary methods
412 def _get_remote_blocks_info(self, obj, **restargs):
413 #retrieve object hashmap
414 myrange = restargs.pop('data_range', None)
415 hashmap = self.get_object_hashmap(obj, **restargs)
416 restargs['data_range'] = myrange
417 blocksize = int(hashmap['block_size'])
418 blockhash = hashmap['block_hash']
419 total_size = hashmap['bytes']
420 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
422 for i, h in enumerate(hashmap['hashes']):
423 # map_dict[h] = i CHAGE
425 map_dict[h].append(i)
428 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
430 def _dump_blocks_sync(
431 self, obj, remote_hashes, blocksize, total_size, dst, range,
433 for blockid, blockhash in enumerate(remote_hashes):
435 start = blocksize * blockid
436 is_last = start + blocksize > total_size
437 end = (total_size - 1) if is_last else (start + blocksize - 1)
438 (start, end) = _range_up(start, end, range)
439 args['data_range'] = 'bytes=%s-%s' % (start, end)
440 r = self.object_get(obj, success=(200, 206), **args)
445 def _get_block_async(self, obj, **args):
446 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
450 def _hash_from_file(self, fp, start, size, blockhash):
452 block = fp.read(size)
453 h = newhashlib(blockhash)
454 h.update(block.strip('\x00'))
455 return hexlify(h.digest())
457 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
458 """write the results of a greenleted rest call to a file
460 :param offset: the offset of the file up to blocksize
461 - e.g. if the range is 10-100, all blocks will be written to
464 for i, (key, g) in enumerate(flying.items()):
469 block = g.value.content
470 for block_start in blockids[key]:
471 local_file.seek(block_start + offset)
472 local_file.write(block)
478 def _dump_blocks_async(
479 self, obj, remote_hashes, blocksize, total_size, local_file,
480 blockhash=None, resume=False, filerange=None, **restargs):
481 file_size = fstat(local_file.fileno()).st_size if resume else 0
483 blockid_dict = dict()
485 if filerange is not None:
486 rstart = int(filerange.split('-')[0])
487 offset = rstart if blocksize > rstart else rstart % blocksize
489 self._init_thread_limit()
490 for block_hash, blockids in remote_hashes.items():
491 blockids = [blk * blocksize for blk in blockids]
492 unsaved = [blk for blk in blockids if not (
493 blk < file_size and block_hash == self._hash_from_file(
494 local_file, blk, blocksize, blockhash))]
495 self._cb_next(len(blockids) - len(unsaved))
498 self._watch_thread_limit(flying.values())
500 flying, blockid_dict, local_file, offset,
502 end = total_size - 1 if key + blocksize > total_size\
503 else key + blocksize - 1
504 start, end = _range_up(key, end, filerange)
508 restargs['async_headers'] = {
509 'Range': 'bytes=%s-%s' % (start, end)}
510 flying[key] = self._get_block_async(obj, **restargs)
511 blockid_dict[key] = unsaved
513 for thread in flying.values():
515 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
525 if_modified_since=None,
526 if_unmodified_since=None):
527 """Download an object (multiple connections, random blocks)
529 :param obj: (str) remote object path
531 :param dst: open file descriptor (wb+)
533 :param download_cb: optional progress.bar object for downloading
535 :param version: (str) file version
537 :param resume: (bool) if set, preserve already downloaded file parts
539 :param range_str: (str) from, to are file positions (int) in bytes
541 :param if_match: (str)
543 :param if_none_match: (str)
545 :param if_modified_since: (str) formated date
547 :param if_unmodified_since: (str) formated date"""
550 data_range=None if range_str is None else 'bytes=%s' % range_str,
552 if_none_match=if_none_match,
553 if_modified_since=if_modified_since,
554 if_unmodified_since=if_unmodified_since)
561 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
562 assert total_size >= 0
565 self.progress_bar_gen = download_cb(len(hash_list))
569 self._dump_blocks_sync(
578 self._dump_blocks_async(
589 dst.truncate(total_size)
593 #Command Progress Bar method
594 def _cb_next(self, step=1):
595 if hasattr(self, 'progress_bar_gen'):
597 for i in xrange(step):
598 self.progress_bar_gen.next()
602 def _complete_cb(self):
605 self.progress_bar_gen.next()
609 def get_object_hashmap(
614 if_modified_since=None,
615 if_unmodified_since=None,
618 :param obj: (str) remote object path
620 :param if_match: (str)
622 :param if_none_match: (str)
624 :param if_modified_since: (str) formated date
626 :param if_unmodified_since: (str) formated date
628 :param data_range: (str) from-to where from and to are integers
629 denoting file positions in bytes
638 if_etag_match=if_match,
639 if_etag_not_match=if_none_match,
640 if_modified_since=if_modified_since,
641 if_unmodified_since=if_unmodified_since,
642 data_range=data_range)
643 except ClientError as err:
644 if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Create or replace an account user group.

        :param group: (str) group name

        :param usernames: (list) members of the group
        """
        self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Remove a user group from the account (set it to empty).

        :param group: (str) group name
        """
        self.account_post(update=True, groups={group: []})
663 def get_account_info(self, until=None):
665 :param until: (str) formated date
669 r = self.account_head(until=until)
670 if r.status_code == 401:
671 raise ClientError("No authorization", status=401)
674 def get_account_quota(self):
679 self.get_account_info(),
680 'X-Account-Policy-Quota',
683 def get_account_versioning(self):
688 self.get_account_info(),
689 'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """Get user-defined account metadata (X-Account-Meta-* headers).

        :param until: (str) formated date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
    def get_account_group(self):
        """Get the account user groups (X-Account-Group-* headers).

        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')
706 def set_account_meta(self, metapairs):
708 :param metapairs: (dict) {key1:val1, key2:val2, ...}
710 assert(type(metapairs) is dict)
711 self.account_post(update=True, metadata=metapairs)
    def del_account_meta(self, metakey):
        """Delete an account metadatum (by setting it to empty).

        :param metakey: (str) metadatum key
        """
        self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """Set the account quota policy.

        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)
    def set_account_versioning(self, versioning):
        """Set the account versioning policy.

        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
733 def list_containers(self):
737 r = self.account_get()
740 def del_container(self, until=None, delimiter=None):
742 :param until: (str) formated date
744 :param delimiter: (str) with / empty container
746 :raises ClientError: 404 Container does not exist
748 :raises ClientError: 409 Container is not empty
750 self._assert_container()
751 r = self.container_delete(
754 success=(204, 404, 409))
755 if r.status_code == 404:
757 'Container "%s" does not exist' % self.container,
759 elif r.status_code == 409:
761 'Container "%s" is not empty' % self.container,
764 def get_container_versioning(self, container=None):
766 :param container: (str)
770 cnt_back_up = self.container
772 self.container = container or cnt_back_up
774 self.get_container_info(),
775 'X-Container-Policy-Versioning')
777 self.container = cnt_back_up
779 def get_container_limit(self, container=None):
781 :param container: (str)
785 cnt_back_up = self.container
787 self.container = container or cnt_back_up
789 self.get_container_info(),
790 'X-Container-Policy-Quota')
792 self.container = cnt_back_up
794 def get_container_info(self, until=None):
796 :param until: (str) formated date
800 :raises ClientError: 404 Container not found
803 r = self.container_head(until=until)
804 except ClientError as err:
805 err.details.append('for container %s' % self.container)
809 def get_container_meta(self, until=None):
811 :param until: (str) formated date
816 self.get_container_info(until=until),
819 def get_container_object_meta(self, until=None):
821 :param until: (str) formated date
826 self.get_container_info(until=until),
827 'X-Container-Object-Meta')
829 def set_container_meta(self, metapairs):
831 :param metapairs: (dict) {key1:val1, key2:val2, ...}
833 assert(type(metapairs) is dict)
834 self.container_post(update=True, metadata=metapairs)
    def del_container_meta(self, metakey):
        """Delete a container metadatum (by setting it to empty).

        :param metakey: (str) metadatum key
        """
        self.container_post(update=True, metadata={metakey: ''})
    def set_container_limit(self, limit):
        """Set the quota policy of the current container.

        :param limit: (int)
        """
        self.container_post(update=True, quota=limit)
    def set_container_versioning(self, versioning):
        """Set the versioning policy of the current container.

        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)
865 def set_object_meta(self, obj, metapairs):
867 :param obj: (str) remote object path
869 :param metapairs: (dict) {key1:val1, key2:val2, ...}
871 assert(type(metapairs) is dict)
872 self.object_post(obj, update=True, metadata=metapairs)
    def del_object_meta(self, obj, metakey):
        """Delete an object metadatum (by setting it to empty).

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        self.object_post(obj, update=True, metadata={metakey: ''})
882 def publish_object(self, obj):
884 :param obj: (str) remote object path
886 :returns: (str) access url
888 self.object_post(obj, update=True, public=True)
889 info = self.get_object_info(obj)
890 pref, sep, rest = self.base_url.partition('//')
891 base = rest.split('/')[0]
892 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
    def unpublish_object(self, obj):
        """Withdraw public access to an object.

        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)
900 def get_object_info(self, obj, version=None):
902 :param obj: (str) remote object path
904 :param version: (str)
909 r = self.object_head(obj, version=version)
911 except ClientError as ce:
913 raise ClientError('Object %s not found' % obj, status=404)
916 def get_object_meta(self, obj, version=None):
918 :param obj: (str) remote object path
920 :param version: (str)
925 self.get_object_info(obj, version=version),
928 def get_object_sharing(self, obj):
930 :param obj: (str) remote object path
935 self.get_object_info(obj),
940 perms = r['x-object-sharing'].split(';')
945 raise ClientError('Incorrect reply format')
946 (key, val) = perm.strip().split('=')
950 def set_object_sharing(
952 read_permition=False, write_permition=False):
953 """Give read/write permisions to an object.
955 :param obj: (str) remote object path
957 :param read_permition: (list - bool) users and user groups that get
958 read permition for this object - False means all previous read
959 permissions will be removed
961 :param write_perimition: (list - bool) of users and user groups to get
962 write permition for this object - False means all previous write
963 permissions will be removed
966 perms = dict(read=read_permition or '', write=write_permition or '')
967 self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """Remove all sharing permissions from an object.

        :param obj: (str) remote object path
        """
        self.set_object_sharing(obj)
975 def append_object(self, obj, source_file, upload_cb=None):
977 :param obj: (str) remote object path
979 :param source_file: open file descriptor
981 :param upload_db: progress.bar for uploading
984 self._assert_container()
985 meta = self.get_container_info()
986 blocksize = int(meta['x-container-block-size'])
987 filesize = fstat(source_file.fileno()).st_size
988 nblocks = 1 + (filesize - 1) // blocksize
991 upload_gen = upload_cb(nblocks)
993 for i in range(nblocks):
994 block = source_file.read(min(blocksize, filesize - offset))
999 content_range='bytes */*',
1000 content_type='application/octet-stream',
1001 content_length=len(block),
1007 def truncate_object(self, obj, upto_bytes):
1009 :param obj: (str) remote object path
1011 :param upto_bytes: max number of bytes to leave on file
1016 content_range='bytes 0-%s/*' % upto_bytes,
1017 content_type='application/octet-stream',
1018 object_bytes=upto_bytes,
1019 source_object=path4url(self.container, obj))
1021 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1022 """Overwrite a part of an object from local source file
1024 :param obj: (str) remote object path
1026 :param start: (int) position in bytes to start overwriting from
1028 :param end: (int) position in bytes to stop overwriting at
1030 :param source_file: open file descriptor
1032 :param upload_db: progress.bar for uploading
1035 r = self.get_object_info(obj)
1036 rf_size = int(r['content-length'])
1037 if rf_size < int(start):
1039 'Range start exceeds file size',
1041 elif rf_size < int(end):
1043 'Range end exceeds file size',
1045 self._assert_container()
1046 meta = self.get_container_info()
1047 blocksize = int(meta['x-container-block-size'])
1048 filesize = fstat(source_file.fileno()).st_size
1049 datasize = int(end) - int(start) + 1
1050 nblocks = 1 + (datasize - 1) // blocksize
1053 upload_gen = upload_cb(nblocks)
1055 for i in range(nblocks):
1056 read_size = min(blocksize, filesize - offset, datasize - offset)
1057 block = source_file.read(read_size)
1061 content_type='application/octet-stream',
1062 content_length=len(block),
1063 content_range='bytes %s-%s/*' % (
1065 start + offset + len(block) - 1),
1067 offset += len(block)
1073 self, src_container, src_object, dst_container,
1075 source_version=None,
1076 source_account=None,
1081 :param src_container: (str) source container
1083 :param src_object: (str) source object path
1085 :param dst_container: (str) destination container
1087 :param dst_object: (str) destination object path
1089 :param source_version: (str) source object version
1091 :param source_account: (str) account to copy from
1093 :param public: (bool)
1095 :param content_type: (str)
1097 :param delimiter: (str)
1099 self._assert_account()
1100 self.container = dst_container
1101 src_path = path4url(src_container, src_object)
1103 dst_object or src_object,
1107 source_version=source_version,
1108 source_account=source_account,
1110 content_type=content_type,
1111 delimiter=delimiter)
1114 self, src_container, src_object, dst_container,
1116 source_account=None,
1117 source_version=None,
1122 :param src_container: (str) source container
1124 :param src_object: (str) source object path
1126 :param dst_container: (str) destination container
1128 :param dst_object: (str) destination object path
1130 :param source_account: (str) account to move from
1132 :param source_version: (str) source object version
1134 :param public: (bool)
1136 :param content_type: (str)
1138 :param delimiter: (str)
1140 self._assert_account()
1141 self.container = dst_container
1142 dst_object = dst_object or src_object
1143 src_path = path4url(src_container, src_object)
1149 source_account=source_account,
1150 source_version=source_version,
1152 content_type=content_type,
1153 delimiter=delimiter)
1155 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1156 """Get accounts that share with self.account
1160 :param marker: (str)
1164 self._assert_account()
1166 self.set_param('format', 'json')
1167 self.set_param('limit', limit, iff=limit is not None)
1168 self.set_param('marker', marker, iff=marker is not None)
1171 success = kwargs.pop('success', (200, 204))
1172 r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """Get the version list of a remote object.

        :param obj: (str) remote object path

        :returns: (list) version information
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']