Raise block upload greenlet failure error
[kamaki] / kamaki / clients / pithos.py
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

import gevent
#import gevent.monkey
# Monkey-patch everything for gevent early on
#gevent.monkey.patch_all()
import gevent.pool

from os import fstat, path
from hashlib import new as newhashlib
from time import time, sleep
from datetime import datetime
import sys

from binascii import hexlify

from .pithos_rest_api import PithosRestAPI
from .storage import ClientError
from .utils import path4url, filter_in
from StringIO import StringIO

def pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
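# Example (illustrative values): trailing zero-padding is ignored, so a block
# padded up to the container block size hashes the same as the raw data:
#   pithos_hash('abc' + 8 * '\x00', 'sha256') == pithos_hash('abc', 'sha256')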

def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
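# e.g. with a_range '10-100', a block spanning bytes 0..63 is clipped to
# (10, 63), while a block spanning bytes 128..191 yields (0, 0), i.e. skip.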

class PithosClient(PithosRestAPI):
    """GRNET Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account=account,
            container=container)
        self.async_pool = None

    def purge_container(self):
        self.container_delete(until=unicode(time()))

    def upload_object_unchunked(self, obj, f, withHashFile=False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
        # Naive implementation: it loads the whole file in memory.
        # Look in Pithos for a nicer (streaming) implementation.
        self.assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError(message='"%s" is not json-formatted' % f.name, status=1)
            except SyntaxError:
                raise ClientError(message='"%s" is not a valid hashmap file' % f.name, status=1)
            f = StringIO(data)
        data = f.read(size) if size is not None else f.read()
        self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
            public=public, success=201)

    # upload_* auxiliary methods
    def put_block_async(self, data, hash):
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                # Silence gevent's default traceback printing; the caller
                # inspects g.exception instead.
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
        g = SilentGreenlet(self.put_block, data, hash)
        self.async_pool.start(g)
        return g
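    # Usage sketch: callers must join the returned greenlet and check
    # g.exception themselves, e.g.:
    #   g = client.put_block_async(data, hash)
    #   g.join()
    #   if g.exception:
    #       raise g.exception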

    def put_block(self, data, hash):
        r = self.container_post(update=True, content_type='application/octet-stream',
            content_length=len(data), data=data, format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
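    # The update POST replies with the hash list of the uploaded data as
    # JSON; for a single block that is a one-element list, hence r.json[0].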

    def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
        content_disposition=None, content_type=None, sharing=None, public=None):
        self.assert_container()
        obj_content_type = 'application/octet-stream' if content_type is None else content_type
        self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=obj_content_type,
            permitions=sharing, public=public, manifest='%s/%s' % (self.container, obj))
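    # A manifest object has no data of its own: the zero-length object is
    # served as the concatenation of all objects whose names are prefixed by
    # '<container>/<obj>'.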

    def _get_file_block_info(self, fileobj, size=None):
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
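    # e.g. a 10MiB file against a 4MiB container block size gives
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3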

    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
        permitions=None, public=None, success=(201, 409)):
        r = self.object_put(obj, format=format, hashmap=hashmap, content_type=content_type,
            json=json, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, permitions=permitions, public=public,
            success=success)
        if r.status_code == 201:
            return None
        return r.json
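    # Protocol: a hashmap PUT answers 201 if the server already has every
    # block (nothing left to upload), or 409 with the list of unknown block
    # hashes in the body; that list is what gets returned here.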

    def _calculate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap,
        fileobj, hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        assert offset == size
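    # A minimal hash_cb sketch (hypothetical), assuming the caller wants a
    # textual progress report; it must yield nblocks + 1 times, once up front
    # and once per hashed block:
    #   def hash_cb(n):
    #       for i in range(n + 1):
    #           print 'hashed %s of %s blocks' % (i, n)
    #           yield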

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """Upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)"""
        if upload_cb:
            upload_gen = upload_cb(len(missing))
            upload_gen.next()

        flying = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            g = self.put_block_async(data, hash)
            flying.append(g)
            for g in flying:
                if g.ready():
                    if g.exception:
                        raise g.exception
                    if upload_cb:
                        upload_gen.next()
            flying = [g for g in flying if not g.ready()]
        while upload_cb:
            try:
                upload_gen.next()
            except StopIteration:
                break
        gevent.joinall(flying)

        failures = [g for g in flying if g.exception]
        if failures:
            details = ', '.join(['%s: %s' % (i, g.exception) for i, g in enumerate(failures)])
            raise ClientError(message='Block uploading failed', status=505, details=details)
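    # Failures therefore surface twice: eagerly via raise while blocks are
    # still in flight, and collectively after joinall() for any greenlet
    # that failed on its final round.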

    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
        self.assert_container()

        #init
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream' if content_type is None else content_type

        self._calculate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
            permitions=sharing, public=public)

        if missing is None:
            return
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)

        self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=hashmap, success=201)
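    # Usage sketch (names are illustrative):
    #   client = PithosClient(base_url, token, account='user', container='pithos')
    #   with open('backup.tar', 'rb') as f:
    #       client.upload_object('backup.tar', f)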

    # download_* auxiliary methods
    # All untested
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict[h] = i
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
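    # Note that map_dict is keyed by block hash, so identical blocks collapse
    # into a single entry (only the last index survives); hash_list keeps the
    # full ordered sequence for the synchronous path.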

    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash is None:
                continue
            start = blocksize * blockid
            end = total_size - 1 if start + blocksize > total_size else start + blocksize - 1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **restargs):
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                try:
                    sys.stderr = StringIO()
                    gevent.Greenlet._report_error(self, exc_info)
                finally:
                    if hasattr(sys, '_stderr'):
                        sys.stderr = sys._stderr
        if not hasattr(self, 'POOL_SIZE'):
            self.POOL_SIZE = 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
        self.async_pool.start(g)
        return g

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # strip trailing zero-padding, consistently with pithos_hash above
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _greenlet2file(self, flying_greenlets, local_file, offset=0, **restargs):
        """Write the results of greenleted rest calls to a file
        @offset: the offset of the file up to blocksize - e.g. if the range is
        10-100, all blocks will be written to normal_position - 10"""
        finished = []
        for start, g in flying_greenlets.items():
            if g.ready():
                if g.exception:
                    raise g.exception
                block = g.value.content
                local_file.seek(start - offset)
                local_file.write(block)
                self._cb_next()
                finished.append(flying_greenlets.pop(start))
        local_file.flush()
        return finished

    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange=None, **restargs):

        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize * blockid
            if start < file_size and block_hash == self._hash_from_file(
                    local_file, start, blocksize, blockhash):
                # on resume, skip blocks that are already intact on disk
                self._cb_next()
                continue
            if len(flying_greenlets) >= self.POOL_SIZE:
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                    **restargs)
            end = total_size - 1 if start + blocksize > total_size else start + blocksize - 1
            (start, end) = _range_up(start, end, filerange)
            if start == end:
                self._cb_next()
                continue
            restargs['async_headers'] = dict(Range='bytes=%s-%s' % (start, end))
            flying_greenlets[start] = self._get_block_async(obj, **restargs)

        # check the greenlets
        while len(flying_greenlets) > 0:
            sleep(0.001)
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                **restargs)

        gevent.joinall(finished_greenlets)

    def download_object(self, obj, dst, download_cb=None, version=None, overide=False,
        resume=False, range=None, if_match=None, if_none_match=None, if_modified_since=None,
        if_unmodified_since=None):
        # NOTE: 'overide' (sic) is accepted but currently unused
        restargs = dict(version=version,
            data_range=None if range is None else 'bytes=%s' % range,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.POOL_SIZE = 5

        if download_cb:
            self.progress_bar_gen = download_cb(len(remote_hashes))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
        else:
            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
                resume, range, **restargs)
            if range is None:
                dst.truncate(total_size)

        self._complete_cb()

    # Command Progress Bar methods
    def _cb_next(self):
        if hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        while hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                break

    # Untested - except if download_object is tested first
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
        if_modified_since=None, if_unmodified_since=None, data_range=None):
        try:
            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since, data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
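    # The returned hashmap dict carries 'block_size', 'block_hash', 'bytes'
    # and the ordered 'hashes' list (see _get_remote_blocks_info above).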

    def set_account_group(self, group, usernames):
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError('No authorization', status=401)
        return r.headers

    def get_account_quota(self):
        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch=True)

    def get_account_versioning(self):
        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch=True)

    def get_account_meta(self, until=None):
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        self.account_post(update=True, metadata={metakey: ''})

    def set_account_quota(self, quota):
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        self.account_post(update=True, versioning=versioning)

    def list_containers(self):
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        self.assert_container()
        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError('Container "%s" does not exist' % self.container, r.status_code)
        elif r.status_code == 409:
            raise ClientError('Container "%s" is not empty' % self.container, r.status_code)

    def get_container_versioning(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')

    def get_container_quota(self, container):
        self.container = container
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

    def get_container_info(self, until=None):
        r = self.container_head(until=until)
        return r.headers

    def get_container_meta(self, until=None):
        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        assert isinstance(metapairs, dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_quota(self, quota):
        self.container_post(update=True, quota=quota)

    def set_container_versioning(self, versioning):
        self.container_post(update=True, versioning=versioning)

    def del_object(self, obj, until=None, delimiter=None):
        self.assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, object, metapairs):
        assert isinstance(metapairs, dict)
        self.object_post(object, update=True, metadata=metapairs)

    def del_object_meta(self, metakey, object):
        self.object_post(object, update=True, metadata={metakey: ''})

    def publish_object(self, object):
        self.object_post(object, update=True, public=True)

    def unpublish_object(self, object):
        self.object_post(object, update=True, public=False)

    def get_object_info(self, obj, version=None):
        r = self.object_head(obj, version=version)
        return r.headers

    def get_object_meta(self, obj, version=None):
        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')

    def get_object_sharing(self, object):
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    (key, val) = perm.strip().split('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                reply[key] = val
        return reply
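    # e.g. an 'X-Object-Sharing: read=user1,group1;write=user2' header parses
    # to {'read': 'user1,group1', 'write': 'user2'}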

    def set_object_sharing(self, object, read_permition=False, write_permition=False):
        """Give read/write permissions to an object.
           @param object is the object to change sharing permissions on
           @param read_permition is a list of users and user groups that get read
                permission for this object; False means all previous read
                permissions will be removed
           @param write_permition is a list of users and user groups that get write
                permission for this object; False means all previous write
                permissions will be removed
        """
        perms = {}
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
        self.object_post(object, update=True, permitions=perms)

    def del_object_sharing(self, object):
        self.set_object_sharing(object)

    def append_object(self, object, source_file, upload_cb=None):
        """@param upload_cb is a generator for showing progress of upload
            to caller application, e.g. a progress bar. Its next is called
            whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            self.object_post(object, update=True, content_range='bytes */*',
                content_type='application/octet-stream', content_length=len(block), data=block)

            if upload_cb is not None:
                upload_gen.next()

    def truncate_object(self, object, upto_bytes):
        self.object_post(object, update=True, content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream', object_bytes=upto_bytes,
            source_object=path4url(self.container, object))
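    # Truncation is expressed as a self-update: the object is rewritten from
    # bytes 0..upto_bytes of itself, with object_bytes fixing the new size.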

    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with a given source file
           @param start the byte of the remote object to start overwriting from
           @param end the byte of the remote object to stop overwriting at
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
            offset += len(block)
            self.object_post(object, update=True, content_type='application/octet-stream',
                content_length=len(block), content_range='bytes %s-%s/*' % (start, end), data=block)

            if upload_cb is not None:
                upload_gen.next()

    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
            delimiter=delimiter)

    def move_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
            delimiter=delimiter)

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, path):
        self.assert_container()
        r = self.object_get(path, format='json', version='list')
        return r.json['versions']