1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from functools import wraps
36 from traceback import format_exc
37 from wsgiref.handlers import format_date_time
38 from binascii import hexlify, unhexlify
39 from datetime import datetime, tzinfo, timedelta
41 from django.conf import settings
42 from django.http import HttpResponse
43 from django.utils import simplejson as json
44 from django.utils.http import http_date, parse_etags
45 from django.utils.encoding import smart_str
47 from pithos.api.compat import parse_http_date_safe, parse_http_date
48 from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, ItemNotFound,
49 Conflict, LengthRequired, PreconditionFailed, RequestEntityTooLarge,
50 RangeNotSatisfiable, ServiceUnavailable)
51 from pithos.backends import connect_backend
52 from pithos.backends.base import NotAllowedError, QuotaError
60 logger = logging.getLogger(__name__)
64 def utcoffset(self, dt):
def json_encode_decimal(obj):
    """json.dumps 'default' hook: serialize decimal.Decimal values.

    Decimals are rendered as strings to avoid float precision loss.
    Raises TypeError for any other type, as the json module expects.
    """
    if isinstance(obj, decimal.Decimal):
        return str(obj)
    raise TypeError(repr(obj) + " is not JSON serializable")
79 """Return an ISO8601 date string that includes a timezone."""
81 return d.replace(tzinfo=UTC()).isoformat()
83 def rename_meta_key(d, old, new):
def printable_header_dict(d):
    """Format a meta dictionary for printing out json/xml.

    Convert all keys to lower case and replace dashes with underscores.
    Format 'last_modified' timestamp.
    """

    d['last_modified'] = isoformat(datetime.fromtimestamp(d['last_modified']))
    formatted = {}
    for key, value in d.iteritems():
        formatted[key.lower().replace('-', '_')] = value
    return formatted
def format_header_key(k):
    """Convert underscores to dashes and capitalize intra-dash strings."""
    normalized = k.replace('_', '-')
    return '-'.join([part.capitalize() for part in normalized.split('-')])
def get_header_prefix(request, prefix):
    """Get all prefix-* request headers in a dict. Reformat keys with format_header_key()."""

    prefix = 'HTTP_' + prefix.upper().replace('-', '_')
    # TODO: Document or remove '~' replacing.
    matched = {}
    for header, value in request.META.iteritems():
        # Keep only headers under the prefix that have a non-empty suffix;
        # header[5:] drops the WSGI 'HTTP_' marker before reformatting.
        if header.startswith(prefix) and len(header) > len(prefix):
            matched[format_header_key(header[5:])] = value.replace('~', '')
    return matched
def get_account_headers(request):
    """Extract account meta and groups from request headers.

    Returns (meta, groups): meta holds the X-Account-Meta-* headers,
    groups maps lower-cased group names to lists of members parsed from
    X-Account-Group-* headers (comma-separated, spaces stripped).
    Raises BadRequest if a group name contains '-' or '_'.
    """
    meta = get_header_prefix(request, 'X-Account-Meta-')
    groups = {}
    for k, v in get_header_prefix(request, 'X-Account-Group-').iteritems():
        n = k[16:].lower()  # strip the 16-char 'X-Account-Group-' prefix
        if '-' in n or '_' in n:
            raise BadRequest('Bad characters in group name')
        groups[n] = v.replace(' ', '').split(',')
        # Drop empty entries produced by stray commas.
        while '' in groups[n]:
            groups[n].remove('')
    return meta, groups
