1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
# Module-level helper: compute the Pithos-style hash of a single block.
# The server hashes block content with trailing NUL padding stripped, so the
# local hash must do the same to match.
# NOTE(review): lines are missing from this extract -- the visible code only
# builds the hash object; presumably the hex digest is returned below. TODO confirm.
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
# Module-level helper: clamp a (start, end) block span against a "from-to"
# range string. a_range is parsed as two dash-separated integers.
# NOTE(review): the function body is truncated in this extract -- the branch
# below (span entirely outside the range) and the clamping/return lines are missing.
55 def _range_up(start, end, a_range):
57     (rstart, rend) = a_range.split('-')
58     (rstart, rend) = (int(rstart), int(rend))
59     if rstart > end or rend < start:
# Client for the GRNET Pithos storage API, layered on PithosRestClient which
# provides the low-level REST calls (container_post, object_get, ...).
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
# __init__ only delegates to the REST-layer constructor; no extra state here.
71     def __init__(self, base_url, token, account=None, container=None):
72 super(PithosClient, self).__init__(base_url, token, account, container)
# Delete a container together with its version history: container_delete with
# until=<now> purges everything up to the current time.
# The current self.container is saved and restored so the call can target
# another container without permanently switching.
# NOTE(review): intermediate lines are missing here -- the restore on the last
# line is presumably inside a try/finally. unicode()/time() are Python 2 /
# imported in a part of the file not shown. TODO confirm.
74 def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
77         cnt_back_up = self.container
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
82             self.container = cnt_back_up
# Upload a whole object in a single request (no block-by-block chunking).
# Two modes are visible: plain data upload, and (when withHashFile is set)
# uploading a pre-computed hashmap, which is validated as JSON first.
# NOTE(review): many original lines are missing from this extract (signature
# parameters, the try/except around json parsing, the final object_put call).
84     def upload_object_unchunked(
89             content_encoding=None,
90             content_disposition=None,
95         :param obj: (str) remote object path
97         :param f: open file descriptor
99         :param withHashFile: (bool)
101         :param size: (int) size of data to upload
105         :param content_encoding: (str)
107         :param content_disposition: (str)
109         :param content_type: (str)
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
114         :param public: (bool)
116         self._assert_container()
# Round-trip through json to validate the hashmap file content.
122                 data = json.dumps(json.loads(data))
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
# Plain mode: read either `size` bytes or the whole file.
130             data = f.read(size) if size else f.read()
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
# Create a zero-length manifest object: the X-Object-Manifest header
# ('<container>/<obj>') makes the server serve the concatenation of all
# objects sharing that prefix.
# NOTE(review): signature parameters and the object_put call line are missing
# from this extract.
142     def create_object_by_manifestation(
145             content_encoding=None,
146             content_disposition=None,
151         :param obj: (str) remote object path
155         :param content_encoding: (str)
157         :param content_disposition: (str)
159         :param content_type: (str)
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
164         :param public: (bool)
166         self._assert_container()
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
176             manifest='%s/%s' % (self.container, obj))
178     # upload_* auxiliary methods
# Run _put_block in a background SilentEvent thread; the event is presumably
# started and returned in the missing lines. TODO confirm.
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
# Upload one block via container POST (update mode) and verify the server
# computed the same hash as the local one.
# NOTE(review): `hash` shadows the builtin; kept as-is since callers pass it
# by keyword.
184     def _put_block(self, data, hash):
185         r = self.container_post(
187             content_type='application/octet-stream',
188             content_length=len(data),
191         assert r.json[0] == hash, 'Local hash does not match server'
def _get_file_block_info(self, fileobj, size=None):
    """Return the container's block parameters and the file's block count.

    :param fileobj: open file descriptor; stat'ed for its size when
        size is not given
    :param size: (int) number of bytes to consider (default: whole file)
    :returns: (blocksize, blockhash, size, nblocks) tuple
    """
    container_meta = self.get_container_info()
    bsize = int(container_meta['x-container-block-size'])
    bhash = container_meta['x-container-block-hash']
    # Fall back to the on-disk size only when no explicit size was given.
    total = fstat(fileobj.fileno()).st_size if size is None else size
    # Ceiling division: number of blocks needed to cover `total` bytes.
    nblocks = 1 + (total - 1) // bsize
    return (bsize, bhash, total, nblocks)
# Probe the server for which block hashes it does not already have: an
# object_put with the full hashmap returns 201 when everything is known
# (upload complete), otherwise the JSON body lists the missing hashes.
201     def _get_missing_hashes(
208             content_encoding=None,
209             content_disposition=None,
217             content_type=content_type,
220             content_encoding=content_encoding,
221             content_disposition=content_disposition,
222             permissions=permissions,
# 201 means the server already had every block; otherwise r.json holds the
# hashes that still need uploading.
225         return None if r.status_code == 201 else r.json
# Read the file block by block, filling `hashes` (ordered) and `hmap`
# (hash -> (offset, length)) for the upload phase. Name is misspelled
# ("culculate") but kept -- callers elsewhere use it.
227     def _culculate_blocks_for_upload(
228             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
232             hash_gen = hash_cb(nblocks)
235         for i in range(nblocks):
236             block = fileobj.read(min(blocksize, size - offset))
238             hash = _pithos_hash(block, blockhash)
240             hmap[hash] = (offset, bytes)
244         msg = 'Failed to calculate uploaded blocks:'
# NOTE(review): the line below is a no-op expression statement -- it was
# almost certainly meant to be concatenated into msg (missing continuation).
245         ' Offset and object size do not match'
246         assert offset == size, msg
# Fan out _put_block_async threads for every missing hash, throttled by the
# inherited thread-limit helpers; collect failed threads and return the
# hashes that still need uploading so the caller can retry.
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
249         """upload missing blocks asynchronously"""
251         self._init_thread_limit()
# Seek info for this block comes from the hash map built during hashing.
256             offset, bytes = hmap[hash]
258             data = fileobj.read(bytes)
259             r = self._put_block_async(data, hash, upload_gen)
261             unfinished = self._watch_thread_limit(flying)
262             for thread in set(flying).difference(unfinished):
264                     failures.append(thread)
# On 502 (bad gateway) shrink the pool to the adaptive thread limit.
267                             ClientError) and thread.exception.status == 502:
268                         self.POOLSIZE = self._thread_limit
269                 elif thread.isAlive():
270                     flying.append(thread)
# Drain whatever is still flying after the main loop.
278         for thread in flying:
281                 failures.append(thread)
288         return [failure.kwargs['hash'] for failure in failures]
# NOTE(review): the def line of this method (upload_object) is missing from
# this extract; the span below starts inside its parameter list.
# Flow: stat file & container block params -> hash all blocks locally ->
# ask server which hashes are missing -> upload only those, retrying until
# no progress is made -> final object_put with the complete hashmap.
296             content_encoding=None,
297             content_disposition=None,
301         """Upload an object using multiple connections (threads)
303         :param obj: (str) remote object path
305         :param f: open file descriptor (rb)
307         :param hash_cb: optional progress.bar object for calculating hashes
309         :param upload_cb: optional progress.bar object for uploading
313         :param content_encoding: (str)
315         :param content_disposition: (str)
317         :param content_type: (str)
319         :param sharing: {'read':[user and/or grp names],
320             'write':[usr and/or grp names]}
322         :param public: (bool)
324         self._assert_container()
327         block_info = (blocksize, blockhash, size, nblocks) =\
328             self._get_file_block_info(f, size)
329         (hashes, hmap, offset) = ([], {}, 0)
331             content_type = 'application/octet-stream'
333         self._culculate_blocks_for_upload(
340         hashmap = dict(bytes=size, hashes=hashes)
341         missing = self._get_missing_hashes(
343             content_type=content_type,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
# Fast-forward the upload progress bar over blocks the server already has.
355             upload_gen = upload_cb(len(missing))
356             for i in range(len(missing), len(hashmap['hashes']) + 1):
367                 sendlog.info('%s blocks missing' % len(missing))
368                 num_of_blocks = len(missing)
369                 missing = self._upload_missing_blocks(
# No progress between retries -> give up (error raised in missing lines).
375                     if num_of_blocks == len(missing):
378                         num_of_blocks = len(missing)
383                     '%s blocks failed to upload' % len(missing),
# On Ctrl-C, wait for in-flight threads before propagating.
385         except KeyboardInterrupt:
386             sendlog.info('- - - wait for threads to finish')
387             for thread in activethreads():
395             content_type=content_type,
399     # download_* auxiliary methods
# Fetch the object's hashmap and index it: map_dict maps each block hash to
# the list of block positions where it occurs (a hash can repeat).
400     def _get_remote_blocks_info(self, obj, **restargs):
401         #retrieve object hashmap
# data_range applies to block GETs, not to the hashmap request, so pop it
# for this call and put it back afterwards.
402         myrange = restargs.pop('data_range', None)
403         hashmap = self.get_object_hashmap(obj, **restargs)
404         restargs['data_range'] = myrange
405         blocksize = int(hashmap['block_size'])
406         blockhash = hashmap['block_hash']
407         total_size = hashmap['bytes']
408         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
410         for i, h in enumerate(hashmap['hashes']):
411             # map_dict[h] = i CHAGE
413             map_dict[h].append(i)
416         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
# Sequential (single-threaded) download: one ranged GET per block, in order.
# NOTE(review): the `range` parameter shadows the builtin; kept -- renaming
# would change the keyword interface.
418     def _dump_blocks_sync(
419             self, obj, remote_hashes, blocksize, total_size, dst, range,
421         for blockid, blockhash in enumerate(remote_hashes):
423                 start = blocksize * blockid
424                 is_last = start + blocksize > total_size
425                 end = (total_size - 1) if is_last else (start + blocksize - 1)
426                 (start, end) = _range_up(start, end, range)
427                 args['data_range'] = 'bytes=%s-%s' % (start, end)
428                 r = self.object_get(obj, success=(200, 206), **args)
# Fire a GET for one block in a background thread; the event is presumably
# started and returned in the missing lines. TODO confirm.
433     def _get_block_async(self, obj, **args):
434         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
# Hash a block read from an already-downloaded local file, to decide whether
# it can be skipped on resume.
# NOTE(review): uses block.strip('\x00') while the upload-side _pithos_hash
# uses rstrip('\x00') -- stripping leading NULs too looks inconsistent;
# verify against the server's hashing rules before changing.
438     def _hash_from_file(self, fp, start, size, blockhash):
440         block = fp.read(size)
441         h = newhashlib(blockhash)
442         h.update(block.strip('\x00'))
443         return hexlify(h.digest())
# Collect finished download threads from `flying` (start-offset -> thread)
# and write their payloads into local_file at the right position.
445     def _thread2file(self, flying, local_file, offset=0, **restargs):
446         """write the results of a greenleted rest call to a file
448         :param offset: the offset of the file up to blocksize
449             - e.g. if the range is 10-100, all blocks will be written to
453         for i, (start, g) in enumerate(flying.items()):
457                 block = g.value.content
# Seek relative to the requested range's origin, then drop the thread from
# the flying map once its block is persisted.
458                 local_file.seek(start - offset)
459                 local_file.write(block)
461                 finished.append(flying.pop(start))
# Parallel download: for each (hash, positions) pair, skip blocks whose
# local content already hashes correctly (resume), otherwise fire ranged
# async GETs, throttled by the thread-limit helpers, and drain results
# into the file via _thread2file.
465     def _dump_blocks_async(
466             self, obj, remote_hashes, blocksize, total_size, local_file,
467             blockhash=None, resume=False, filerange=None, **restargs):
# Resume only considers bytes already present on disk.
468         file_size = fstat(local_file.fileno()).st_size if resume else 0
472         if filerange is not None:
473             rstart = int(filerange.split('-')[0])
474             offset = rstart if blocksize > rstart else rstart % blocksize
476         self._init_thread_limit()
477         for block_hash, blockids in remote_hashes.items():
478             for blockid in blockids:
479                 start = blocksize * blockid
# Already-valid local block: skip (resume path).
480                 if start < file_size and block_hash == self._hash_from_file(
481                         local_file, start, blocksize, blockhash):
484                     self._watch_thread_limit(flying.values())
485                     finished += self._thread2file(
490                     end = total_size - 1 if start + blocksize > total_size\
491                         else start + blocksize - 1
492                     (start, end) = _range_up(start, end, filerange)
496                     restargs['async_headers'] = {
497                         'Range': 'bytes=%s-%s' % (start, end)}
498                     flying[start] = self._get_block_async(obj, **restargs)
# Wait out any remaining threads, then flush everything to the file.
500         for thread in flying.values():
502         finished += self._thread2file(flying, local_file, offset, **restargs)
# NOTE(review): the def line of this method (download_object) is missing from
# this extract; the span below starts inside its parameter list.
# Flow: fetch remote hashmap -> choose sync (sequential) or async (threaded)
# block download -> truncate the destination to the exact object size.
512             if_modified_since=None,
513             if_unmodified_since=None):
514         """Download an object (multiple connections, random blocks)
516         :param obj: (str) remote object path
518         :param dst: open file descriptor (wb+)
520         :param download_cb: optional progress.bar object for downloading
522         :param version: (str) file version
524         :param resume: (bool) if set, preserve already downloaded file parts
526         :param range_str: (str) from, to are file positions (int) in bytes
528         :param if_match: (str)
530         :param if_none_match: (str)
532         :param if_modified_since: (str) formated date
534         :param if_unmodified_since: (str) formated date"""
537             data_range=None if range_str is None else 'bytes=%s' % range_str,
539             if_none_match=if_none_match,
540             if_modified_since=if_modified_since,
541             if_unmodified_since=if_unmodified_since)
548             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
549         assert total_size >= 0
552             self.progress_bar_gen = download_cb(len(hash_list))
556             self._dump_blocks_sync(
565             self._dump_blocks_async(
# Drop any stale trailing bytes left over from a previous/partial download.
576                 dst.truncate(total_size)
580     #Command Progress Bar method
# NOTE(review): the def line of the first callback (presumably _cb_next) is
# missing from this extract; it advances the progress bar by one step if one
# was installed by download_object/upload_object.
582         if hasattr(self, 'progress_bar_gen'):
584                 self.progress_bar_gen.next()
# Drain the progress-bar generator to completion (the loop/guard lines are
# missing from this extract).
588     def _complete_cb(self):
591                 self.progress_bar_gen.next()
# Retrieve an object's hashmap (format=hashmap GET). Conditional-request
# failures (304/412) are translated in the missing lines; other ClientErrors
# propagate.
595     def get_object_hashmap(
600             if_modified_since=None,
601             if_unmodified_since=None,
604         :param obj: (str) remote object path
606         :param if_match: (str)
608         :param if_none_match: (str)
610         :param if_modified_since: (str) formated date
612         :param if_unmodified_since: (str) formated date
614         :param data_range: (str) from-to where from and to are integers
615             denoting file positions in bytes
624                 if_etag_match=if_match,
625                 if_etag_not_match=if_none_match,
626                 if_modified_since=if_modified_since,
627                 if_unmodified_since=if_unmodified_since,
628                 data_range=data_range)
629         except ClientError as err:
630             if err.status == 304 or err.status == 412:
# ----- Account-level accessors: groups, info, quota, versioning, metadata.
# All are thin wrappers over account_post/account_head plus filter_in on the
# response headers.
635     def set_account_group(self, group, usernames):
639         :param usernames: (list)
641         self.account_post(update=True, groups={group: usernames})
# Deleting a group = posting it with an empty member list.
643     def del_account_group(self, group):
647         self.account_post(update=True, groups={group: []})
649     def get_account_info(self, until=None):
651         :param until: (str) formated date
655         r = self.account_head(until=until)
656         if r.status_code == 401:
657             raise ClientError("No authorization", status=401)
660     def get_account_quota(self):
665             self.get_account_info(),
666             'X-Account-Policy-Quota',
669     def get_account_versioning(self):
674             self.get_account_info(),
675             'X-Account-Policy-Versioning',
678     def get_account_meta(self, until=None):
680         :meta until: (str) formated date
684         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
686     def get_account_group(self):
690         return filter_in(self.get_account_info(), 'X-Account-Group-')
692     def set_account_meta(self, metapairs):
694         :param metapairs: (dict) {key1:val1, key2:val2, ...}
696         assert(type(metapairs) is dict)
697         self.account_post(update=True, metadata=metapairs)
699     def del_account_meta(self, metakey):
701         :param metakey: (str) metadatum key
703         self.account_post(update=True, metadata={metakey: ''})
705     def set_account_quota(self, quota):
709         self.account_post(update=True, quota=quota)
711     def set_account_versioning(self, versioning):
# NOTE(review): docstring typo below -- should read :param versioning: (str)
713         "param versioning: (str)
715         self.account_post(update=True, versioning=versioning)
# ----- Container listing, deletion, and per-container policy getters.
717     def list_containers(self):
721         r = self.account_get()
# Delete the current container, mapping 404/409 to descriptive ClientErrors.
724     def del_container(self, until=None, delimiter=None):
726         :param until: (str) formated date
728         :param delimiter: (str) with / empty container
730         :raises ClientError: 404 Container does not exist
732         :raises ClientError: 409 Container is not empty
734         self._assert_container()
735         r = self.container_delete(
738             success=(204, 404, 409))
739         if r.status_code == 404:
741                 'Container "%s" does not exist' % self.container,
743         elif r.status_code == 409:
745                 'Container "%s" is not empty' % self.container,
# The two getters below temporarily switch self.container to the target and
# restore it afterwards (the restore is presumably in a try/finally -- the
# intermediate lines are missing from this extract).
748     def get_container_versioning(self, container=None):
750         :param container: (str)
754         cnt_back_up = self.container
756             self.container = container or cnt_back_up
758                 self.get_container_info(),
759                 'X-Container-Policy-Versioning')
761             self.container = cnt_back_up
763     def get_container_quota(self, container=None):
765         :param container: (str)
769         cnt_back_up = self.container
771             self.container = container or cnt_back_up
773                 self.get_container_info(),
774                 'X-Container-Policy-Quota')
776             self.container = cnt_back_up
# ----- Container info/metadata accessors and policy setters.
# get_container_info annotates a ClientError with the container name before
# (presumably) re-raising it.
778     def get_container_info(self, until=None):
780         :param until: (str) formated date
784         :raises ClientError: 404 Container not found
787             r = self.container_head(until=until)
788         except ClientError as err:
789             err.details.append('for container %s' % self.container)
793     def get_container_meta(self, until=None):
795         :param until: (str) formated date
800             self.get_container_info(until=until),
803     def get_container_object_meta(self, until=None):
805         :param until: (str) formated date
810             self.get_container_info(until=until),
811             'X-Container-Object-Meta')
813     def set_container_meta(self, metapairs):
815         :param metapairs: (dict) {key1:val1, key2:val2, ...}
817         assert(type(metapairs) is dict)
818         self.container_post(update=True, metadata=metapairs)
# Deleting a metadatum = posting it with an empty value.
820     def del_container_meta(self, metakey):
822         :param metakey: (str) metadatum key
824         self.container_post(update=True, metadata={metakey: ''})
826     def set_container_quota(self, quota):
830         self.container_post(update=True, quota=quota)
832     def set_container_versioning(self, versioning):
834         :param versioning: (str)
836         self.container_post(update=True, versioning=versioning)
# ----- Object deletion, metadata, and publish/unpublish.
838     def del_object(self, obj, until=None, delimiter=None):
840         :param obj: (str) remote object path
842         :param until: (str) formated date
844         :param delimiter: (str)
846         self._assert_container()
847         self.object_delete(obj, until=until, delimiter=delimiter)
849     def set_object_meta(self, obj, metapairs):
851         :param obj: (str) remote object path
853         :param metapairs: (dict) {key1:val1, key2:val2, ...}
855         assert(type(metapairs) is dict)
856         self.object_post(obj, update=True, metadata=metapairs)
858     def del_object_meta(self, obj, metakey):
860         :param obj: (str) remote object path
862         :param metakey: (str) metadatum key
864         self.object_post(obj, update=True, metadata={metakey: ''})
# Mark the object public, then rebuild its public URL from base_url's
# scheme + host and the x-object-public header.
866     def publish_object(self, obj):
868         :param obj: (str) remote object path
870         :returns: (str) access url
872         self.object_post(obj, update=True, public=True)
873         info = self.get_object_info(obj)
874         pref, sep, rest = self.base_url.partition('//')
875         base = rest.split('/')[0]
876         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
878     def unpublish_object(self, obj):
880         :param obj: (str) remote object path
882         self.object_post(obj, update=True, public=False)
# ----- Object info/metadata/sharing getters.
# get_object_info maps a HEAD failure (presumably 404 -- the guard lines are
# missing from this extract) to a descriptive ClientError.
884     def get_object_info(self, obj, version=None):
886         :param obj: (str) remote object path
888         :param version: (str)
893             r = self.object_head(obj, version=version)
895         except ClientError as ce:
897                 raise ClientError('Object %s not found' % obj, status=404)
900     def get_object_meta(self, obj, version=None):
902         :param obj: (str) remote object path
904         :param version: (str)
909             self.get_object_info(obj, version=version),
# Parse the x-object-sharing header: semicolon-separated key=value pairs
# (read=..., write=...); malformed entries raise.
912     def get_object_sharing(self, obj):
914         :param obj: (str) remote object path
919             self.get_object_info(obj),
924         perms = r['x-object-sharing'].split(';')
929                 raise ClientError('Incorrect reply format')
930             (key, val) = perm.strip().split('=')
# Set (or clear, with the default False arguments) the read/write permission
# lists on an object. Falsy values become '' which removes that permission.
934     def set_object_sharing(
936             read_permition=False, write_permition=False):
937         """Give read/write permisions to an object.
939         :param obj: (str) remote object path
941         :param read_permition: (list - bool) users and user groups that get
942             read permition for this object - False means all previous read
943             permissions will be removed
945         :param write_perimition: (list - bool) of users and user groups to get
946             write permition for this object - False means all previous write
947             permissions will be removed
950         perms = dict(read=read_permition or '', write=write_permition or '')
951         self.object_post(obj, update=True, permissions=perms)
# Clearing sharing = setting it with both defaults (False -> '').
953     def del_object_sharing(self, obj):
955         :param obj: (str) remote object path
957         self.set_object_sharing(obj)
# Append the content of a local file to a remote object, block by block,
# using object_post with content_range='bytes */*' (server appends).
959     def append_object(self, obj, source_file, upload_cb=None):
961         :param obj: (str) remote object path
963         :param source_file: open file descriptor
965         :param upload_db: progress.bar for uploading
968         self._assert_container()
969         meta = self.get_container_info()
970         blocksize = int(meta['x-container-block-size'])
971         filesize = fstat(source_file.fileno()).st_size
972         nblocks = 1 + (filesize - 1) // blocksize
975             upload_gen = upload_cb(nblocks)
977         for i in range(nblocks):
978             block = source_file.read(min(blocksize, filesize - offset))
983                 content_range='bytes */*',
984                 content_type='application/octet-stream',
985                 content_length=len(block),
# Truncate the remote object to upto_bytes by posting a 0-to-upto_bytes
# range sourced from the object itself.
991     def truncate_object(self, obj, upto_bytes):
993         :param obj: (str) remote object path
995         :param upto_bytes: max number of bytes to leave on file
1000             content_range='bytes 0-%s/*' % upto_bytes,
1001             content_type='application/octet-stream',
1002             object_bytes=upto_bytes,
1003             source_object=path4url(self.container, obj))
# Overwrite the byte range [start, end] of a remote object with data read
# from a local file, block by block. The range is validated against the
# remote object's current content-length first.
1005     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1006         """Overwrite a part of an object from local source file
1008         :param obj: (str) remote object path
1010         :param start: (int) position in bytes to start overwriting from
1012         :param end: (int) position in bytes to stop overwriting at
1014         :param source_file: open file descriptor
1016         :param upload_db: progress.bar for uploading
1019         r = self.get_object_info(obj)
1020         rf_size = int(r['content-length'])
1021         if rf_size < int(start):
1023                 'Range start exceeds file size',
1025         elif rf_size < int(end):
1027                 'Range end exceeds file size',
1029         self._assert_container()
1030         meta = self.get_container_info()
1031         blocksize = int(meta['x-container-block-size'])
1032         filesize = fstat(source_file.fileno()).st_size
# Inclusive range: end - start + 1 bytes will be written.
1033         datasize = int(end) - int(start) + 1
1034         nblocks = 1 + (datasize - 1) // blocksize
1037             upload_gen = upload_cb(nblocks)
1039         for i in range(nblocks):
# Never read past the local file nor past the requested range.
1040             read_size = min(blocksize, filesize - offset, datasize - offset)
1041             block = source_file.read(read_size)
1045                 content_type='application/octet-stream',
1046                 content_length=len(block),
1047                 content_range='bytes %s-%s/*' % (
1049                     start + offset + len(block) - 1),
1051             offset += len(block)
# NOTE(review): the def lines of these two methods (copy_object and
# move_object) are missing from this extract; each span starts inside the
# parameter list. Both switch self.container to the destination and delegate
# to the REST-layer copy/move with a source path built by path4url.
1057             self, src_container, src_object, dst_container,
1059             source_version=None,
1060             source_account=None,
1065         :param src_container: (str) source container
1067         :param src_object: (str) source object path
1069         :param dst_container: (str) destination container
1071         :param dst_object: (str) destination object path
1073         :param source_version: (str) source object version
1075         :param source_account: (str) account to copy from
1077         :param public: (bool)
1079         :param content_type: (str)
1081         :param delimiter: (str)
1083         self._assert_account()
# NOTE(review): self.container is permanently switched to dst_container here
# (no backup/restore, unlike the get_container_* helpers) -- verify intended.
1084         self.container = dst_container
1085         src_path = path4url(src_container, src_object)
1087             dst_object or src_object,
1091             source_version=source_version,
1092             source_account=source_account,
1094             content_type=content_type,
1095             delimiter=delimiter)
# move_object: same shape as copy_object but delegating to the move call.
1098             self, src_container, src_object, dst_container,
1100             source_account=None,
1101             source_version=None,
1106         :param src_container: (str) source container
1108         :param src_object: (str) source object path
1110         :param dst_container: (str) destination container
1112         :param dst_object: (str) destination object path
1114         :param source_account: (str) account to move from
1116         :param source_version: (str) source object version
1118         :param public: (bool)
1120         :param content_type: (str)
1122         :param delimiter: (str)
1124         self._assert_account()
1125         self.container = dst_container
1126         dst_object = dst_object or src_object
1127         src_path = path4url(src_container, src_object)
1133             source_account=source_account,
1134             source_version=source_version,
1136             content_type=content_type,
1137             delimiter=delimiter)
# List accounts that share objects with self.account, via a JSON-format GET
# with optional limit/marker paging parameters.
1139     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1140         """Get accounts that share with self.account
1144         :param marker: (str)
1148         self._assert_account()
1150         self.set_param('format', 'json')
# iff= makes set_param a no-op when the value was not supplied.
1151         self.set_param('limit', limit, iff=limit is not None)
1152         self.set_param('marker', marker, iff=marker is not None)
1155         success = kwargs.pop('success', (200, 204))
1156         r = self.get(path, *args, success=success, **kwargs)
# List all versions of an object (version='list' GET).
1159     def get_object_versionlist(self, obj):
1161         :param obj: (str) remote object path
1165         self._assert_container()
1166         r = self.object_get(obj, format='json', version='list')
1167         return r.json['versions']