1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from httplib import HTTPConnection, HTTP
36 from xml.dom import minidom
41 import pithos.api.faults
43 ERROR_CODES = {304:'Not Modified',
48 411:'Length Required',
49 412:'Precondition Failed',
50 416:'Range Not Satisfiable',
51 422:'Unprocessable Entity',
52 503:'Service Unavailable'}
54 class Fault(Exception):
55 def __init__(self, data='', status=None):
56 if data == '' and status in ERROR_CODES.keys():
57 data = ERROR_CODES[status]
58 Exception.__init__(self, data)
63 def __init__(self, host, token, account, api='v1', verbose=False, debug=False):
64 """`host` can also include a port, e.g '127.0.0.1:8000'."""
67 self.account = account
69 self.verbose = verbose or debug
73 def _req(self, method, path, body=None, headers={}, format='text',
75 full_path = '/%s/%s%s?format=%s' % (self.api, self.account, path,
77 for k,v in params.items():
79 full_path = '%s&%s=%s' %(full_path, k, v)
81 full_path = '%s&%s' %(full_path, k)
82 conn = HTTPConnection(self.host)
85 full_path = full_path.replace(' ', '%20')
88 for k,v in headers.items():
90 k = k.replace('_', '-')
93 kwargs['headers'] = headers
94 kwargs['headers']['X-Auth-Token'] = self.token
97 kwargs['headers'].setdefault('content-type',
98 'application/octet-stream')
99 kwargs['headers'].setdefault('content-length', len(body) if body else 0)
101 #print '*', method, full_path, kwargs
102 conn.request(method, full_path, **kwargs)
103 except socket.error, e:
104 raise Fault(status=503)
106 resp = conn.getresponse()
107 headers = dict(resp.getheaders())
110 print '%d %s' % (resp.status, resp.reason)
111 for key, val in headers.items():
112 print '%s: %s' % (key.capitalize(), val)
115 length = resp.getheader('content-length', None)
116 data = resp.read(length)
121 if int(resp.status) in ERROR_CODES.keys():
122 #print '**', resp.status
123 raise Fault(data, int(resp.status))
125 #print '**', resp.status, headers, data
126 return resp.status, headers, data
128 def delete(self, path, format='text', params={}):
129 return self._req('DELETE', path, format=format, params=params)
131 def get(self, path, format='text', headers=None, params={}):
132 return self._req('GET', path, headers=headers, format=format,
135 def head(self, path, format='text', params={}):
136 return self._req('HEAD', path, format=format, params=params)
138 def post(self, path, body=None, format='text', headers=None, params={}):
139 return self._req('POST', path, body, headers=headers, format=format,
142 def put(self, path, body=None, format='text', headers=None):
143 return self._req('PUT', path, body, headers=headers, format=format)
145 def _list(self, path, format='text', params={}, **headers):
146 status, headers, data = self.get(path, format=format, headers=headers,
149 data = json.loads(data) if data else ''
150 elif format == 'xml':
151 data = minidom.parseString(data)
153 data = data.strip().split('\n') if data else ''
156 def _get_metadata(self, path, prefix=None, params={}):
157 status, headers, data = self.head(path, params=params)
158 prefixlen = len(prefix) if prefix else 0
160 for key, val in headers.items():
161 if prefix and not key.startswith(prefix):
163 elif prefix and key.startswith(prefix):
164 key = key[prefixlen:]
168 def _filter(self, l, d):
170 filter out from l elements having the metadata values provided
174 if type(elem) == types.DictionaryType:
176 k = 'x_object_meta_%s' % key
177 if k in elem.keys() and elem[k] == d[key]:
182 class OOS_Client(Client):
183 """Openstack Object Storage Client"""
185 def _update_metadata(self, path, entity, **meta):
186 """adds new and updates the values of previously set metadata"""
187 ex_meta = self.retrieve_account_metadata(restricted=True)
190 prefix = 'x-%s-meta-' % entity
191 for k,v in ex_meta.items():
192 k = '%s%s' % (prefix, k)
194 return self.post(path, headers=headers)
196 def _reset_metadata(self, path, entity, **meta):
198 overwrites all user defined metadata
201 prefix = 'x-%s-meta-' % entity
202 for k,v in meta.items():
203 k = '%s%s' % (prefix, k)
205 return self.post(path, headers=headers)
207 def _delete_metadata(self, path, entity, meta=[]):
208 """delete previously set metadata"""
209 ex_meta = self.retrieve_account_metadata(restricted=True)
211 prefix = 'x-%s-meta-' % entity
212 for k in ex_meta.keys():
214 headers['%s%s' % (prefix, k)] = ex_meta[k]
215 return self.post(path, headers=headers)
217 # Storage Account Services
219 def list_containers(self, format='text', limit=10000, marker=None, params={},
221 """lists containers"""
224 params.update({'limit':limit, 'marker':marker})
225 return self._list('', format, params, **headers)
227 def retrieve_account_metadata(self, restricted=False, **params):
228 """returns the account metadata"""
229 prefix = 'x-account-meta-' if restricted else None
230 return self._get_metadata('', prefix, params)
232 def update_account_metadata(self, **meta):
233 """updates the account metadata"""
234 return self._update_metadata('', 'account', **meta)
236 def delete_account_metadata(self, meta=[]):
237 """deletes the account metadata"""
238 return self._delete_metadata('', 'account', meta)
240 def reset_account_metadata(self, **meta):
241 """resets account metadata"""
242 return self._reset_metadata('', 'account', **meta)
244 # Storage Container Services
246 def _filter_trashed(self, l):
247 return self._filter(l, {'trash':'true'})
249 def list_objects(self, container, format='text', limit=10000, marker=None,
250 prefix=None, delimiter=None, path=None,
251 include_trashed=False, params={}, **headers):
252 """returns a list with the container objects"""
253 params.update({'limit':limit, 'marker':marker, 'prefix':prefix,
254 'delimiter':delimiter, 'path':path})
255 l = self._list('/' + container, format, params, **headers)
256 #TODO support filter trashed with xml also
257 if format != 'xml' and not include_trashed:
258 l = self._filter_trashed(l)
261 def create_container(self, container, **meta):
262 """creates a container"""
264 for k,v in meta.items():
265 headers['x-container-meta-%s' %k.strip().upper()] = v.strip()
266 status, header, data = self.put('/' + container, headers=headers)
270 raise Fault(data, int(status))
273 def delete_container(self, container, params={}):
274 """deletes a container"""
275 return self.delete('/' + container, params=params)
277 def retrieve_container_metadata(self, container, restricted=False, **params):
278 """returns the container metadata"""
279 prefix = 'x-container-meta-' if restricted else None
280 return self._get_metadata('/%s' % container, prefix, params)
282 def update_container_metadata(self, container, **meta):
283 """unpdates the container metadata"""
284 return self._update_metadata('/' + container, 'container', **meta)
286 def delete_container_metadata(self, container, meta=[]):
287 """deletes the container metadata"""
288 path = '/%s' % (container)
289 return self._delete_metadata(path, 'container', meta)
291 # Storage Object Services
293 def request_object(self, container, object, format='text', params={},
295 """returns tuple containing the status, headers and data response for an object request"""
296 path = '/%s/%s' % (container, object)
297 status, headers, data = self.get(path, format, headers, params)
298 return status, headers, data
300 def retrieve_object(self, container, object, format='text', params={},
302 """returns an object's data"""
303 t = self.request_object(container, object, format, params, **headers)
306 def create_directory_marker(self, container, object):
307 """creates a dierectory marker"""
309 raise Fault('Directory markers have to be nested in a container')
310 h = {'content_type':'application/directory'}
311 return self.create_zero_length_object(container, object, **h)
313 def create_object(self, container, object, f=stdin, format='text', meta={},
314 etag=None, content_type=None, content_encoding=None,
315 content_disposition=None, **headers):
316 """creates a zero-length object"""
317 path = '/%s/%s' % (container, object)
318 for k, v in headers.items():
322 l = ['etag', 'content_encoding', 'content_disposition', 'content_type']
323 l = [elem for elem in l if eval(elem)]
325 headers.update({elem:eval(elem)})
327 for k,v in meta.items():
328 headers['x-object-meta-%s' %k.strip()] = v.strip()
329 data = f.read() if f else None
330 return self.put(path, data, format, headers=headers)
332 def create_zero_length_object(self, container, object, meta={}, etag=None,
333 content_type=None, content_encoding=None,
334 content_disposition=None, **headers):
336 for elem in ['self', 'container', 'headers']:
339 return self.create_object(container, f=None, **args)
341 def update_object(self, container, object, f=stdin, offset=None, meta={},
342 content_length=None, content_type=None,
343 content_encoding=None, content_disposition=None,
345 path = '/%s/%s' % (container, object)
346 for k, v in headers.items():
350 l = ['content_encoding', 'content_disposition', 'content_type',
352 l = [elem for elem in l if eval(elem)]
354 headers.update({elem:eval(elem)})
356 if 'content_range' not in headers.keys():
358 headers['content_range'] = 'bytes %s-/*' % offset
360 headers['content_range'] = 'bytes */*'
362 for k,v in meta.items():
363 headers['x-object-meta-%s' %k.strip()] = v.strip()
364 data = f.read() if f else None
365 return self.post(path, data, headers=headers)
367 def _change_obj_location(self, src_container, src_object, dst_container,
368 dst_object, remove=False, meta={}, **headers):
369 path = '/%s/%s' % (dst_container, dst_object)
370 headers = {} if not headers else headers
371 for k, v in meta.items():
372 headers['x-object-meta-%s' % k] = v
374 headers['x-move-from'] = '/%s/%s' % (src_container, src_object)
376 headers['x-copy-from'] = '/%s/%s' % (src_container, src_object)
377 headers['content_length'] = 0
378 return self.put(path, headers=headers)
380 def copy_object(self, src_container, src_object, dst_container, dst_object,
382 """copies an object"""
383 return self._change_obj_location(src_container, src_object,
384 dst_container, dst_object, remove=False,
385 meta=meta, **headers)
387 def move_object(self, src_container, src_object, dst_container,
388 dst_object, meta={}, **headers):
389 """moves an object"""
390 return self._change_obj_location(src_container, src_object,
391 dst_container, dst_object, remove=True,
392 meta=meta, **headers)
394 def delete_object(self, container, object, params={}):
395 """deletes an object"""
396 return self.delete('/%s/%s' % (container, object), params=params)
398 def retrieve_object_metadata(self, container, object, restricted=False,
401 set restricted to True to get only user defined metadata
403 path = '/%s/%s' % (container, object)
404 prefix = 'x-object-meta-' if restricted else None
405 params = {'version':version} if version else {}
406 return self._get_metadata(path, prefix, params=params)
408 def update_object_metadata(self, container, object, **meta):
409 path = '/%s/%s' % (container, object)
410 return self._update_metadata(path, 'object', **meta)
412 def delete_object_metadata(self, container, object, meta=[]):
413 path = '/%s/%s' % (container, object)
414 return self._delete_metadata(path, 'object', meta)
416 class Pithos_Client(OOS_Client):
417 """Pithos Storage Client. Extends OOS_Client"""
419 def _chunked_transfer(self, path, method='PUT', f=stdin, headers=None,
421 """perfomrs a chunked request"""
422 http = HTTPConnection(self.host)
425 path = '/%s/%s%s' % (self.api, self.account, path)
426 http.putrequest(method, path)
427 http.putheader('x-auth-token', self.token)
428 http.putheader('content-type', 'application/octet-stream')
429 http.putheader('transfer-encoding', 'chunked')
431 for header,value in headers.items():
432 http.putheader(header, value)
440 block = f.read(blocksize)
443 data = '%s\r\n%s\r\n' % (hex(len(block)), block)
457 resp = http.getresponse()
459 headers = dict(resp.getheaders())
462 print '%d %s' % (resp.status, resp.reason)
463 for key, val in headers.items():
464 print '%s: %s' % (key.capitalize(), val)
467 length = resp.getheader('Content-length', None)
468 data = resp.read(length)
473 if int(resp.status) in ERROR_CODES.keys():
474 raise Fault(data, int(resp.status))
476 #print '*', resp.status, headers, data
477 return resp.status, headers, data
479 def _update_metadata(self, path, entity, **meta):
481 adds new and updates the values of previously set metadata
483 params = {'update':None}
485 prefix = 'x-%s-meta-' % entity
486 for k,v in meta.items():
487 k = '%s%s' % (prefix, k)
489 return self.post(path, headers=headers, params=params)
491 def _delete_metadata(self, path, entity, meta=[]):
493 delete previously set metadata
495 params = {'update':None}
497 prefix = 'x-%s-meta-' % entity
499 headers['%s%s' % (prefix, m)] = ''
500 return self.post(path, headers=headers, params=params)
502 # Storage Account Services
504 def list_containers(self, format='text', if_modified_since=None,
505 if_unmodified_since=None, limit=1000, marker=None,
507 """returns a list with the account containers"""
508 params = {'until':until} if until else None
509 headers = {'if-modified-since':if_modified_since,
510 'if-unmodified-since':if_unmodified_since}
511 return OOS_Client.list_containers(self, format=format, limit=limit,
512 marker=marker, params=params,
515 def retrieve_account_metadata(self, restricted=False, until=None):
516 """returns the account metadata"""
517 params = {'until':until} if until else {}
518 return OOS_Client.retrieve_account_metadata(self, restricted=restricted,
521 def set_account_groups(self, **groups):
522 """create account groups"""
524 for key, val in groups.items():
525 headers['x-account-group-%s' % key] = val
526 params = {'update':None}
527 return self.post('', headers=headers, params=params)
529 def retrieve_account_groups(self):
530 """returns the account groups"""
531 meta = self.retrieve_account_metadata()
532 prefix = 'x-account-group-'
533 prefixlen = len(prefix)
535 for key, val in meta.items():
536 if prefix and not key.startswith(prefix):
538 elif prefix and key.startswith(prefix):
539 key = key[prefixlen:]
543 def unset_account_groups(self, groups=[]):
544 """delete account groups"""
547 headers['x-account-group-%s' % elem] = ''
548 params = {'update':None}
549 return self.post('', headers=headers, params=params)
551 def reset_account_groups(self, **groups):
552 """overrides account groups"""
554 for key, val in groups.items():
555 headers['x-account-group-%s' % key] = val
556 meta = self.retrieve_account_metadata()
558 return self.post('', headers=headers)
560 # Storage Container Services
562 def list_objects(self, container, format='text', limit=10000, marker=None,
563 prefix=None, delimiter=None, path=None,
564 include_trashed=False, params={}, if_modified_since=None,
565 if_unmodified_since=None, meta={}, until=None):
566 """returns a list with the container objects"""
567 params = {'until':until, 'meta':meta}
569 for elem in ['self', 'container', 'params', 'until', 'meta']:
571 return OOS_Client.list_objects(self, container, params=params,
574 def retrieve_container_metadata(self, container, restricted=False,
576 """returns container's metadata"""
577 params = {'until':until} if until else {}
578 return OOS_Client.retrieve_container_metadata(self, container,
579 restricted=restricted,
582 def set_container_policies(self, container, **policies):
583 """sets containers policies"""
584 path = '/%s' % (container)
587 for key, val in policies.items():
588 headers['x-container-policy-%s' % key] = val
589 return self.post(path, headers=headers)
591 def delete_container(self, container, until=None):
592 """deletes a container or the container history until the date provided"""
593 params = {'until':until} if until else {}
594 return OOS_Client.delete_container(self, container, params)
596 # Storage Object Services
598 def retrieve_object(self, container, object, params={}, format='text', range=None,
599 if_range=None, if_match=None, if_none_match=None,
600 if_modified_since=None, if_unmodified_since=None,
602 """returns an object"""
604 l = ['range', 'if_range', 'if_match', 'if_none_match',
605 'if_modified_since', 'if_unmodified_since']
606 l = [elem for elem in l if eval(elem)]
608 headers.update({elem:eval(elem)})
609 return OOS_Client.retrieve_object(self, container, object, format=format,
610 params=params, **headers)
612 def retrieve_object_version(self, container, object, version, detail=False,
613 range=None, if_range=None, if_match=None,
614 if_none_match=None, if_modified_since=None,
615 if_unmodified_since=None):
616 """returns a specific object version"""
618 l = ['self', 'container', 'object']
621 params = {'version':version}
622 return self.retrieve_object(container, object, params, **args)
624 def retrieve_object_versionlist(self, container, object, range=None,
625 if_range=None, if_match=None,
626 if_none_match=None, if_modified_since=None,
627 if_unmodified_since=None):
628 """returns the object version list"""
630 l = ['self', 'container', 'object']
634 return self.retrieve_object_version(container, object, version='list',
637 def create_zero_length_object(self, container, object, meta={},
638 etag=None, content_type=None, content_encoding=None,
639 content_disposition=None, x_object_manifest=None,
640 x_object_sharing=None, x_object_public=None):
641 """createas a zero length object"""
643 for elem in ['self', 'container', 'object']:
645 return OOS_Client.create_zero_length_object(self, container, object,
648 def create_object(self, container, object, f=stdin, meta={},
649 etag=None, content_type=None, content_encoding=None,
650 content_disposition=None, x_object_manifest=None,
651 x_object_sharing=None, x_object_public=None):
652 """creates an object"""
654 for elem in ['self', 'container', 'object']:
656 return OOS_Client.create_object(self, container, object, **args)
658 def create_object_using_chunks(self, container, object, f=stdin,
659 blocksize=1024, meta={}, etag=None,
660 content_type=None, content_encoding=None,
661 content_disposition=None,
662 x_object_sharing=None,
663 x_object_manifest=None,
664 x_object_public=None):
665 """creates an object (incremental upload)"""
666 path = '/%s/%s' % (container, object)
668 l = ['etag', 'content_type', 'content_encoding', 'content_disposition',
669 'x_object_sharing', 'x_object_manifest', 'x_object_public']
670 l = [elem for elem in l if eval(elem)]
672 headers.update({elem:eval(elem)})
674 for k,v in meta.items():
675 headers['x-object-meta-%s' %k.strip()] = v.strip()
677 return self._chunked_transfer(path, 'PUT', f, headers=headers,
680 def create_object_by_hashmap(container, object, f=stdin, format='json',
681 meta={}, etag=None, content_encoding=None,
682 content_disposition=None, content_type=None,
683 x_object_sharing=None, x_object_manifest=None,
684 x_object_public = None):
685 """creates an object by uploading hashes representing data instead of data"""
687 for elem in ['self', 'container', 'object']:
690 data = f.read() if f else None
691 if data and format == 'json':
694 data = json.dumps(data)
696 raise Fault('Invalid formatting')
699 return self.create_object(container, object, **args)
701 def create_manifestation(self, container, object, manifest):
702 """creates a manifestation"""
703 headers={'x_object_manifest':manifest}
704 return self.create_object(container, object, f=None, **headers)
706 def update_object(self, container, object, f=stdin, offset=None, meta={},
707 content_length=None, content_type=None, content_range=None,
708 content_encoding=None, content_disposition=None,
709 x_object_bytes=None, x_object_manifest=None,
710 x_object_sharing=None, x_object_public=None):
711 """updates an object"""
712 spath = '/%s/%s' % (container, object)
714 for elem in ['self', 'container', 'object']:
717 return OOS_Client.update_object(self, container, object, **args)
719 def update_object_using_chunks(self, container, object, f=stdin,
720 blocksize=1024, offset=None, meta={},
721 content_type=None, content_encoding=None,
722 content_disposition=None, x_object_bytes=None,
723 x_object_manifest=None, x_object_sharing=None,
724 x_object_public=None):
725 """updates an object (incremental upload)"""
726 path = '/%s/%s' % (container, object)
728 l = ['content_type', 'content_encoding', 'content_disposition',
729 'x_object_bytes', 'x_object_manifest', 'x_object_sharing',
731 l = [elem for elem in l if eval(elem)]
733 headers.update({elem:eval(elem)})
736 headers['content_range'] = 'bytes %s-/*' % offset
738 headers['content_range'] = 'bytes */*'
740 for k,v in meta.items():
741 headers['x-object-meta-%s' %k.strip()] = v.strip()
743 return self._chunked_transfer(path, 'POST', f, headers=headers,
746 def delete_object(self, container, object, until=None):
747 """deletes an object or the object history until the date provided"""
748 params = {'until':until} if until else {}
749 return OOS_Client.delete_object(self, container, object, params)
751 def trash_object(self, container, object):
752 """trashes an object"""
753 path = '/%s/%s' % (container, object)
754 meta = {'trash':'true'}
755 return self._update_metadata(path, 'object', **meta)
757 def restore_object(self, container, object):
758 """restores a trashed object"""
759 return self.delete_object_metadata(container, object, ['trash'])
761 def publish_object(self, container, object):
762 """sets a previously created object publicly accessible"""
763 path = '/%s/%s' % (container, object)
764 headers = {'content_range':'bytes */*'}
765 headers['x_object_public'] = True
766 return self.post(path, headers=headers)
768 def unpublish_object(self, container, object):
769 """unpublish an object"""
770 path = '/%s/%s' % (container, object)
771 headers = {'content_range':'bytes */*'}
772 headers['x_object_public'] = False
773 return self.post(path, headers=headers)
775 def _change_obj_location(self, src_container, src_object, dst_container,
776 dst_object, remove=False, meta={}, **headers):
777 path = '/%s/%s' % (dst_container, dst_object)
778 headers = {} if not headers else headers
779 for k, v in meta.items():
780 headers['x-object-meta-%s' % k] = v
782 headers['x-move-from'] = '/%s/%s' % (src_container, src_object)
784 headers['x-copy-from'] = '/%s/%s' % (src_container, src_object)
785 headers['content_length'] = 0
786 return self.put(path, headers=headers)
788 def copy_object(self, src_container, src_object, dst_container, dst_object,
789 meta={}, public=False, version=None):
790 """copies an object"""
792 headers['x_object_public'] = public
794 headers['x_object_version'] = version
795 return OOS_Client.copy_object(self, src_container, src_object,
796 dst_container, dst_object, meta=meta,
799 def move_object(self, src_container, src_object, dst_container,
800 dst_object, meta={}, public=False, version=None):
801 """moves an object"""
803 headers['x_object_public'] = public
805 headers['x_object_version'] = version
806 return OOS_Client.move_object(self, src_container, src_object,
807 dst_container, dst_object, meta=meta,