1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
def pithos_hash(block, blockhash):
    """Hash one Pithos block: trailing NUL padding is stripped before
    digesting with the container's hash algorithm (*blockhash*).

    NOTE(review): source appears truncated here — the hex-digest return
    statement is missing from this view; confirm against upstream.
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
def _range_up(start, end, a_range):
    """Clip the block span (start, end) against a 'first-last' range string.

    NOTE(review): source appears truncated here — the body of the if and
    the return statement(s) are missing from this view.
    """
    (rstart, rend) = a_range.split('-')
    (rstart, rend) = (int(rstart), int(rend))
    if rstart > end or rend < start:
class PithosClient(PithosRestAPI):
    """GRNet Pithos API client"""

    # Exceptions raised by worker threads are collected here.
    # NOTE(review): class-level mutable attribute — shared by all instances.
    _thread_exceptions = []

    def __init__(self, base_url, token, account=None, container=None):
        # Delegate all setup to the REST layer (Python 2 super form).
        super(PithosClient, self).__init__(base_url, token, account, container)
    def purge_container(self):
        """Purge the current container: DELETE with 'until' set to now.

        NOTE(review): truncated — response handling after the call is
        missing from this view.
        """
        r = self.container_delete(until=unicode(time()))
    # Upload an object with a single PUT (no block chunking): read the
    # whole payload from f and send it as the object body.
    # NOTE(review): heavily truncated view — several parameters (size,
    # etag, withHashFile, ...), the json/hashmap validation branches and
    # the closing of the object_put call are missing; confirm upstream.
    def upload_object_unchunked(self, obj, f,
            content_encoding=None,
            content_disposition=None,
        self.assert_container()
            # re-serialize to validate that the payload is JSON
            data = json.dumps(json.loads(data))
            raise ClientError(message='"%s" is not json-formated' % f.name,
            raise ClientError(message='"%s" is not a valid hashmap file'\
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
    # upload_* auxiliary methods
    def put_block_async(self, data, hash):
        """Run put_block(data, hash) in a SilentEvent worker thread.

        NOTE(review): truncated — the thread start and the return of the
        event are missing from this view.
        """
        event = SilentEvent(method=self.put_block, data=data, hash=hash)
    def put_block(self, data, hash):
        """POST one data block to the container (update mode) and assert
        that the server-computed hash matches the locally computed one.

        NOTE(review): truncated — the remaining container_post kwargs
        (presumably the data payload and format) are missing here.
        """
        r = self.container_post(update=True,
            content_type='application/octet-stream',
            content_length=len(data),
        assert r.json[0] == hash, 'Local hash does not match server'
    # Create a zero-length object whose content is the concatenation of
    # the parts under <container>/<obj>/, via the manifest header.
    # NOTE(review): truncated — parts of the signature and some object_put
    # kwargs are missing from this view.
    def create_object_by_manifestation(self, obj,
            content_encoding=None,
            content_disposition=None,
        self.assert_container()
        r = self.object_put(obj,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            manifest='%s/%s' % (self.container, obj))
148 def _get_file_block_info(self, fileobj, size=None):
149 meta = self.get_container_info()
150 blocksize = int(meta['x-container-block-size'])
151 blockhash = meta['x-container-block-hash']
152 size = size if size is not None else fstat(fileobj.fileno()).st_size
153 nblocks = 1 + (size - 1) // blocksize
154 return (blocksize, blockhash, size, nblocks)
    # PUT the object as a hashmap (json) so the server can report which
    # block hashes it does not already store; 201 presumably means the
    # object was assembled entirely from existing blocks.
    # NOTE(review): heavily truncated — several parameters, object_put
    # kwargs, the 409 handling and the return are missing from this view.
    def _get_missing_hashes(self, obj, json,
            content_encoding=None,
            content_disposition=None,
        r = self.object_put(obj,
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        if r.status_code == 201:
    # Read the local file block by block, hash each block, and record
    # hash -> (offset, bytes) in hmap (presumably offset/length pairs).
    # The misspelled name "_caclulate_" is preserved: it is called by
    # upload_object elsewhere in this file.
    # NOTE(review): truncated — the parameter list, offset bookkeeping
    # and progress-generator calls are partially missing from this view.
    def _caclulate_uploaded_blocks(self,
        hash_gen = hash_cb(nblocks)
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            hash = pithos_hash(block, blockhash)
            hmap[hash] = (offset, bytes)
        # sanity check: every byte of the file must have been consumed
        assert offset == size
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """upload missing blocks asynchronously.

        NOTE(review): truncated view — the per-hash loop header, thread
        joining and parts of the failure handling are missing here.
        """
        upload_gen = upload_cb(len(missing))
        self._init_thread_limit()
            offset, bytes = hmap[hash]
            data = fileobj.read(bytes)
            r = self.put_block_async(data, hash)
        # poll worker threads, keeping alive/failed ones for re-inspection
        for i, thread in enumerate(flying):
            unfinished = self._watch_thread_limit(unfinished)
            if thread.isAlive() or thread.exception:
                unfinished.append(thread)
        for thread in flying:
        # collect per-thread exceptions into one aggregated ClientError
        failures = [r for r in flying if r.exception]
        details = ', '.join([' (%s).%s' % (i, r.exception)\
            for i, r in enumerate(failures)])
        raise ClientError(message="Block uploading failed",
        except StopIteration:
    # Chunked upload: hash the file's blocks locally, ask the server which
    # hashes are missing, upload only those blocks in parallel, then
    # (presumably) re-PUT the final hashmap.
    # NOTE(review): heavily truncated — parameters, the try wrapper and
    # the final object_put are partially missing from this view.
    def upload_object(self, obj, f,
            content_encoding=None,
            content_disposition=None,
        self.assert_container()
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        if content_type is None:
            # default content type when the caller supplies none
            content_type = 'application/octet-stream'
        self._caclulate_uploaded_blocks(*block_info,
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap,
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
        except KeyboardInterrupt:
            # let worker threads drain before propagating the interrupt
            print('- - - wait for threads to finish')
            for thread in activethreads():
        content_type=content_type,
    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        # The hashmap request must not carry a data range, so the range is
        # popped before the call and restored afterwards.
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): truncated — map_dict's initialization and its
        # population inside the loop (presumably hash -> block index) are
        # missing from this view.
        for i, h in enumerate(hashmap['hashes']):
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    # Sequential download path: GET each remote block in order and
    # (presumably) write it to the destination file.
    # NOTE(review): truncated — the parameter list, the skip branch body
    # and the write/progress lines are missing from this view.
    # NOTE(review): 'blockhash == None' would more idiomatically be
    # 'blockhash is None' (left unchanged here).
    def _dump_blocks_sync(self,
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash == None:
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
    def _get_block_async(self, obj, **restargs):
        """Fire an asynchronous GET for one block via SilentEvent.

        NOTE(review): truncated — the remaining SilentEvent arguments and
        the start/return lines are missing from this view.
        """
        event = SilentEvent(self.object_get,
    def _hash_from_file(self, fp, start, size, blockhash):
        """Hash *size* bytes of *fp* with the *blockhash* algorithm.

        NOTE(review): truncated — a seek to *start* appears to be missing
        from this view (the *start* parameter is otherwise unused).
        NOTE(review): strips NULs from both ends (strip), unlike
        pithos_hash which uses rstrip — confirm this is intended.
        """
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())
    def _thread2file(self,
        """write the results of a greenleted rest call to a file

        @offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all
        blocks will be written to normal_position - 10
        """
        # NOTE(review): truncated — the rest of the signature, the
        # thread-liveness checks and the return are missing from this view.
        for i, (start, g) in enumerate(flying.items()):
            block = g.value.content
            # seek relative to the requested range start (see docstring)
            local_file.seek(start - offset)
            local_file.write(block)
            finished.append(flying.pop(start))
    # Parallel download path: skip blocks already present in the local
    # file (resume), spawn async block GETs bounded by the thread limit,
    # and flush finished threads to the file as they complete.
    # NOTE(review): heavily truncated — the parameter list and several
    # control-flow/continuation lines are missing from this view.
    def _dump_blocks_async(self,
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            # resume: skip a block whose on-disk bytes already hash right
            if start < file_size\
                and block_hash == self._hash_from_file(
            self._watch_thread_limit(flying.values())
            finished += self._thread2file(
            end = total_size - 1 if start + blocksize > total_size\
                else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
            flying[start] = self._get_block_async(obj, **restargs)
        # drain whatever is still in flight, then flush to the file
        for thread in flying.values():
        finished += self._thread2file(flying, local_file, offset, **restargs)
    # Download an object to a destination file, either sequentially or via
    # parallel block GETs, then truncate the destination to the exact size.
    # NOTE(review): heavily truncated — most parameters, the sync/async
    # dispatch and their argument lists are missing from this view.
    def download_object(self,
            if_modified_since=None,
            if_unmodified_since=None):
        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        # progress generator used by the _cb_next/_complete_cb helpers
        self.progress_bar_gen = download_cb(len(remote_hashes))
        self._dump_blocks_sync(obj,
        self._dump_blocks_async(obj,
        dst.truncate(total_size)
    #Command Progress Bar method
    # NOTE(review): the two indented lines below look like the body of a
    # progress-callback method (e.g. _cb_next) whose def line is missing
    # from this truncated view.
        if hasattr(self, 'progress_bar_gen'):
            self.progress_bar_gen.next()

    def _complete_cb(self):
        """Advance the progress-bar generator (Python 2 .next()).

        NOTE(review): truncated — the surrounding loop/exception handling
        is missing from this view.
        """
        self.progress_bar_gen.next()
    # Untested - except if download_object is tested first
    # Fetch the object's hashmap; 304/412 responses to the conditional
    # headers are treated as "no content" rather than errors.
    # NOTE(review): truncated — the remaining parameters, the try line
    # and the return statements are missing from this view.
    def get_object_hashmap(self, obj,
            if_modified_since=None,
            if_unmodified_since=None,
        r = self.object_get(obj,
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Set the members (*usernames*) of account group *group*."""
        r = self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete account group *group* (post it with an empty member list)."""
        r = self.account_post(update=True, groups={group: []})
    def get_account_info(self, until=None):
        """HEAD the account (optionally at time *until*).

        :raises ClientError: on 401 (no authorization)

        NOTE(review): truncated — the return (presumably the response
        headers) is missing from this view.
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
    def get_account_quota(self):
        """Return the account's quota policy header."""
        # NOTE(review): truncated — the closing argument(s) of filter_in
        # are missing from this view.
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Quota',
    def get_account_versioning(self):
        """Return the account's versioning policy header."""
        # NOTE(review): truncated — the closing argument(s) of filter_in
        # are missing from this view.
        return filter_in(self.get_account_info(),
            'X-Account-Policy-Versioning',
545 def get_account_meta(self, until=None):
546 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
548 def get_account_group(self):
549 return filter_in(self.get_account_info(), 'X-Account-Group-')
    def set_account_meta(self, metapairs):
        """Set/update account metadata from a dict of key/value pairs."""
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
    def del_account_meta(self, metakey):
        """Delete account metadata entry *metakey* (post it as '')."""
        r = self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """Set the account quota policy."""
        r = self.account_post(update=True, quota=quota)
    def set_account_versioning(self, versioning):
        """Set the account versioning policy."""
        r = self.account_post(update=True, versioning=versioning)
    def list_containers(self):
        """GET the account's container listing.

        NOTE(review): truncated — the return (presumably the json body)
        is missing from this view.
        """
        r = self.account_get()
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :raises ClientError: 404 (container does not exist) or
            409 (container not empty)

        NOTE(review): truncated — a container_delete kwarg (presumably
        delimiter) and the status arguments of the ClientErrors are
        missing from this view.
        """
        self.assert_container()
        r = self.container_delete(until=until,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container,
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container,
585 def get_container_versioning(self, container):
586 self.container = container
587 return filter_in(self.get_container_info(),
588 'X-Container-Policy-Versioning')
590 def get_container_quota(self, container):
591 self.container = container
592 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
    def get_container_info(self, until=None):
        """HEAD the current container (optionally at time *until*).

        NOTE(review): truncated — the return (presumably the response
        headers) is missing from this view.
        """
        r = self.container_head(until=until)
    def get_container_meta(self, until=None):
        """Return the container's metadata headers."""
        # NOTE(review): truncated — the filter_in prefix argument and the
        # closing paren are missing from this view.
        return filter_in(self.get_container_info(until=until),
602 def get_container_object_meta(self, until=None):
603 return filter_in(self.get_container_info(until=until),
604 'X-Container-Object-Meta')
    def set_container_meta(self, metapairs):
        """Set/update container metadata from a dict of key/value pairs."""
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
    def del_container_meta(self, metakey):
        """Delete container metadata entry *metakey* (post it as '')."""
        r = self.container_post(update=True, metadata={metakey: ''})
    def set_container_quota(self, quota):
        """Set the container quota policy."""
        r = self.container_post(update=True, quota=quota)
    def set_container_versioning(self, versioning):
        """Set the container versioning policy."""
        r = self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """Delete *obj* from the current container (optionally up to time
        *until*, or a whole *delimiter*-separated hierarchy)."""
        self.assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
    def set_object_meta(self, object, metapairs):
        """Set/update object metadata from a dict of key/value pairs."""
        assert(type(metapairs) is dict)
        r = self.object_post(object, update=True, metadata=metapairs)
    def del_object_meta(self, metakey, object):
        """Delete object metadata entry *metakey* (post it as '')."""
        r = self.object_post(object, update=True, metadata={metakey: ''})
    def publish_object(self, object):
        """Make *object* publicly accessible."""
        r = self.object_post(object, update=True, public=True)
    def unpublish_object(self, object):
        """Revoke public access to *object*."""
        r = self.object_post(object, update=True, public=False)
    def get_object_info(self, obj, version=None):
        """HEAD *obj* (optionally a specific *version*).

        NOTE(review): truncated — the return (presumably the response
        headers) is missing from this view.
        """
        r = self.object_head(obj, version=version)
    def get_object_meta(self, obj, version=None):
        """Return the object's metadata headers."""
        # NOTE(review): truncated — the filter_in prefix argument and the
        # closing paren are missing from this view.
        return filter_in(self.get_object_info(obj, version=version),
    # Parse the object's x-object-sharing header into permission entries
    # of the form key=val, separated by ';'.
    # NOTE(review): heavily truncated — the filter_in arguments, the
    # empty-header handling, the loop over perms and the return are
    # missing from this view.
    def get_object_sharing(self, object):
        r = filter_in(self.get_object_info(object),
        perms = r['x-object-sharing'].split(';')
            raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
    def set_object_sharing(self, object,
            read_permition=False,
            write_permition=False):
        """Give read/write permissions to an object.

        @param object: the object to change sharing permissions on
        @param read_permition: a list of users and user groups that get
            read permission for this object; False clears all previous
            read permissions
        @param write_permition: a list of users and user groups that get
            write permission for this object; False clears all previous
            write permissions
        """
        # Falsy values become '' so the POST wipes that permission set.
        perms = dict(read='' if not read_permition else read_permition,
            write='' if not write_permition else write_permition)
        r = self.object_post(object, update=True, permissions=perms)
689 def del_object_sharing(self, object):
690 self.set_object_sharing(object)
    def append_object(self, object, source_file, upload_cb=None):
        """Append the contents of *source_file* to *object*, block by
        block, using content_range 'bytes */*'.

        @param upload_cb: a generator for showing progress of upload
            to caller application, e.g. a progress bar. Its next is
            called whenever a block is uploaded
        """
        # NOTE(review): truncated — the offset initialization/advance,
        # the data/update kwargs of object_post and the progress next()
        # call are missing from this view.
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            r = self.object_post(object,
                content_range='bytes */*',
                content_type='application/octet-stream',
                content_length=len(block),
            if upload_cb is not None:
    def truncate_object(self, object, upto_bytes):
        """Truncate *object* down to *upto_bytes* bytes via a self-copy
        with object_bytes set.

        NOTE(review): truncated — an object_post kwarg (presumably
        update=True) and trailing response handling are missing from
        this view.
        """
        r = self.object_post(object,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, object))
    def overwrite_object(self,
        """Overwrite a part of an object with given source file

        @start the part of the remote object to start overwriting from,
            in bytes
        @end the part of the remote object to stop overwriting to,
            in bytes
        """
        # NOTE(review): heavily truncated — the parameter list, the read
        # length computation, the data/update kwargs of object_post and
        # the progress next() call are missing from this view.
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize,
            r = self.object_post(object,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (start, end),
            if upload_cb is not None:
    # Server-side copy of src_container/src_object into
    # dst_container/dst_object (dst name defaults to the source name).
    # NOTE(review): truncated — the remaining parameters and several
    # object_put kwargs (presumably copy_from and content headers) are
    # missing from this view.
    def copy_object(self, src_container, src_object, dst_container,
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            source_version=source_version,
            content_type=content_type,
    # Server-side move of src_container/src_object into
    # dst_container/dst_object (dst name defaults to the source name).
    # NOTE(review): truncated — the remaining parameters and several
    # object_put kwargs (presumably move_from and content headers) are
    # missing from this view.
    def move_object(self, src_container, src_object, dst_container,
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(dst_object,
            source_version=source_version,
            content_type=content_type,
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()
        # NOTE(review): truncated — the construction of `path` and the
        # final return (presumably the json body) are missing from this
        # view.
        self.set_param('format', 'json')
        # optional paging parameters are only sent when provided
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
817 def get_object_versionlist(self, path):
818 self.assert_container()
819 r = self.object_get(path, format='json', version='list')
820 return r.json['versions']