1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from httplib import HTTPConnection, HTTP
36 from xml.dom import minidom
41 import pithos.api.faults
43 ERROR_CODES = {304:'Not Modified',
48 411:'Length Required',
49 412:'Precondition Failed',
50 416:'Range Not Satisfiable',
51 422:'Unprocessable Entity',
52 503:'Service Unavailable'}
54 class Fault(Exception):
55 def __init__(self, data='', status=None):
56 if data == '' and status in ERROR_CODES.keys():
57 data = ERROR_CODES[status]
58 Exception.__init__(self, data)
63 def __init__(self, host, token, account, api='v1', verbose=False, debug=False):
64 """`host` can also include a port, e.g '127.0.0.1:8000'."""
67 self.account = account
69 self.verbose = verbose or debug
73 def _req(self, method, path, body=None, headers={}, format='text',
75 full_path = '/%s/%s%s?format=%s' % (self.api, self.account, path,
77 for k,v in params.items():
79 full_path = '%s&%s=%s' %(full_path, k, v)
81 full_path = '%s&%s' %(full_path, k)
82 conn = HTTPConnection(self.host)
85 full_path = full_path.replace(' ', '%20')
88 for k,v in headers.items():
90 k = k.replace('_', '-')
93 kwargs['headers'] = headers
94 kwargs['headers']['X-Auth-Token'] = self.token
97 kwargs['headers'].setdefault('content-type',
98 'application/octet-stream')
99 kwargs['headers'].setdefault('content-length', len(body) if body else 0)
101 #print '*', method, full_path, kwargs
102 conn.request(method, full_path, **kwargs)
103 except socket.error, e:
104 raise Fault(status=503)
106 resp = conn.getresponse()
107 headers = dict(resp.getheaders())
110 print '%d %s' % (resp.status, resp.reason)
111 for key, val in headers.items():
112 print '%s: %s' % (key.capitalize(), val)
115 length = resp.getheader('content-length', None)
116 data = resp.read(length)
121 if int(resp.status) in ERROR_CODES.keys():
122 #print '**', resp.status
123 raise Fault(data, int(resp.status))
125 #print '**', resp.status, headers, data
126 return resp.status, headers, data
def delete(self, path, format='text', params=None):
    """Send a DELETE request for `path` and return the result of `_req`.

    `format` is forwarded to `_req` unchanged.
    `params` is a dict of query parameters forwarded to `_req`; when it is
    omitted a fresh empty dict is used for each call.
    """
    # A `{}` default would be one dict object shared by every call, so any
    # mutation downstream would leak into later requests.
    query = {} if params is None else params
    return self._req('DELETE', path, format=format, params=query)
131 def get(self, path, format='text', headers=None, params={}):
132 return self._req('GET', path, headers=headers, format=format,
def head(self, path, format='text', params=None):
    """Send a HEAD request for `path` and return the result of `_req`.

    `format` is forwarded to `_req` unchanged.
    `params` is a dict of query parameters forwarded to `_req`; when it is
    omitted a fresh empty dict is used for each call.
    """
    # Avoid a shared mutable default: build a new dict per call instead.
    query = {} if params is None else params
    return self._req('HEAD', path, format=format, params=query)
138 def post(self, path, body=None, format='text', headers=None, params={}):
139 return self._req('POST', path, body, headers=headers, format=format,
def put(self, path, body=None, format='text', headers=None):
    """Send a PUT request for `path` with an optional `body`.

    `headers` and `format` are handed to `_req` as keyword arguments; the
    value returned by `_req` is returned unchanged.
    """
    options = {'headers': headers, 'format': format}
    return self._req('PUT', path, body, **options)
145 def _list(self, path, format='text', params={}, **headers):
146 status, headers, data = self.get(path, format=format, headers=headers,
149 data = json.loads(data) if data else ''
150 elif format == 'xml':
151 data = minidom.parseString(data)
153 data = data.strip().split('\n') if data else ''
156 def _get_metadata(self, path, prefix=None, params={}):
157 status, headers, data = self.head(path, params=params)
158 prefixlen = len(prefix) if prefix else 0
160 for key, val in headers.items():
161 if prefix and not key.startswith(prefix):
163 elif prefix and key.startswith(prefix):
164 key = key[prefixlen:]
168 def _filter(self, l, d):
170 filter out from l elements having the metadata values provided
174 if type(elem) == types.DictionaryType:
176 k = 'x_object_meta_%s' % key
177 if k in elem.keys() and elem[k] == d[key]:
182 class OOS_Client(Client):
183 """Openstack Object Storage Client"""
185 def _update_metadata(self, path, entity, **meta):
186 """adds new and updates the values of previously set metadata"""
187 ex_meta = self.retrieve_account_metadata(restricted=True)
190 prefix = 'x-%s-meta-' % entity
191 for k,v in ex_meta.items():
192 k = '%s%s' % (prefix, k)
194 return self.post(path, headers=headers)
196 def _reset_metadata(self, path, entity, **meta):
198 overwrites all user defined metadata
201 prefix = 'x-%s-meta-' % entity
202 for k,v in meta.items():
203 k = '%s%s' % (prefix, k)
205 return self.post(path, headers=headers)
207 def _delete_metadata(self, path, entity, meta=[]):
208 """delete previously set metadata"""
209 ex_meta = self.retrieve_account_metadata(restricted=True)
211 prefix = 'x-%s-meta-' % entity
212 for k in ex_meta.keys():
214 headers['%s%s' % (prefix, k)] = ex_meta[k]
215 return self.post(path, headers=headers)
217 # Storage Account Services
219 def list_containers(self, format='text', limit=10000, marker=None, params={},
221 """lists containers"""
224 params.update({'limit':limit, 'marker':marker})
225 return self._list('', format, params, **headers)
def retrieve_account_metadata(self, restricted=False, **params):
    """Return the account metadata obtained through `_get_metadata`.

    When `restricted` is true only the 'x-account-meta-' prefix is
    requested; otherwise no prefix is given. Extra keyword arguments are
    passed on as the request parameters.
    """
    if restricted:
        prefix = 'x-account-meta-'
    else:
        prefix = None
    return self._get_metadata('', prefix, params)
def update_account_metadata(self, **meta):
    """Update the account metadata with the given keyword pairs.

    Delegates to `_update_metadata` using the account path (the empty
    string) and the 'account' entity name.
    """
    account_path, entity = '', 'account'
    return self._update_metadata(account_path, entity, **meta)
def delete_account_metadata(self, meta=None):
    """Delete account metadata via `_delete_metadata`.

    `meta` is the collection of metadata names forwarded to
    `_delete_metadata`; when omitted a fresh empty list is used.
    """
    # A `[]` default would be a single list shared by every call.
    names = [] if meta is None else meta
    return self._delete_metadata('', 'account', names)
def reset_account_metadata(self, **meta):
    """Reset the account metadata to the given keyword pairs.

    Delegates to `_reset_metadata` using the account path (the empty
    string) and the 'account' entity name.
    """
    account_path, entity = '', 'account'
    return self._reset_metadata(account_path, entity, **meta)
244 # Storage Container Services
def _filter_trashed(self, l):
    """Run `l` through `_filter` with the trash marker {'trash': 'true'}."""
    trash_marker = {'trash': 'true'}
    return self._filter(l, trash_marker)
249 def list_objects(self, container, format='text', limit=10000, marker=None,
250 prefix=None, delimiter=None, path=None,
251 include_trashed=False, params={}, **headers):
252 """returns a list with the container objects"""
253 params.update({'limit':limit, 'marker':marker, 'prefix':prefix,
254 'delimiter':delimiter, 'path':path})
255 l = self._list('/' + container, format, params, **headers)
256 #TODO support filter trashed with xml also
257 if format != 'xml' and not include_trashed:
258 l = self._filter_trashed(l)
261 def create_container(self, container, **meta):
262 """creates a container"""
264 for k,v in meta.items():
265 headers['x-container-meta-%s' %k.strip().upper()] = v.strip()
266 status, header, data = self.put('/' + container, headers=headers)
270 raise Fault(data, int(status))
def delete_container(self, container, params=None):
    """Delete `container` by issuing a DELETE on '/<container>'.

    `params` is a dict of query parameters forwarded to `self.delete`;
    when omitted a fresh empty dict is used for each call.
    """
    # Avoid a shared mutable default: build a new dict per call instead.
    query = {} if params is None else params
    return self.delete('/' + container, params=query)
def retrieve_container_metadata(self, container, restricted=False, **params):
    """Return the metadata of `container` obtained through `_get_metadata`.

    When `restricted` is true only the 'x-container-meta-' prefix is
    requested; otherwise no prefix is given. Extra keyword arguments are
    passed on as the request parameters.
    """
    if restricted:
        prefix = 'x-container-meta-'
    else:
        prefix = None
    container_path = '/%s' % container
    return self._get_metadata(container_path, prefix, params)
def update_container_metadata(self, container, **meta):
    """Update the metadata of `container` with the given keyword pairs.

    Delegates to `_update_metadata` with the path '/<container>' and the
    'container' entity name.
    """
    container_path = '/' + container
    entity = 'container'
    return self._update_metadata(container_path, entity, **meta)
def delete_container_metadata(self, container, meta=None):
    """Delete metadata of `container` via `_delete_metadata`.

    `meta` is the collection of metadata names forwarded to
    `_delete_metadata`; when omitted a fresh empty list is used.
    """
    # A `[]` default would be a single list shared by every call.
    names = [] if meta is None else meta
    path = '/%s' % container
    return self._delete_metadata(path, 'container', names)
291 # Storage Object Services
293 def request_object(self, container, object, format='text', params={},
295 """returns tuple containing the status, headers and data response for an object request"""
296 path = '/%s/%s' % (container, object)
297 status, headers, data = self.get(path, format, headers, params)
298 return status, headers, data
300 def retrieve_object(self, container, object, format='text', params={},
302 """returns an object's data"""
303 t = self.request_object(container, object, format, params, **headers)
306 def create_directory_marker(self, container, object):
307 """creates a dierectory marker"""
309 raise Fault('Directory markers have to be nested in a container')
310 h = {'content_type':'application/directory'}
311 return self.create_zero_length_object(container, object, **h)
313 def create_object(self, container, object, f=stdin, format='text', meta={},
314 etag=None, content_type=None, content_encoding=None,
315 content_disposition=None, **headers):
316 """creates a zero-length object"""
317 path = '/%s/%s' % (container, object)
318 for k, v in headers.items():
322 l = ['etag', 'content_encoding', 'content_disposition', 'content_type']
323 l = [elem for elem in l if eval(elem)]
325 headers.update({elem:eval(elem)})
327 for k,v in meta.items():
328 headers['x-object-meta-%s' %k.strip()] = v.strip()
329 data = f.read() if f else None
330 return self.put(path, data, format, headers=headers)
332 def create_zero_length_object(self, container, object, meta={}, etag=None,
333 content_type=None, content_encoding=None,
334 content_disposition=None, **headers):
336 for elem in ['self', 'container', 'headers']:
339 return self.create_object(container, f=None, **args)
341 def update_object(self, container, object, f=stdin, offset=None, meta={},
342 content_length=None, content_type=None,
343 content_encoding=None, content_disposition=None,
345 path = '/%s/%s' % (container, object)
346 for k, v in headers.items():
350 l = ['content_encoding', 'content_disposition', 'content_type',
352 l = [elem for elem in l if eval(elem)]
354 headers.update({elem:eval(elem)})
356 if 'content_range' not in headers.keys():
358 headers['content_range'] = 'bytes %s-/*' % offset
360 headers['content_range'] = 'bytes */*'
362 for k,v in meta.items():
363 headers['x-object-meta-%s' %k.strip()] = v.strip()
364 data = f.read() if f else None
365 return self.post(path, data, headers=headers)
367 def _change_obj_location(self, src_container, src_object, dst_container,
368 dst_object, remove=False, public=False, **meta):
369 path = '/%s/%s' % (dst_container, dst_object)
371 for k, v in meta.items():
372 headers['x-object-meta-%s' % k] = v
374 headers['x-move-from'] = '/%s/%s' % (src_container, src_object)
376 headers['x-copy-from'] = '/%s/%s' % (src_container, src_object)
377 self._set_public_header(headers, public)
378 self.headers = headers if headers else None
379 headers['content-length'] = 0
380 return self.put(path, headers=headers)
382 def copy_object(self, src_container, src_object, dst_container,
383 dst_object, public=False, **meta):
384 return self._change_obj_location(src_container, src_object,
385 dst_container, dst_object, False,
388 def move_object(self, src_container, src_object, dst_container,
389 dst_object, public=False, **meta):
390 return self._change_obj_location(src_container, src_object,
391 dst_container, dst_object, True,
def delete_object(self, container, object, params=None):
    """Delete an object by issuing a DELETE on '/<container>/<object>'.

    `params` is a dict of query parameters forwarded to `self.delete`;
    when omitted a fresh empty dict is used for each call.
    """
    # Avoid a shared mutable default: build a new dict per call instead.
    query = {} if params is None else params
    return self.delete('/%s/%s' % (container, object), params=query)
397 def retrieve_object_metadata(self, container, object, restricted=False,
400 set restricted to True to get only user defined metadata
402 path = '/%s/%s' % (container, object)
403 prefix = 'x-object-meta-' if restricted else None
404 params = {'version':version} if version else {}
405 return self._get_metadata(path, prefix, params=params)
def update_object_metadata(self, container, object, **meta):
    """Update the metadata of an object with the given keyword pairs.

    Delegates to `_update_metadata` with the path '/<container>/<object>'
    and the 'object' entity name.
    """
    object_path = '/%s/%s' % (container, object)
    entity = 'object'
    return self._update_metadata(object_path, entity, **meta)
def delete_object_metadata(self, container, object, meta=None):
    """Delete metadata of an object via `_delete_metadata`.

    `meta` is the collection of metadata names forwarded to
    `_delete_metadata`; when omitted a fresh empty list is used.
    """
    # A `[]` default would be a single list shared by every call.
    names = [] if meta is None else meta
    path = '/%s/%s' % (container, object)
    return self._delete_metadata(path, 'object', names)
415 class Pithos_Client(OOS_Client):
416 """Pithos Storage Client. Extends OOS_Client"""
418 def _chunked_transfer(self, path, method='PUT', f=stdin, headers=None,
420 """perfomrs a chunked request"""
421 http = HTTPConnection(self.host)
424 path = '/%s/%s%s' % (self.api, self.account, path)
425 http.putrequest(method, path)
426 http.putheader('x-auth-token', self.token)
427 http.putheader('content-type', 'application/octet-stream')
428 http.putheader('transfer-encoding', 'chunked')
430 for header,value in headers.items():
431 http.putheader(header, value)
439 block = f.read(blocksize)
442 data = '%s\r\n%s\r\n' % (hex(len(block)), block)
456 resp = http.getresponse()
458 headers = dict(resp.getheaders())
461 print '%d %s' % (resp.status, resp.reason)
462 for key, val in headers.items():
463 print '%s: %s' % (key.capitalize(), val)
466 length = resp.getheader('Content-length', None)
467 data = resp.read(length)
472 if int(resp.status) in ERROR_CODES.keys():
473 raise Fault(data, int(resp.status))
475 #print '*', resp.status, headers, data
476 return resp.status, headers, data
478 def _update_metadata(self, path, entity, **meta):
480 adds new and updates the values of previously set metadata
482 params = {'update':None}
484 prefix = 'x-%s-meta-' % entity
485 for k,v in meta.items():
486 k = '%s%s' % (prefix, k)
488 return self.post(path, headers=headers, params=params)
490 def _delete_metadata(self, path, entity, meta=[]):
492 delete previously set metadata
494 params = {'update':None}
496 prefix = 'x-%s-meta-' % entity
498 headers['%s%s' % (prefix, m)] = ''
499 return self.post(path, headers=headers, params=params)
501 # Storage Account Services
503 def list_containers(self, format='text', if_modified_since=None,
504 if_unmodified_since=None, limit=1000, marker=None,
506 """returns a list with the account containers"""
507 params = {'until':until} if until else None
508 headers = {'if-modified-since':if_modified_since,
509 'if-unmodified-since':if_unmodified_since}
510 return OOS_Client.list_containers(self, format=format, limit=limit,
511 marker=marker, params=params,
514 def retrieve_account_metadata(self, restricted=False, until=None):
515 """returns the account metadata"""
516 params = {'until':until} if until else {}
517 return OOS_Client.retrieve_account_metadata(self, restricted=restricted,
520 def set_account_groups(self, **groups):
521 """create account groups"""
523 for key, val in groups.items():
524 headers['x-account-group-%s' % key] = val
525 params = {'update':None}
526 return self.post('', headers=headers, params=params)
528 def retrieve_account_groups(self):
529 """returns the account groups"""
530 meta = self.retrieve_account_metadata()
531 prefix = 'x-account-group-'
532 prefixlen = len(prefix)
534 for key, val in meta.items():
535 if prefix and not key.startswith(prefix):
537 elif prefix and key.startswith(prefix):
538 key = key[prefixlen:]
542 def unset_account_groups(self, groups=[]):
543 """delete account groups"""
546 headers['x-account-group-%s' % elem] = ''
547 params = {'update':None}
548 return self.post('', headers=headers, params=params)
550 def reset_account_groups(self, **groups):
551 """overrides account groups"""
553 for key, val in groups.items():
554 headers['x-account-group-%s' % key] = val
555 meta = self.retrieve_account_metadata()
557 return self.post('', headers=headers)
559 # Storage Container Services
561 def list_objects(self, container, format='text', limit=10000, marker=None,
562 prefix=None, delimiter=None, path=None,
563 include_trashed=False, params={}, if_modified_since=None,
564 if_unmodified_since=None, meta={}, until=None):
565 """returns a list with the container objects"""
566 params = {'until':until, 'meta':meta}
568 for elem in ['self', 'container', 'params', 'until', 'meta']:
570 return OOS_Client.list_objects(self, container, params=params,
573 def retrieve_container_metadata(self, container, restricted=False,
575 """returns container's metadata"""
576 params = {'until':until} if until else {}
577 return OOS_Client.retrieve_container_metadata(self, container,
578 restricted=restricted,
581 def set_container_policies(self, container, **policies):
582 """sets containers policies"""
583 path = '/%s' % (container)
586 for key, val in policies.items():
587 headers['x-container-policy-%s' % key] = val
588 return self.post(path, headers=headers)
def delete_container(self, container, until=None):
    """Delete a container, or its history up to the date given in `until`.

    The 'until' request parameter is only sent when `until` is truthy;
    the actual request is made by `OOS_Client.delete_container`.
    """
    params = {}
    if until:
        params['until'] = until
    return OOS_Client.delete_container(self, container, params)
595 # Storage Object Services
597 def retrieve_object(self, container, object, params={}, format='text', range=None,
598 if_range=None, if_match=None, if_none_match=None,
599 if_modified_since=None, if_unmodified_since=None,
601 """returns an object"""
603 l = ['range', 'if_range', 'if_match', 'if_none_match',
604 'if_modified_since', 'if_unmodified_since']
605 l = [elem for elem in l if eval(elem)]
607 headers.update({elem:eval(elem)})
608 return OOS_Client.retrieve_object(self, container, object, format=format,
609 params=params, **headers)
611 def retrieve_object_version(self, container, object, version, detail=False,
612 range=None, if_range=None, if_match=None,
613 if_none_match=None, if_modified_since=None,
614 if_unmodified_since=None):
615 """returns a specific object version"""
617 l = ['self', 'container', 'object']
620 params = {'version':version}
621 return self.retrieve_object(container, object, params, **args)
623 def retrieve_object_versionlist(self, container, object, range=None,
624 if_range=None, if_match=None,
625 if_none_match=None, if_modified_since=None,
626 if_unmodified_since=None):
627 """returns the object version list"""
629 l = ['self', 'container', 'object']
633 return self.retrieve_object_version(container, object, version='list',
636 def create_zero_length_object(self, container, object, meta={},
637 etag=None, content_type=None, content_encoding=None,
638 content_disposition=None, x_object_manifest=None,
639 x_object_sharing=None, x_object_public=None):
640 """createas a zero length object"""
642 for elem in ['self', 'container', 'object']:
644 return OOS_Client.create_zero_length_object(self, container, object,
647 def create_object(self, container, object, f=stdin, meta={},
648 etag=None, content_type=None, content_encoding=None,
649 content_disposition=None, x_object_manifest=None,
650 x_object_sharing=None, x_object_public=None):
651 """creates an object"""
653 for elem in ['self', 'container', 'object']:
655 return OOS_Client.create_object(self, container, object, **args)
657 def create_object_using_chunks(self, container, object, f=stdin,
658 blocksize=1024, meta={}, etag=None,
659 content_type=None, content_encoding=None,
660 content_disposition=None,
661 x_object_sharing=None,
662 x_object_manifest=None,
663 x_object_public=None):
664 """creates an object (incremental upload)"""
665 path = '/%s/%s' % (container, object)
667 l = ['etag', 'content_type', 'content_encoding', 'content_disposition',
668 'x_object_sharing', 'x_object_manifest', 'x_object_public']
669 l = [elem for elem in l if eval(elem)]
671 headers.update({elem:eval(elem)})
673 for k,v in meta.items():
674 headers['x-object-meta-%s' %k.strip()] = v.strip()
676 return self._chunked_transfer(path, 'PUT', f, headers=headers,
679 def create_object_by_hashmap(container, object, f=stdin, format='json',
680 meta={}, etag=None, content_encoding=None,
681 content_disposition=None, content_type=None,
682 x_object_sharing=None, x_object_manifest=None,
683 x_object_public = None):
684 """creates an object by uploading hashes representing data instead of data"""
686 for elem in ['self', 'container', 'object']:
689 data = f.read() if f else None
690 if data and format == 'json':
693 data = json.dumps(data)
695 raise Fault('Invalid formatting')
698 return self.create_object(container, object, **args)
def create_manifestation(self, container, object, manifest):
    """Create a manifestation.

    Calls `create_object` with no input file (`f=None`) and with
    `manifest` passed as the `x_object_manifest` keyword argument.
    """
    return self.create_object(container, object, f=None,
                              x_object_manifest=manifest)
705 def update_object(self, container, object, f=stdin, offset=None, meta={},
706 content_length=None, content_type=None, content_range=None,
707 content_encoding=None, content_disposition=None,
708 x_object_bytes=None, x_object_manifest=None,
709 x_object_sharing=None, x_object_public=None):
710 """updates an object"""
711 spath = '/%s/%s' % (container, object)
713 for elem in ['self', 'container', 'object']:
716 return OOS_Client.update_object(self, container, object, **args)
718 def update_object_using_chunks(self, container, object, f=stdin,
719 blocksize=1024, offset=None, meta={},
720 content_type=None, content_encoding=None,
721 content_disposition=None, x_object_bytes=None,
722 x_object_manifest=None, x_object_sharing=None,
723 x_object_public=None):
724 """updates an object (incremental upload)"""
725 path = '/%s/%s' % (container, object)
727 l = ['content_type', 'content_encoding', 'content_disposition',
728 'x_object_bytes', 'x_object_manifest', 'x_object_sharing',
730 l = [elem for elem in l if eval(elem)]
732 headers.update({elem:eval(elem)})
735 headers['content_range'] = 'bytes %s-/*' % offset
737 headers['content_range'] = 'bytes */*'
739 for k,v in meta.items():
740 headers['x-object-meta-%s' %k.strip()] = v.strip()
742 return self._chunked_transfer(path, 'POST', f, headers=headers,
def delete_object(self, container, object, until=None):
    """Delete an object, or its history up to the date given in `until`.

    The 'until' request parameter is only sent when `until` is truthy;
    the actual request is made by `OOS_Client.delete_object`.
    """
    params = {}
    if until:
        params['until'] = until
    return OOS_Client.delete_object(self, container, object, params)
def trash_object(self, container, object):
    """Trash an object by setting its 'trash' metadata to 'true'.

    Delegates to `_update_metadata` for the path '/<container>/<object>'.
    """
    object_path = '/%s/%s' % (container, object)
    return self._update_metadata(object_path, 'object', trash='true')
def restore_object(self, container, object):
    """Restore a trashed object by deleting its 'trash' metadata."""
    trash_keys = ['trash']
    return self.delete_object_metadata(container, object, trash_keys)
760 def _set_public_header(self, headers, public=False):
761 """sets the public header"""
767 headers['x-object-public'] = public if public else ''
def publish_object(self, container, object):
    """Make a previously created object publicly accessible.

    Posts to '/<container>/<object>' with a 'content_range' header of
    'bytes */*' plus whatever `_set_public_header` adds for public=True.
    """
    target = '/%s/%s' % (container, object)
    request_headers = {'content_range': 'bytes */*'}
    self._set_public_header(request_headers, public=True)
    return self.post(target, headers=request_headers)
def unpublish_object(self, container, object):
    """Unpublish an object.

    Posts to '/<container>/<object>' with a 'content_range' header of
    'bytes */*' plus whatever `_set_public_header` adds for public=False.
    """
    target = '/%s/%s' % (container, object)
    request_headers = {'content_range': 'bytes */*'}
    self._set_public_header(request_headers, public=False)
    return self.post(target, headers=request_headers)