1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestAPI):
69 """GRNet Pithos API client"""
71 _thread_exceptions = []
73 def __init__(self, base_url, token, account=None, container=None):
74 super(PithosClient, self).__init__(base_url, token, account, container)
76 def purge_container(self):
77 """Delete an empty container and destroy associated blocks
79 r = self.container_delete(until=unicode(time()))
82 def upload_object_unchunked(self, obj, f,
86 content_encoding=None,
87 content_disposition=None,
92 :param obj: (str) remote object path
94 :param f: open file descriptor
96 :param withHashFile: (bool)
98 :param size: (int) size of data to upload
102 :param content_encoding: (str)
104 :param content_disposition: (str)
106 :param content_type: (str)
108 :param sharing: {'read':[user and/or grp names],
109 'write':[usr and/or grp names]}
111 :param public: (bool)
113 self._assert_container()
119 data = json.dumps(json.loads(data))
121 raise ClientError(message='"%s" is not json-formated' % f.name,
124 raise ClientError(message='"%s" is not a valid hashmap file'\
127 data = f.read(size) if size is not None else f.read()
128 r = self.object_put(obj,
131 content_encoding=content_encoding,
132 content_disposition=content_disposition,
133 content_type=content_type,
139 def create_object_by_manifestation(self, obj,
141 content_encoding=None,
142 content_disposition=None,
147 :param obj: (str) remote object path
151 :param content_encoding: (str)
153 :param content_disposition: (str)
155 :param content_type: (str)
157 :param sharing: {'read':[user and/or grp names],
158 'write':[usr and/or grp names]}
160 :param public: (bool)
162 self._assert_container()
163 r = self.object_put(obj,
166 content_encoding=content_encoding,
167 content_disposition=content_disposition,
168 content_type=content_type,
171 manifest='%s/%s' % (self.container, obj))
174 # upload_* auxiliary methods
175 def _put_block_async(self, data, hash):
176 event = SilentEvent(method=self._put_block, data=data, hash=hash)
180 def _put_block(self, data, hash):
181 r = self.container_post(update=True,
182 content_type='application/octet-stream',
183 content_length=len(data),
186 assert r.json[0] == hash, 'Local hash does not match server'
188 def _get_file_block_info(self, fileobj, size=None):
189 meta = self.get_container_info()
190 blocksize = int(meta['x-container-block-size'])
191 blockhash = meta['x-container-block-hash']
192 size = size if size is not None else fstat(fileobj.fileno()).st_size
193 nblocks = 1 + (size - 1) // blocksize
194 return (blocksize, blockhash, size, nblocks)
196 def _get_missing_hashes(self, obj, json,
202 content_encoding=None,
203 content_disposition=None,
207 r = self.object_put(obj,
210 content_type=content_type,
213 content_encoding=content_encoding,
214 content_disposition=content_disposition,
215 permissions=permissions,
218 if r.status_code == 201:
223 def _caclulate_uploaded_blocks(self,
234 hash_gen = hash_cb(nblocks)
237 for i in range(nblocks):
238 block = fileobj.read(min(blocksize, size - offset))
240 hash = _pithos_hash(block, blockhash)
242 hmap[hash] = (offset, bytes)
247 print("Size is %i" % size)
248 print("Offset is %i" % offset)
249 assert offset == size, \
250 "Failed to calculate uploaded blocks: " \
251 "Offset and object size do not match"
253 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
254 """upload missing blocks asynchronously.
257 upload_gen = upload_cb(len(missing))
260 self._init_thread_limit()
264 offset, bytes = hmap[hash]
266 data = fileobj.read(bytes)
267 r = self._put_block_async(data, hash)
270 for i, thread in enumerate(flying):
272 unfinished = self._watch_thread_limit(unfinished)
274 if thread.isAlive() or thread.exception:
275 unfinished.append(thread)
281 for thread in flying:
284 failures = [r for r in flying if r.exception]
286 details = ', '.join([' (%s).%s' % (i, r.exception)\
287 for i, r in enumerate(failures)])
288 raise ClientError(message="Block uploading failed",
295 except StopIteration:
298 def upload_object(self, obj, f,
303 content_encoding=None,
304 content_disposition=None,
308 """Upload an object using multiple connections (threads)
310 :param obj: (str) remote object path
312 :param f: open file descriptor (rb)
314 :param hash_cb: optional progress.bar object for calculating hashes
316 :param upload_cb: optional progress.bar object for uploading
320 :param content_encoding: (str)
322 :param content_disposition: (str)
324 :param content_type: (str)
326 :param sharing: {'read':[user and/or grp names],
327 'write':[usr and/or grp names]}
329 :param public: (bool)
331 self._assert_container()
334 block_info = (blocksize, blockhash, size, nblocks) =\
335 self._get_file_block_info(f, size)
336 (hashes, hmap, offset) = ([], {}, 0)
337 if content_type is None:
338 content_type = 'application/octet-stream'
340 self._caclulate_uploaded_blocks(*block_info,
346 hashmap = dict(bytes=size, hashes=hashes)
347 missing = self._get_missing_hashes(obj, hashmap,
348 content_type=content_type,
351 content_encoding=content_encoding,
352 content_disposition=content_disposition,
359 self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
360 except KeyboardInterrupt:
361 print('- - - wait for threads to finish')
362 for thread in activethreads():
370 content_type=content_type,
375 # download_* auxiliary methods
376 def _get_remote_blocks_info(self, obj, **restargs):
377 #retrieve object hashmap
378 myrange = restargs.pop('data_range', None)
379 hashmap = self.get_object_hashmapp(obj, **restargs)
380 restargs['data_range'] = myrange
381 blocksize = int(hashmap['block_size'])
382 blockhash = hashmap['block_hash']
383 total_size = hashmap['bytes']
384 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
386 for i, h in enumerate(hashmap['hashes']):
388 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
390 def _dump_blocks_sync(self,
398 for blockid, blockhash in enumerate(remote_hashes):
399 if blockhash == None:
401 start = blocksize * blockid
402 end = total_size - 1 if start + blocksize > total_size\
403 else start + blocksize - 1
404 (start, end) = _range_up(start, end, range)
405 restargs['data_range'] = 'bytes=%s-%s' % (start, end)
406 r = self.object_get(obj, success=(200, 206), **restargs)
411 def _get_block_async(self, obj, **restargs):
412 event = SilentEvent(self.object_get,
419 def _hash_from_file(self, fp, start, size, blockhash):
421 block = fp.read(size)
422 h = newhashlib(blockhash)
423 h.update(block.strip('\x00'))
424 return hexlify(h.digest())
426 def _thread2file(self,
431 """write the results of a greenleted rest call to a file
432 @offset: the offset of the file up to blocksize
433 - e.g. if the range is 10-100, all
434 blocks will be written to normal_position - 10"""
436 for i, (start, g) in enumerate(flying.items()):
440 block = g.value.content
441 local_file.seek(start - offset)
442 local_file.write(block)
444 finished.append(flying.pop(start))
448 def _dump_blocks_async(self,
459 file_size = fstat(local_file.fileno()).st_size if resume else 0
463 if filerange is not None:
464 rstart = int(filerange.split('-')[0])
465 offset = rstart if blocksize > rstart else rstart % blocksize
467 self._init_thread_limit()
468 for block_hash, blockid in remote_hashes.items():
469 start = blocksize * blockid
470 if start < file_size\
471 and block_hash == self._hash_from_file(
478 self._watch_thread_limit(flying.values())
479 finished += self._thread2file(
484 end = total_size - 1 if start + blocksize > total_size\
485 else start + blocksize - 1
486 (start, end) = _range_up(start, end, filerange)
490 restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
491 flying[start] = self._get_block_async(obj, **restargs)
493 for thread in flying.values():
495 finished += self._thread2file(flying, local_file, offset, **restargs)
497 def download_object(self,
506 if_modified_since=None,
507 if_unmodified_since=None):
508 """Download an object using multiple connections (threads) and
509 writing to random parts of the file
511 :param obj: (str) remote object path
513 :param dst: open file descriptor (wb+)
515 :param download_cb: optional progress.bar object for downloading
517 :param version: (str) file version
519 :param resume: (bool) if set, preserve already downloaded file parts
521 :param range: (str) from-to where from and to are integers denoting
522 file positions in bytes
524 :param if_match: (str)
526 :param if_none_match: (str)
528 :param if_modified_since: (str) formated date
530 :param if_unmodified_since: (str) formated date
533 restargs = dict(version=version,
534 data_range=None if range is None else 'bytes=%s' % range,
536 if_none_match=if_none_match,
537 if_modified_since=if_modified_since,
538 if_unmodified_since=if_unmodified_since)
544 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
545 assert total_size >= 0
548 self.progress_bar_gen = download_cb(len(remote_hashes))
552 self._dump_blocks_sync(obj,
560 self._dump_blocks_async(obj,
570 dst.truncate(total_size)
574 #Command Progress Bar method
576 if hasattr(self, 'progress_bar_gen'):
578 self.progress_bar_gen.next()
582 def _complete_cb(self):
585 self.progress_bar_gen.next()
589 def get_object_hashmapp(self, obj,
593 if_modified_since=None,
594 if_unmodified_since=None,
597 :param obj: (str) remote object path
599 :param if_match: (str)
601 :param if_none_match: (str)
603 :param if_modified_since: (str) formated date
605 :param if_unmodified_since: (str) formated date
607 :param data_range: (str) from-to where from and to are integers
608 denoting file positions in bytes
613 r = self.object_get(obj,
616 if_etag_match=if_match,
617 if_etag_not_match=if_none_match,
618 if_modified_since=if_modified_since,
619 if_unmodified_since=if_unmodified_since,
620 data_range=data_range)
621 except ClientError as err:
622 if err.status == 304 or err.status == 412:
627 def set_account_group(self, group, usernames):
631 :param usernames: (list)
633 r = self.account_post(update=True, groups={group: usernames})
636 def del_account_group(self, group):
640 r = self.account_post(update=True, groups={group: []})
643 def get_account_info(self, until=None):
645 :param until: (str) formated date
649 r = self.account_head(until=until)
650 if r.status_code == 401:
651 raise ClientError("No authorization")
654 def get_account_quota(self):
658 return filter_in(self.get_account_info(),
659 'X-Account-Policy-Quota',
662 def get_account_versioning(self):
666 return filter_in(self.get_account_info(),
667 'X-Account-Policy-Versioning',
670 def get_account_meta(self, until=None):
672 :meta until: (str) formated date
676 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
678 def get_account_group(self):
682 return filter_in(self.get_account_info(), 'X-Account-Group-')
684 def set_account_meta(self, metapairs):
686 :param metapairs: (dict) {key1:val1, key2:val2, ...}
688 assert(type(metapairs) is dict)
689 r = self.account_post(update=True, metadata=metapairs)
692 def del_account_meta(self, metakey):
694 :param metakey: (str) metadatum key
696 r = self.account_post(update=True, metadata={metakey: ''})
699 def set_account_quota(self, quota):
703 r = self.account_post(update=True, quota=quota)
706 def set_account_versioning(self, versioning):
708 "param versioning: (str)
710 r = self.account_post(update=True, versioning=versioning)
713 def list_containers(self):
717 r = self.account_get()
720 def del_container(self, until=None, delimiter=None):
722 :param until: (str) formated date
724 :param delimiter: (str)
726 :raises ClientError: 404 Container does not exist
728 :raises ClientError: 409 Container is not empty
730 self._assert_container()
731 r = self.container_delete(until=until,
733 success=(204, 404, 409))
735 if r.status_code == 404:
736 raise ClientError('Container "%s" does not exist' % self.container,
738 elif r.status_code == 409:
739 raise ClientError('Container "%s" is not empty' % self.container,
742 def get_container_versioning(self, container):
744 :param container: (str)
748 self.container = container
749 return filter_in(self.get_container_info(),
750 'X-Container-Policy-Versioning')
752 def get_container_quota(self, container):
754 :param container: (str)
758 self.container = container
759 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
761 def get_container_info(self, until=None):
763 :param until: (str) formated date
767 r = self.container_head(until=until)
770 def get_container_meta(self, until=None):
772 :param until: (str) formated date
776 return filter_in(self.get_container_info(until=until),
779 def get_container_object_meta(self, until=None):
781 :param until: (str) formated date
785 return filter_in(self.get_container_info(until=until),
786 'X-Container-Object-Meta')
788 def set_container_meta(self, metapairs):
790 :param metapairs: (dict) {key1:val1, key2:val2, ...}
792 assert(type(metapairs) is dict)
793 r = self.container_post(update=True, metadata=metapairs)
796 def del_container_meta(self, metakey):
798 :param metakey: (str) metadatum key
800 r = self.container_post(update=True, metadata={metakey: ''})
803 def set_container_quota(self, quota):
807 r = self.container_post(update=True, quota=quota)
810 def set_container_versioning(self, versioning):
812 :param versioning: (str)
814 r = self.container_post(update=True, versioning=versioning)
817 def del_object(self, obj, until=None, delimiter=None):
819 :param obj: (str) remote object path
821 :param until: (str) formated date
823 :param delimiter: (str)
825 self._assert_container()
826 r = self.object_delete(obj, until=until, delimiter=delimiter)
829 def set_object_meta(self, obj, metapairs):
831 :param obj: (str) remote object path
833 :param metapairs: (dict) {key1:val1, key2:val2, ...}
835 assert(type(metapairs) is dict)
836 r = self.object_post(obj, update=True, metadata=metapairs)
839 def del_object_meta(self, obj, metakey):
841 :param obj: (str) remote object path
843 :param metakey: (str) metadatum key
845 r = self.object_post(obj, update=True, metadata={metakey: ''})
848 def publish_object(self, obj):
850 :param obj: (str) remote object path
852 r = self.object_post(obj, update=True, public=True)
855 def unpublish_object(self, obj):
857 :param obj: (str) remote object path
859 r = self.object_post(obj, update=True, public=False)
862 def get_object_info(self, obj, version=None):
864 :param obj: (str) remote object path
866 :param version: (str)
870 r = self.object_head(obj, version=version)
873 def get_object_meta(self, obj, version=None):
875 :param obj: (str) remote object path
877 :param version: (str)
881 return filter_in(self.get_object_info(obj, version=version),
884 def get_object_sharing(self, obj):
886 :param obj: (str) remote object path
890 r = filter_in(self.get_object_info(obj),
895 perms = r['x-object-sharing'].split(';')
900 raise ClientError('Incorrect reply format')
901 (key, val) = perm.strip().split('=')
905 def set_object_sharing(self, obj,
906 read_permition=False,
907 write_permition=False):
908 """Give read/write permisions to an object.
910 :param obj: (str) remote object path
912 :param read_permition: (list - bool) users and user groups that get
913 read permition for this object - False means all previous read
914 permissions will be removed
916 :param write_perimition: (list - bool) of users and user groups to get
917 write permition for this object - False means all previous write
918 permissions will be removed
921 perms = dict(read='' if not read_permition else read_permition,
922 write='' if not write_permition else write_permition)
923 r = self.object_post(obj, update=True, permissions=perms)
926 def del_object_sharing(self, obj):
928 :param obj: (str) remote object path
930 self.set_object_sharing(obj)
932 def append_object(self, obj, source_file, upload_cb=None):
934 :param obj: (str) remote object path
936 :param source_file: open file descriptor
938 :param upload_db: progress.bar for uploading
941 self._assert_container()
942 meta = self.get_container_info()
943 blocksize = int(meta['x-container-block-size'])
944 filesize = fstat(source_file.fileno()).st_size
945 nblocks = 1 + (filesize - 1) // blocksize
947 if upload_cb is not None:
948 upload_gen = upload_cb(nblocks)
949 for i in range(nblocks):
950 block = source_file.read(min(blocksize, filesize - offset))
952 r = self.object_post(obj,
954 content_range='bytes */*',
955 content_type='application/octet-stream',
956 content_length=len(block),
960 if upload_cb is not None:
963 def truncate_object(self, obj, upto_bytes):
965 :param obj: (str) remote object path
967 :param upto_bytes: max number of bytes to leave on file
969 r = self.object_post(obj,
971 content_range='bytes 0-%s/*' % upto_bytes,
972 content_type='application/octet-stream',
973 object_bytes=upto_bytes,
974 source_object=path4url(self.container, obj))
977 def overwrite_object(self,
983 """Overwrite a part of an object from local source file
985 :param obj: (str) remote object path
987 :param start: (int) position in bytes to start overwriting from
989 :param end: (int) position in bytes to stop overwriting at
991 :param source_file: open file descriptor
993 :param upload_db: progress.bar for uploading
996 self._assert_container()
997 meta = self.get_container_info()
998 blocksize = int(meta['x-container-block-size'])
999 filesize = fstat(source_file.fileno()).st_size
1000 datasize = int(end) - int(start) + 1
1001 nblocks = 1 + (datasize - 1) // blocksize
1003 if upload_cb is not None:
1004 upload_gen = upload_cb(nblocks)
1005 for i in range(nblocks):
1006 block = source_file.read(min(blocksize,
1009 offset += len(block)
1010 r = self.object_post(obj,
1012 content_type='application/octet-stream',
1013 content_length=len(block),
1014 content_range='bytes %s-%s/*' % (start, end),
1018 if upload_cb is not None:
1021 def copy_object(self, src_container, src_object, dst_container,
1023 source_version=None,
1028 :param src_container: (str) source container
1030 :param src_object: (str) source object path
1032 :param dst_container: (str) destination container
1034 :param dst_object: (str) destination object path
1036 :param source_version: (str) source object version
1038 :param public: (bool)
1040 :param content_type: (str)
1042 :param delimiter: (str)
1044 self._assert_account()
1045 self.container = dst_container
1046 dst_object = dst_object or src_object
1047 src_path = path4url(src_container, src_object)
1048 r = self.object_put(dst_object,
1052 source_version=source_version,
1054 content_type=content_type,
1055 delimiter=delimiter)
1058 def move_object(self, src_container, src_object, dst_container,
1060 source_version=None,
1065 :param src_container: (str) source container
1067 :param src_object: (str) source object path
1069 :param dst_container: (str) destination container
1071 :param dst_object: (str) destination object path
1073 :param source_version: (str) source object version
1075 :param public: (bool)
1077 :param content_type: (str)
1079 :param delimiter: (str)
1081 self._assert_account()
1082 self.container = dst_container
1083 dst_object = dst_object or src_object
1084 src_path = path4url(src_container, src_object)
1085 r = self.object_put(dst_object,
1089 source_version=source_version,
1091 content_type=content_type,
1092 delimiter=delimiter)
1095 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1096 """Get accounts that share with self.account
1100 :param marker: (str)
1104 self._assert_account()
1106 self.set_param('format', 'json')
1107 self.set_param('limit', limit, iff=limit is not None)
1108 self.set_param('marker', marker, iff=marker is not None)
1111 success = kwargs.pop('success', (200, 204))
1112 r = self.get(path, *args, success=success, **kwargs)
1115 def get_object_versionlist(self, obj):
1117 :param obj: (str) remote object path
1121 self._assert_container()
1122 r = self.object_get(obj, format='json', version='list')
1123 return r.json['versions']