1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
class PithosClient(PithosRestClient):
    """GRNet Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        # Thin constructor: all connection/state handling lives in the
        # PithosRestClient base class
        super(PithosClient, self).__init__(base_url, token, account, container)
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks

        :param container: (str) defaults to the current self.container
        """
        cnt_back_up = self.container
        # Temporarily re-point the client, then restore the original
        # container. NOTE(review): the restore should sit in a finally
        # block -- some lines appear elided in this view, so it may be.
        self.container = container or cnt_back_up
        # 'until=now' destroys the underlying blocks too, not just names
        self.container_delete(until=unicode(time()))
        self.container = cnt_back_up
    # NOTE(review): many lines of this method are elided in this view
    # (rest of the signature, the try/except guards around the json
    # round-trip, and the final object_put call).
    def upload_object_unchunked(
            content_encoding=None,
            content_disposition=None,
        """Upload a whole object in a single request (no block splitting)

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        # A hashmap file must contain valid JSON; round-trip to validate
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        # Plain upload: read everything (or up to 'size') into memory
        data = f.read(size) if size else f.read()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    # NOTE(review): parts of the signature and the object_put call are
    # elided in this view.
    def create_object_by_manifestation(
            content_encoding=None,
            content_disposition=None,
        """Create a remote object that manifests previously uploaded parts

        :param obj: (str) remote object path

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
        # X-Object-Manifest: every object under <container>/<obj>/ is
        # served as a part of this object
        manifest='%s/%s' % (self.container, obj))
    # upload_* auxiliary methods

    # NOTE(review): trailing lines are elided in this view -- the event
    # is presumably started and returned; confirm against full source.
    def _put_block_async(self, data, hash, upload_gen=None):
        # Wrap the synchronous _put_block into a SilentEvent thread
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
    # NOTE(review): several container_post keyword arguments (e.g. the
    # update/data/format ones) are elided in this view.
    def _put_block(self, data, hash):
        # POST the raw block to the container; the server replies with
        # the hash(es) it computed for the uploaded data
        r = self.container_post(
            content_type='application/octet-stream',
            content_length=len(data),
        # Sanity check: locally computed hash must match the server's
        assert r.json[0] == hash, 'Local hash does not match server'
193 def _get_file_block_info(self, fileobj, size=None):
194 meta = self.get_container_info()
195 blocksize = int(meta['x-container-block-size'])
196 blockhash = meta['x-container-block-hash']
197 size = size if size is not None else fstat(fileobj.fileno()).st_size
198 nblocks = 1 + (size - 1) // blocksize
199 return (blocksize, blockhash, size, nblocks)
    # NOTE(review): the signature and several keyword arguments are
    # elided in this view; the method evidently PUTs the hashmap and
    # returns the server-reported list of missing block hashes.
    def _get_missing_hashes(
            content_encoding=None,
            content_disposition=None,
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201 means the server already stores every block; otherwise the
        # json body lists the hashes that still need uploading
        return None if r.status_code == 201 else r.json
225 def _culculate_blocks_for_upload(
226 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
230 hash_gen = hash_cb(nblocks)
233 for i in range(nblocks):
234 block = fileobj.read(min(blocksize, size - offset))
236 hash = _pithos_hash(block, blockhash)
238 hmap[hash] = (offset, bytes)
242 msg = 'Failed to calculate uploaded blocks:'
243 ' Offset and object size do not match'
244 assert offset == size, msg
    # NOTE(review): loop headers, thread joins and parts of the error
    # handling are elided in this view.
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        self._init_thread_limit()
        # Each missing hash maps to (offset, length) in the source file
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        # Throttle: wait until the number of active threads drops
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
        failures.append(thread)
        # A 502 suggests server overload: shrink the pool size
        ClientError) and thread.exception.status == 502:
        self.POOLSIZE = self._thread_limit
        elif thread.isAlive():
        flying.append(thread)
        # Drain whatever is still in flight before reporting
        for thread in flying:
        failures.append(thread)
        # Return the hashes that failed, so the caller can retry them
        return [failure.kwargs['hash'] for failure in failures]
    # NOTE(review): the 'def upload_object(' line, the retry-loop headers,
    # the try/except structure and the final object_put call are elided in
    # this view; comments annotate only the visible logic.
            content_encoding=None,
            content_disposition=None,
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY
            if it does not exist remotely, otherwise the operation will
            fail. Involves the case of an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # Phase 1: split the local file into blocks and hash each block
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'

        self._culculate_blocks_for_upload(
        # Phase 2: send the hashmap; server answers with hashes of the
        # blocks it does not already store
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        # No progress between rounds means the upload is stuck
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
        '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
        sendlog.info('- - - wait for threads to finish')
        for thread in activethreads():
        # Phase 3: register the object under its final hashmap
            content_type=content_type,
            if_etag_not_match='*' if if_not_exist else None,
    # download_* auxiliary methods

    # NOTE(review): the map_dict initialization and part of the loop are
    # elided in this view.
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        # 'data_range' must not reach get_object_hashmap; pop it here and
        # restore it afterwards for the per-block requests
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        for i, h in enumerate(hashmap['hashes']):
            # A hash may occur at several positions (identical blocks),
            # so each hash maps to a list of block indices
            map_dict[h].append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    # NOTE(review): the initialization of 'args' and the write of the
    # fetched content to dst are elided in this view.
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            # Inclusive byte positions of this block inside the object
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # Clamp to the user-requested range, if any
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
    # NOTE(review): trailing lines are elided in this view -- the started
    # event is presumably returned; confirm against the full source.
    def _get_block_async(self, obj, **args):
        # Fetch one block in its own thread
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
443 def _hash_from_file(self, fp, start, size, blockhash):
445 block = fp.read(size)
446 h = newhashlib(blockhash)
447 h.update(block.strip('\x00'))
448 return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10

        NOTE(review): the completion test on each thread and the cleanup
        of 'flying'/'blockids' are elided in this view.
        """
        for i, (key, g) in enumerate(flying.items()):
        block = g.value.content
        # The same content may belong at several positions (duplicate
        # blocks), hence the inner loop
        for block_start in blockids[key]:
        local_file.seek(block_start + offset)
        local_file.write(block)
    # NOTE(review): parts of the download loop (thread-completion
    # handling, flow around _thread2file and progress callbacks) are
    # elided in this view.
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        # When resuming, already-saved blocks are detected by re-hashing
        # the local file; otherwise treat the local file as empty
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        blockid_dict = dict()
        if filerange is not None:
            # Offset of the requested range inside the first block
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # Convert block indices to absolute byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # Skip blocks whose local content already matches the hash
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            # Clamp the block span to the requested file range
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        for thread in flying.values():
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
        # NOTE(review): the 'def download_object(' line, most parameter
        # defaults and the sync/async branch structure are elided in this
        # view.
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        # Base keyword arguments shared by every per-block request
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        # Sequential path (e.g. non-seekable destination)
        self._dump_blocks_sync(
        # Parallel path: fetch blocks in threads, write as they land
        self._dump_blocks_async(
        # Trim leftover bytes when resuming over a longer local file
        dst.truncate(total_size)
    #Command Progress Bar method

    # NOTE(review): a try/except guarding generator exhaustion appears to
    # be elided in this view.
    def _cb_next(self, step=1):
        # Advance the attached progress bar 'step' times, if there is one
        if hasattr(self, 'progress_bar_gen'):
            for i in xrange(step):
                self.progress_bar_gen.next()
    # NOTE(review): the loop/guard lines that drive the bar to completion
    # are elided in this view.
    def _complete_cb(self):
        # Exhaust the progress bar generator so the bar renders as done
        self.progress_bar_gen.next()
    # NOTE(review): parts of the signature, the try opener, the 304/412
    # handling and the final return of the json body are elided in this
    # view.
    def get_object_hashmap(
            if_modified_since=None,
            if_unmodified_since=None,
        """:param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
        """
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
            # 304/412 mean "nothing new under these conditions", which is
            # not a failure for a hashmap request
            if err.status == 304 or err.status == 412:
642 def set_account_group(self, group, usernames):
646 :param usernames: (list)
648 self.account_post(update=True, groups={group: usernames})
650 def del_account_group(self, group):
654 self.account_post(update=True, groups={group: []})
656 def get_account_info(self, until=None):
658 :param until: (str) formated date
662 r = self.account_head(until=until)
663 if r.status_code == 401:
664 raise ClientError("No authorization", status=401)
    # NOTE(review): the 'return filter_in(' opener and a probable trailing
    # argument are elided in this view.
    def get_account_quota(self):
        """:returns: (dict) the account quota policy header(s)"""
        self.get_account_info(),
        'X-Account-Policy-Quota',
    # NOTE(review): the 'return filter_in(' opener and a probable trailing
    # argument are elided in this view.
    def get_account_versioning(self):
        """:returns: (dict) the account versioning policy header(s)"""
        self.get_account_info(),
        'X-Account-Policy-Versioning',
685 def get_account_meta(self, until=None):
687 :meta until: (str) formated date
691 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
693 def get_account_group(self):
697 return filter_in(self.get_account_info(), 'X-Account-Group-')
699 def set_account_meta(self, metapairs):
701 :param metapairs: (dict) {key1:val1, key2:val2, ...}
703 assert(type(metapairs) is dict)
704 self.account_post(update=True, metadata=metapairs)
706 def del_account_meta(self, metakey):
708 :param metakey: (str) metadatum key
710 self.account_post(update=True, metadata={metakey: ''})
712 def set_account_quota(self, quota):
716 self.account_post(update=True, quota=quota)
718 def set_account_versioning(self, versioning):
720 "param versioning: (str)
722 self.account_post(update=True, versioning=versioning)
    # NOTE(review): the request formatting and the return of the parsed
    # body are elided in this view.
    def list_containers(self):
        """:returns: (dict)"""
        r = self.account_get()
    def del_container(self, until=None, delimiter=None):
        """Delete a container (or, with a delimiter, its contents)

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty

        NOTE(review): the until/delimiter kwargs of container_delete and
        the ClientError constructor lines are elided in this view.
        """
        self._assert_container()
        # 404/409 are accepted here so they can be re-raised with a
        # friendlier, container-specific message below
        r = self.container_delete(
            success=(204, 404, 409))
        if r.status_code == 404:
            'Container "%s" does not exist' % self.container,
        elif r.status_code == 409:
            'Container "%s" is not empty' % self.container,
    # NOTE(review): the return/filter_in opener and the try/finally that
    # evidently guards the container swap are elided in this view.
    def get_container_versioning(self, container=None):
        """:param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        # Temporarily point the client at 'container'
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Versioning')
        # Restore the original container
        self.container = cnt_back_up
    # NOTE(review): the return/filter_in opener and the try/finally that
    # evidently guards the container swap are elided in this view.
    def get_container_quota(self, container=None):
        """:param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        # Temporarily point the client at 'container'
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Quota')
        # Restore the original container
        self.container = cnt_back_up
    # NOTE(review): the try opener and the final return of the response
    # headers are elided in this view.
    def get_container_info(self, until=None):
        """:param until: (str) formated date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        r = self.container_head(until=until)
        except ClientError as err:
            # Enrich the error with the container name before re-raising
            err.details.append('for container %s' % self.container)
    # NOTE(review): the 'return filter_in(' opener and its prefix argument
    # are elided in this view.
    def get_container_meta(self, until=None):
        """:param until: (str) formated date

        :returns: (dict)
        """
        self.get_container_info(until=until),
    # NOTE(review): the 'return filter_in(' opener is elided in this view.
    def get_container_object_meta(self, until=None):
        """:param until: (str) formated date

        :returns: (dict)
        """
        self.get_container_info(until=until),
        'X-Container-Object-Meta')
820 def set_container_meta(self, metapairs):
822 :param metapairs: (dict) {key1:val1, key2:val2, ...}
824 assert(type(metapairs) is dict)
825 self.container_post(update=True, metadata=metapairs)
827 def del_container_meta(self, metakey):
829 :param metakey: (str) metadatum key
831 self.container_post(update=True, metadata={metakey: ''})
833 def set_container_quota(self, quota):
837 self.container_post(update=True, quota=quota)
839 def set_container_versioning(self, versioning):
841 :param versioning: (str)
843 self.container_post(update=True, versioning=versioning)
845 def del_object(self, obj, until=None, delimiter=None):
847 :param obj: (str) remote object path
849 :param until: (str) formated date
851 :param delimiter: (str)
853 self._assert_container()
854 self.object_delete(obj, until=until, delimiter=delimiter)
856 def set_object_meta(self, obj, metapairs):
858 :param obj: (str) remote object path
860 :param metapairs: (dict) {key1:val1, key2:val2, ...}
862 assert(type(metapairs) is dict)
863 self.object_post(obj, update=True, metadata=metapairs)
865 def del_object_meta(self, obj, metakey):
867 :param obj: (str) remote object path
869 :param metakey: (str) metadatum key
871 self.object_post(obj, update=True, metadata={metakey: ''})
873 def publish_object(self, obj):
875 :param obj: (str) remote object path
877 :returns: (str) access url
879 self.object_post(obj, update=True, public=True)
880 info = self.get_object_info(obj)
881 pref, sep, rest = self.base_url.partition('//')
882 base = rest.split('/')[0]
883 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
885 def unpublish_object(self, obj):
887 :param obj: (str) remote object path
889 self.object_post(obj, update=True, public=False)
    # NOTE(review): the try opener, the return of the response headers and
    # the status-check guard in the except branch are elided in this view.
    def get_object_info(self, obj, version=None):
        """:param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        r = self.object_head(obj, version=version)
        except ClientError as ce:
            # Translate a generic 404 into an object-specific message
            raise ClientError('Object %s not found' % obj, status=404)
    # NOTE(review): the 'return filter_in(' opener and its prefix argument
    # are elided in this view.
    def get_object_meta(self, obj, version=None):
        """:param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        self.get_object_info(obj, version=version),
    # NOTE(review): the filter_in call, the empty-result guard and the
    # dict assembly around the parsing loop are elided in this view.
    def get_object_sharing(self, obj):
        """:param obj: (str) remote object path

        :returns: (dict)
        """
        self.get_object_info(obj),
        # The x-object-sharing header looks like 'read=a,b;write=c'
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
941 def set_object_sharing(
943 read_permition=False, write_permition=False):
944 """Give read/write permisions to an object.
946 :param obj: (str) remote object path
948 :param read_permition: (list - bool) users and user groups that get
949 read permition for this object - False means all previous read
950 permissions will be removed
952 :param write_perimition: (list - bool) of users and user groups to get
953 write permition for this object - False means all previous write
954 permissions will be removed
957 perms = dict(read=read_permition or '', write=write_permition or '')
958 self.object_post(obj, update=True, permissions=perms)
960 def del_object_sharing(self, obj):
962 :param obj: (str) remote object path
964 self.set_object_sharing(obj)
    # NOTE(review): the upload loop control, offset bookkeeping and the
    # object_post 'update' argument are elided in this view.
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # ceil(filesize / blocksize) without floats
        nblocks = 1 + (filesize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            # 'bytes */*' appends the block at the end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
    # NOTE(review): the object_post( opener and its update argument are
    # elided in this view.
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object to a given size

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        # Truncation is expressed as a ranged overwrite of the object
        # from itself, keeping only the first upto_bytes bytes
        content_range='bytes 0-%s/*' % upto_bytes,
        content_type='application/octet-stream',
        object_bytes=upto_bytes,
        source_object=path4url(self.container, obj))
    # NOTE(review): the ClientError constructor calls, the object_post(
    # opener and the progress-callback plumbing are elided in this view.
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        # Reject ranges that fall outside the remote object
        if rf_size < int(start):
            'Range start exceeds file size',
        elif rf_size < int(end):
            'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Inclusive range: end - start + 1 bytes will be overwritten
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # Never read past the source file or the requested span
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
        # NOTE(review): the 'def copy_object(' line and the object_put/copy
        # call opener are elided in this view.
        self, src_container, src_object, dst_container,
            source_version=None,
            source_account=None,
        """Copy an object across containers (and optionally accounts)

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        # Re-point the client at the destination container; the source is
        # addressed through src_path instead
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        dst_object or src_object,
        source_version=source_version,
        source_account=source_account,
        content_type=content_type,
        delimiter=delimiter)
        # NOTE(review): the 'def move_object(' line and the object_put/move
        # call opener are elided in this view.
        self, src_container, src_object, dst_container,
            source_account=None,
            source_version=None,
        """Move an object across containers (and optionally accounts)

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        # Re-point the client at the destination container; the source is
        # addressed through src_path instead
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        source_account=source_account,
        source_version=source_version,
        content_type=content_type,
        delimiter=delimiter)
    # NOTE(review): the computation of 'path' and the return of the parsed
    # body are elided in this view.
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()
        self.set_param('format', 'json')
        # Only send paging parameters that were actually supplied
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)
        # Callers may override the accepted status codes
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
1166 def get_object_versionlist(self, obj):
1168 :param obj: (str) remote object path
1172 self._assert_container()
1173 r = self.object_get(obj, format='json', version='list')
1174 return r.json['versions']