def put_account_headers(response, meta, groups, policy):
    """Fill an HTTP response with X-Account-* headers from backend meta.

    Sets usage counters, timestamps, user meta, group definitions and
    policy entries on the response object. Missing 'count'/'bytes' keys
    are tolerated (e.g. for accounts without usage data).
    """
    if 'count' in meta:
        response['X-Account-Container-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Account-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    for k in [x for x in meta.keys() if x.startswith('X-Account-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    if 'until_timestamp' in meta:
        response['X-Account-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in groups.iteritems():
        k = smart_str(k, strings_only=True)
        k = format_header_key('X-Account-Group-' + k)
        v = smart_str(','.join(v), strings_only=True)
        response[k] = v
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Account-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
def get_container_headers(request):
    """Extract container meta and policy from request headers.

    Returns (meta, policy): meta holds the X-Container-Meta-* headers;
    policy keys are lower-cased with the 19-char 'X-Container-Policy-'
    prefix stripped and spaces removed from values.
    """
    meta = get_header_prefix(request, 'X-Container-Meta-')
    policy = dict([(k[19:].lower(), v.replace(' ', '')) for k, v in get_header_prefix(request, 'X-Container-Policy-').iteritems()])
    return meta, policy
def put_container_headers(request, response, meta, policy):
    """Fill an HTTP response with X-Container-* headers from backend meta.

    Consistent with put_account_headers: 'count'/'bytes' are optional.
    """
    if 'count' in meta:
        response['X-Container-Object-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Container-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    for k in [x for x in meta.keys() if x.startswith('X-Container-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    # Advertise the object meta keys seen in this container,
    # with the 14-char 'X-Object-Meta-' prefix stripped.
    object_meta_keys = [smart_str(x, strings_only=True) for x in meta['object_meta'] if x.startswith('X-Object-Meta-')]
    response['X-Container-Object-Meta'] = ','.join([x[14:] for x in object_meta_keys])
    response['X-Container-Block-Size'] = request.backend.block_size
    response['X-Container-Block-Hash'] = request.backend.hash_algorithm
    if 'until_timestamp' in meta:
        response['X-Container-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Container-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
def get_object_headers(request):
    """Collect object meta from request headers.

    Returns (meta, permissions, public): user meta plus selected
    standard headers mirrored into the meta dict, the parsed
    X-Object-Sharing permissions and the X-Object-Public flag.
    """
    meta = get_header_prefix(request, 'X-Object-Meta-')
    if request.META.get('CONTENT_TYPE'):
        meta['Content-Type'] = request.META['CONTENT_TYPE']
    # Mirror selected standard headers into the meta dict when present.
    for wsgi_key, meta_key in (('HTTP_CONTENT_ENCODING', 'Content-Encoding'),
                               ('HTTP_CONTENT_DISPOSITION', 'Content-Disposition'),
                               ('HTTP_X_OBJECT_MANIFEST', 'X-Object-Manifest')):
        if request.META.get(wsgi_key):
            meta[meta_key] = request.META[wsgi_key]
    return meta, get_sharing(request), get_public(request)
def put_object_headers(response, meta, restricted=False):
    """Fill an HTTP response with object headers.

    With restricted=True only basic Content-* headers are exposed
    (e.g. for public access); otherwise all X-Object-* meta is included.
    """
    response['ETag'] = meta['hash']
    response['Content-Length'] = meta['bytes']
    response['Content-Type'] = meta.get('Content-Type', 'application/octet-stream')
    response['Last-Modified'] = http_date(int(meta['modified']))
    if not restricted:
        response['X-Object-Modified-By'] = smart_str(meta['modified_by'], strings_only=True)
        response['X-Object-Version'] = meta['version']
        response['X-Object-Version-Timestamp'] = http_date(int(meta['version_timestamp']))
        for k in [x for x in meta.keys() if x.startswith('X-Object-Meta-')]:
            response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
        for k in ('Content-Encoding', 'Content-Disposition', 'X-Object-Manifest',
                  'X-Object-Sharing', 'X-Object-Shared-By', 'X-Object-Allowed-To',
                  'X-Object-Public'):
            if k in meta:
                response[k] = smart_str(meta[k], strings_only=True)
    else:
        for k in ('Content-Encoding', 'Content-Disposition'):
            if k in meta:
                response[k] = meta[k]
195 def update_manifest_meta(request, v_account, meta):
196 """Update metadata if the object has an X-Object-Manifest."""
198 if 'X-Object-Manifest' in meta:
202 src_container, src_name = split_container_object_string('/' + meta['X-Object-Manifest'])
203 objects = request.backend.list_objects(request.user, v_account,
204 src_container, prefix=src_name, virtual=False)
206 src_meta = request.backend.get_object_meta(request.user,
207 v_account, src_container, x[0], x[1])
208 hash += src_meta['hash']
209 bytes += src_meta['bytes']
213 meta['bytes'] = bytes
216 meta['hash'] = md5.hexdigest().lower()
218 def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
219 if permissions is None:
221 allowed, perm_path, perms = permissions
225 r = ','.join(perms.get('read', []))
227 ret.append('read=' + r)
228 w = ','.join(perms.get('write', []))
230 ret.append('write=' + w)
231 meta['X-Object-Sharing'] = '; '.join(ret)
232 if '/'.join((v_account, v_container, v_object)) != perm_path:
233 meta['X-Object-Shared-By'] = perm_path
234 if request.user != v_account:
235 meta['X-Object-Allowed-To'] = allowed
def update_public_meta(public, meta):
    """Expose the object's public setting in its meta dict when set.

    Falsy 'public' values (None, '', False) leave meta untouched.
    """
    if not public:
        return
    meta['X-Object-Public'] = public
def validate_modification_preconditions(request, meta):
    """Check that the modified timestamp conforms with the preconditions set."""

    if 'modified' not in meta:
        return # TODO: Always return?

    ims = request.META.get('HTTP_IF_MODIFIED_SINCE')
    if ims is not None:
        ims = parse_http_date_safe(ims)
    if ims is not None and int(meta['modified']) <= ims:
        raise NotModified('Resource has not been modified')

    ius = request.META.get('HTTP_IF_UNMODIFIED_SINCE')
    if ius is not None:
        ius = parse_http_date_safe(ius)
    if ius is not None and int(meta['modified']) > ius:
        raise PreconditionFailed('Resource has been modified')
def validate_matching_preconditions(request, meta):
    """Check that the ETag conforms with the preconditions set."""

    hash = meta.get('hash', None)

    if_match = request.META.get('HTTP_IF_MATCH')
    if if_match is not None:
        # If-Match on a resource without a hash can never match.
        if hash is None:
            raise PreconditionFailed('Resource does not exist')
        if if_match != '*' and hash not in [x.lower() for x in parse_etags(if_match)]:
            raise PreconditionFailed('Resource ETag does not match')

    if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
    if if_none_match is not None:
        # TODO: If this passes, must ignore If-Modified-Since header.
        if hash is not None:
            if if_none_match == '*' or hash in [x.lower() for x in parse_etags(if_none_match)]:
                # TODO: Continue if an If-Modified-Since header is present.
                if request.method in ('HEAD', 'GET'):
                    raise NotModified('Resource ETag matches')
                raise PreconditionFailed('Resource exists or ETag matches')
def split_container_object_string(s):
    """Split a '/container/object' string into its two parts.

    Raises ValueError when the string does not start with '/', has no
    second '/', or has an empty object part.
    """
    if not len(s) > 0 or s[0] != '/':
        raise ValueError
    pos = s.find('/', 1)
    if pos == -1 or pos == len(s) - 1:
        raise ValueError
    return s[:pos], s[(pos + 1):]
291 def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False):
292 """Copy or move an object."""
294 meta, permissions, public = get_object_headers(request)
295 src_version = request.META.get('HTTP_X_SOURCE_VERSION')
298 version_id = request.backend.move_object(request.user, src_account, src_container, src_name,
299 dest_account, dest_container, dest_name,
300 meta, False, permissions)
302 version_id = request.backend.copy_object(request.user, src_account, src_container, src_name,
303 dest_account, dest_container, dest_name,
304 meta, False, permissions, src_version)
305 except NotAllowedError:
306 raise Unauthorized('Access denied')
307 except (NameError, IndexError):
308 raise ItemNotFound('Container or object does not exist')
310 raise BadRequest('Invalid sharing header')
311 except AttributeError, e:
312 raise Conflict('\n'.join(e.data) + '\n')
314 raise RequestEntityTooLarge('Quota exceeded')
315 if public is not None:
317 request.backend.update_object_public(request.user, dest_account, dest_container, dest_name, public)
318 except NotAllowedError:
319 raise Unauthorized('Access denied')
321 raise ItemNotFound('Object does not exist')
324 def get_int_parameter(p):
def get_content_length(request):
    """Return the request's Content-Length as an int.

    Raises LengthRequired when the header is missing or invalid.
    """
    length = get_int_parameter(request.META.get('CONTENT_LENGTH'))
    if length is None:
        raise LengthRequired('Missing or invalid Content-Length header')
    return length
340 def get_range(request, size):
341 """Parse a Range header from the request.
343 Either returns None, when the header is not existent or should be ignored,
344 or a list of (offset, length) tuples - should be further checked.
347 ranges = request.META.get('HTTP_RANGE', '').replace(' ', '')
348 if not ranges.startswith('bytes='):
352 for r in (x.strip() for x in ranges[6:].split(',')):
353 p = re.compile('^(?P<offset>\d*)-(?P<upto>\d*)$')
357 offset = m.group('offset')
358 upto = m.group('upto')
359 if offset == '' and upto == '':
368 ret.append((offset, upto - offset + 1))
370 ret.append((offset, size - offset))
373 ret.append((size - length, length))
377 def get_content_range(request):
378 """Parse a Content-Range header from the request.
380 Either returns None, when the header is not existent or should be ignored,
381 or an (offset, length, total) tuple - check as length, total may be None.
382 Returns (None, None, None) if the provided range is '*/*'.
385 ranges = request.META.get('HTTP_CONTENT_RANGE', '')
389 p = re.compile('^bytes (?P<offset>\d+)-(?P<upto>\d*)/(?P<total>(\d+|\*))$')
392 if ranges == 'bytes */*':
393 return (None, None, None)
395 offset = int(m.group('offset'))
396 upto = m.group('upto')
397 total = m.group('total')
406 if (upto is not None and offset > upto) or \
407 (total is not None and offset >= total) or \
408 (total is not None and upto is not None and upto >= total):
414 length = upto - offset + 1
415 return (offset, length, total)
417 def get_sharing(request):
418 """Parse an X-Object-Sharing header from the request.
420 Raises BadRequest on error.
423 permissions = request.META.get('HTTP_X_OBJECT_SHARING')
424 if permissions is None:
427 # TODO: Document or remove '~' replacing.
428 permissions = permissions.replace('~', '')
431 permissions = permissions.replace(' ', '')
432 if permissions == '':
434 for perm in (x for x in permissions.split(';')):
435 if perm.startswith('read='):
436 ret['read'] = list(set([v.replace(' ','').lower() for v in perm[5:].split(',')]))
437 if '' in ret['read']:
438 ret['read'].remove('')
439 if '*' in ret['read']:
441 if len(ret['read']) == 0:
442 raise BadRequest('Bad X-Object-Sharing header value')
443 elif perm.startswith('write='):
444 ret['write'] = list(set([v.replace(' ','').lower() for v in perm[6:].split(',')]))
445 if '' in ret['write']:
446 ret['write'].remove('')
447 if '*' in ret['write']:
449 if len(ret['write']) == 0:
450 raise BadRequest('Bad X-Object-Sharing header value')
452 raise BadRequest('Bad X-Object-Sharing header value')
454 # Keep duplicates only in write list.
455 dups = [x for x in ret.get('read', []) if x in ret.get('write', []) and x != '*']
458 ret['read'].remove(x)
459 if len(ret['read']) == 0:
464 def get_public(request):
465 """Parse an X-Object-Public header from the request.
467 Raises BadRequest on error.
470 public = request.META.get('HTTP_X_OBJECT_PUBLIC')
474 public = public.replace(' ', '').lower()
477 elif public == 'false' or public == '':
479 raise BadRequest('Bad X-Object-Public header value')
481 def raw_input_socket(request):
482 """Return the socket for reading the rest of the request."""
484 server_software = request.META.get('SERVER_SOFTWARE')
485 if server_software and server_software.startswith('mod_python'):
487 if 'wsgi.input' in request.environ:
488 return request.environ['wsgi.input']
489 raise ServiceUnavailable('Unknown server software')
491 MAX_UPLOAD_SIZE = 5 * (1024 * 1024 * 1024) # 5GB
493 def socket_read_iterator(request, length=0, blocksize=4096):
494 """Return a maximum of blocksize data read from the socket in each iteration.
496 Read up to 'length'. If 'length' is negative, will attempt a chunked read.
497 The maximum ammount of data read is controlled by MAX_UPLOAD_SIZE.
500 sock = raw_input_socket(request)
501 if length < 0: # Chunked transfers
502 # Small version (server does the dechunking).
503 if request.environ.get('mod_wsgi.input_chunked', None) or request.META['SERVER_SOFTWARE'].startswith('gunicorn'):
504 while length < MAX_UPLOAD_SIZE:
505 data = sock.read(blocksize)
509 raise BadRequest('Maximum size is reached')
511 # Long version (do the dechunking).
513 while length < MAX_UPLOAD_SIZE:
515 if hasattr(sock, 'readline'):
516 chunk_length = sock.readline()
519 while chunk_length[-1:] != '\n':
520 chunk_length += sock.read(1)
522 pos = chunk_length.find(';')
524 chunk_length = chunk_length[:pos]
526 chunk_length = int(chunk_length, 16)
528 raise BadRequest('Bad chunk size') # TODO: Change to something more appropriate.
530 if chunk_length == 0:
534 # Get the actual data.
535 while chunk_length > 0:
536 chunk = sock.read(min(chunk_length, blocksize))
537 chunk_length -= len(chunk)
541 if len(data) >= blocksize:
542 ret = data[:blocksize]
543 data = data[blocksize:]
546 raise BadRequest('Maximum size is reached')
548 if length > MAX_UPLOAD_SIZE:
549 raise BadRequest('Maximum size is reached')
551 data = sock.read(min(length, blocksize))
557 class ObjectWrapper(object):
558 """Return the object's data block-per-block in each iteration.
560 Read from the object using the offset and length provided in each entry of the range list.
563 def __init__(self, backend, ranges, sizes, hashmaps, boundary):
564 self.backend = backend
567 self.hashmaps = hashmaps
568 self.boundary = boundary
569 self.size = sum(self.sizes)
576 self.range_index = -1
577 self.offset, self.length = self.ranges[0]
582 def part_iterator(self):
584 # Get the file for the current offset.
585 file_size = self.sizes[self.file_index]
586 while self.offset >= file_size:
587 self.offset -= file_size
589 file_size = self.sizes[self.file_index]
591 # Get the block for the current position.
592 self.block_index = int(self.offset / self.backend.block_size)
593 if self.block_hash != self.hashmaps[self.file_index][self.block_index]:
594 self.block_hash = self.hashmaps[self.file_index][self.block_index]
596 self.block = self.backend.get_block(self.block_hash)
598 raise ItemNotFound('Block does not exist')
600 # Get the data from the block.
601 bo = self.offset % self.backend.block_size
602 bl = min(self.length, len(self.block) - bo)
603 data = self.block[bo:bo + bl]
611 if len(self.ranges) == 1:
612 return self.part_iterator()
613 if self.range_index == len(self.ranges):
616 if self.range_index == -1:
618 return self.part_iterator()
619 except StopIteration:
620 self.range_index += 1
622 if self.range_index < len(self.ranges):
624 self.offset, self.length = self.ranges[self.range_index]
626 if self.range_index > 0:
628 out.append('--' + self.boundary)
629 out.append('Content-Range: bytes %d-%d/%d' % (self.offset, self.offset + self.length - 1, self.size))
630 out.append('Content-Transfer-Encoding: binary')
633 return '\r\n'.join(out)
637 out.append('--' + self.boundary + '--')
639 return '\r\n'.join(out)
641 def object_data_response(request, sizes, hashmaps, meta, public=False):
642 """Get the HttpResponse object for replying with the object's data."""
646 ranges = get_range(request, size)
651 check = [True for offset, length in ranges if
652 length <= 0 or length > size or
653 offset < 0 or offset >= size or
654 offset + length > size]
656 raise RangeNotSatisfiable('Requested range exceeds object limits')
658 if_range = request.META.get('HTTP_IF_RANGE')
661 # Modification time has passed instead.
662 last_modified = parse_http_date(if_range)
663 if last_modified != meta['modified']:
667 if if_range != meta['hash']:
671 if ret == 206 and len(ranges) > 1:
672 boundary = uuid.uuid4().hex
675 wrapper = ObjectWrapper(request.backend, ranges, sizes, hashmaps, boundary)
676 response = HttpResponse(wrapper, status=ret)
677 put_object_headers(response, meta, public)
680 offset, length = ranges[0]
681 response['Content-Length'] = length # Update with the correct length.
682 response['Content-Range'] = 'bytes %d-%d/%d' % (offset, offset + length - 1, size)
684 del(response['Content-Length'])
685 response['Content-Type'] = 'multipart/byteranges; boundary=%s' % (boundary,)
def put_object_block(request, hashmap, data, offset):
    """Put one block of data at the given offset.

    When the offset falls inside an existing block, update it in place;
    otherwise append a new block, zero-padded up to the block offset.
    Returns the amount of data written.
    """
    bi = int(offset / request.backend.block_size)  # block index
    bo = offset % request.backend.block_size       # offset inside the block
    bl = min(len(data), request.backend.block_size - bo)
    if bi < len(hashmap):
        hashmap[bi] = request.backend.update_block(hashmap[bi], data[:bl], bo)
    else:
        hashmap.append(request.backend.put_block(('\x00' * bo) + data[:bl]))
    return bl # Return amount of data written.
def hashmap_hash(request, hashmap):
    """Produce the root hash, treating the hashmap as a Merkle-like tree.

    Leaves are the unhexlified block hashes, zero-padded up to the next
    power of two; adjacent pairs are combined with the backend's hash
    algorithm until a single root remains, returned hex-encoded.
    """

    def subhash(d):
        h = hashlib.new(request.backend.hash_algorithm)
        h.update(d)
        return h.digest()

    if len(hashmap) == 0:
        return hexlify(subhash(''))
    if len(hashmap) == 1:
        return hashmap[0]

    s = 2
    while s < len(hashmap):
        s = s * 2
    h = [unhexlify(x) for x in hashmap]
    h += [('\x00' * len(h[0]))] * (s - len(hashmap))
    while len(h) > 1:
        h = [subhash(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
    return hexlify(h[0])
722 def update_response_headers(request, response):
723 if request.serialization == 'xml':
724 response['Content-Type'] = 'application/xml; charset=UTF-8'
725 elif request.serialization == 'json':
726 response['Content-Type'] = 'application/json; charset=UTF-8'
727 elif not response['Content-Type']:
728 response['Content-Type'] = 'text/plain; charset=UTF-8'
730 if not response.has_header('Content-Length') and not (response.has_header('Content-Type') and response['Content-Type'].startswith('multipart/byteranges')):
731 response['Content-Length'] = len(response.content)
734 response['Date'] = format_date_time(time())
def render_fault(request, fault):
    """Render a Fault into a plain-text HttpResponse and return it.

    Callers do 'return render_fault(request, fault)', so the response
    must be returned here.
    """
    if settings.DEBUG or settings.TEST:
        # Include the traceback only in debug/test deployments.
        fault.details = format_exc(fault)

    request.serialization = 'text'
    data = '\n'.join((fault.message, fault.details)) + '\n'
    response = HttpResponse(data, status=fault.code)
    update_response_headers(request, response)
    return response
746 def request_serialization(request, format_allowed=False):
747 """Return the serialization format requested.
749 Valid formats are 'text' and 'json', 'xml' if 'format_allowed' is True.
752 if not format_allowed:
755 format = request.GET.get('format')
758 elif format == 'xml':
761 for item in request.META.get('HTTP_ACCEPT', '').split(','):
762 accept, sep, rest = item.strip().partition(';')
763 if accept == 'application/json':
765 elif accept == 'application/xml' or accept == 'text/xml':
770 def api_method(http_method=None, format_allowed=False):
771 """Decorator function for views that implement an API method."""
775 def wrapper(request, *args, **kwargs):
777 if http_method and request.method != http_method:
778 raise BadRequest('Method not allowed.')
780 # The args variable may contain up to (account, container, object).
781 if len(args) > 1 and len(args[1]) > 256:
782 raise BadRequest('Container name too large.')
783 if len(args) > 2 and len(args[2]) > 1024:
784 raise BadRequest('Object name too large.')
786 # Fill in custom request variables.
787 request.serialization = request_serialization(request, format_allowed)
788 request.backend = connect_backend()
790 response = func(request, *args, **kwargs)
791 update_response_headers(request, response)
794 return render_fault(request, fault)
795 except BaseException, e:
796 logger.exception('Unexpected error: %s' % e)
797 fault = ServiceUnavailable('Unexpected error')
798 return render_fault(request, fault)
800 request.backend.wrapper.conn.close()