1 # Copyright 2011 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from functools import wraps
36 from traceback import format_exc
37 from wsgiref.handlers import format_date_time
38 from binascii import hexlify, unhexlify
39 from datetime import datetime, tzinfo, timedelta
41 from django.conf import settings
42 from django.http import HttpResponse
43 from django.utils import simplejson as json
44 from django.utils.http import http_date, parse_etags
45 from django.utils.encoding import smart_str
47 from pithos.api.compat import parse_http_date_safe, parse_http_date
48 from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, ItemNotFound,
49 Conflict, LengthRequired, PreconditionFailed, RangeNotSatisfiable,
51 from pithos.backends import connect_backend
52 from pithos.backends.base import NotAllowedError
# Module-level logger namespaced to this module, per logging convention.
logger = logging.getLogger(__name__)
# NOTE(review): fragment — this is a method of a tzinfo subclass (likely
# `class UTC(tzinfo)`, used by isoformat() below); the class header and the
# method body are not visible in this chunk. Confirm against the full file.
def utcoffset(self, dt):
def json_encode_decimal(obj):
    """JSON `default` hook: serialize decimal.Decimal values.

    NOTE(review): the body of the Decimal branch (the line returning the
    serialized value) is missing from this chunk — confirm upstream.
    """
    if isinstance(obj, decimal.Decimal):
    raise TypeError(repr(obj) + " is not JSON serializable")
# NOTE(review): the `def isoformat(d):` header line is missing from this
# chunk (source truncated); the docstring and return below are its body.
"""Return an ISO8601 date string that includes a timezone."""
return d.replace(tzinfo=UTC()).isoformat()
# NOTE(review): body missing from this chunk (source truncated) —
# presumably moves the value under key `old` to key `new` in dict `d`;
# confirm against the full file.
def rename_meta_key(d, old, new):
def printable_header_dict(d):
    """Format a meta dictionary for printing out json/xml.

    Convert all keys to lower case and replace dashes with underscores.
    Format 'last_modified' timestamp.
    """
    # Replace the raw epoch timestamp with an ISO8601 string (see isoformat()).
    d['last_modified'] = isoformat(datetime.fromtimestamp(d['last_modified']))
    return dict([(k.lower().replace('-', '_'), v) for k, v in d.iteritems()])
def format_header_key(k):
    """Convert underscores to dashes and capitalize intra-dash strings."""
    normalized = k.replace('_', '-')
    return '-'.join(part.capitalize() for part in normalized.split('-'))
def get_header_prefix(request, prefix):
    """Get all prefix-* request headers in a dict. Reformat keys with format_header_key()."""
    prefix = 'HTTP_' + prefix.upper().replace('-', '_')
    # TODO: Document or remove '~' replacing.
    headers = {}
    for key, value in request.META.iteritems():
        if key.startswith(prefix) and len(key) > len(prefix):
            # Drop the 'HTTP_' prefix (5 chars) and canonicalize the key.
            headers[format_header_key(key[5:])] = value.replace('~', '')
    return headers
def get_account_headers(request):
    """Collect account meta and group headers from the request.

    NOTE(review): several lines are missing from this chunk (source
    truncated): the `groups` dict initialisation, the derivation of the
    group name `n` from the header key, the empty-entry removal body and
    the return statement — confirm against the full file.
    """
    meta = get_header_prefix(request, 'X-Account-Meta-')
    for k, v in get_header_prefix(request, 'X-Account-Group-').iteritems():
        if '-' in n or '_' in n:
            raise BadRequest('Bad characters in group name')
        # Group value is a comma-separated member list; spaces are stripped.
        groups[n] = v.replace(' ', '').split(',')
        while '' in groups[n]:
def put_account_headers(response, meta, groups, policy):
    """Set account meta/group/policy response headers from backend data.

    NOTE(review): a few lines are missing from this chunk (source
    truncated), including the `response[k] = v` assignment that should
    follow the group-header formatting below — confirm upstream.
    """
    response['X-Account-Container-Count'] = meta['count']
    response['X-Account-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    # Pass stored X-Account-Meta-* keys through verbatim.
    for k in [x for x in meta.keys() if x.startswith('X-Account-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    if 'until_timestamp' in meta:
        response['X-Account-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in groups.iteritems():
        k = smart_str(k, strings_only=True)
        k = format_header_key('X-Account-Group-' + k)
        v = smart_str(','.join(v), strings_only=True)
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Account-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
def get_container_headers(request):
    """Collect container meta and policy headers from the request.

    NOTE(review): the return statement (presumably `return meta, policy`)
    is missing from this chunk — confirm against the full file.
    """
    meta = get_header_prefix(request, 'X-Container-Meta-')
    # Policy keys are the suffix after 'X-Container-Policy-' (19 chars), lower-cased.
    policy = dict([(k[19:].lower(), v.replace(' ', '')) for k, v in get_header_prefix(request, 'X-Container-Policy-').iteritems()])
def put_container_headers(request, response, meta, policy):
    """Set container response headers from backend meta/policy.

    NOTE(review): one or two lines (original lines 146 and 148) are
    missing from this chunk (source truncated) — confirm upstream.
    """
    response['X-Container-Object-Count'] = meta['count']
    response['X-Container-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    # Pass stored X-Container-Meta-* keys through verbatim.
    for k in [x for x in meta.keys() if x.startswith('X-Container-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    # Advertise the X-Object-Meta-* suffixes (14 chars stripped) present in the container.
    l = [smart_str(x, strings_only=True) for x in meta['object_meta'] if x.startswith('X-Object-Meta-')]
    response['X-Container-Object-Meta'] = ','.join([x[14:] for x in l])
    response['X-Container-Block-Size'] = request.backend.block_size
    response['X-Container-Block-Hash'] = request.backend.hash_algorithm
    if 'until_timestamp' in meta:
        response['X-Container-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Container-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
def get_object_headers(request):
    """Collect object meta headers plus sharing/public settings from the request.

    Returns a (meta, permissions, public) tuple.
    """
    meta = get_header_prefix(request, 'X-Object-Meta-')
    # Map selected CGI/HTTP variables onto their canonical meta keys,
    # copying each only when present and non-empty.
    passthrough = (
        ('CONTENT_TYPE', 'Content-Type'),
        ('HTTP_CONTENT_ENCODING', 'Content-Encoding'),
        ('HTTP_CONTENT_DISPOSITION', 'Content-Disposition'),
        ('HTTP_X_OBJECT_MANIFEST', 'X-Object-Manifest'),
    )
    for env_key, meta_key in passthrough:
        if request.META.get(env_key):
            meta[meta_key] = request.META[env_key]
    return meta, get_sharing(request), get_public(request)
def put_object_headers(response, meta, restricted=False):
    """Set object response headers (ETag, sizes, timestamps, meta).

    NOTE(review): several lines are missing from this chunk (source
    truncated) — among them the branch structure that should switch on
    `restricted`, the continuation/close of the header-name tuple below,
    and the `if k in meta:` guards. Indentation follows the visible
    fragment; confirm against the full file.
    """
    response['ETag'] = meta['hash']
    response['Content-Length'] = meta['bytes']
    response['Content-Type'] = meta.get('Content-Type', 'application/octet-stream')
    response['Last-Modified'] = http_date(int(meta['modified']))
    response['X-Object-Modified-By'] = smart_str(meta['modified_by'], strings_only=True)
    response['X-Object-Version'] = meta['version']
    response['X-Object-Version-Timestamp'] = http_date(int(meta['version_timestamp']))
    # Pass stored X-Object-Meta-* keys through verbatim.
    for k in [x for x in meta.keys() if x.startswith('X-Object-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    for k in ('Content-Encoding', 'Content-Disposition', 'X-Object-Manifest',
              'X-Object-Sharing', 'X-Object-Shared-By', 'X-Object-Allowed-To',
        response[k] = smart_str(meta[k], strings_only=True)
    # Restricted (public) responses expose only these two headers.
    for k in ('Content-Encoding', 'Content-Disposition'):
        response[k] = meta[k]
def update_manifest_meta(request, v_account, meta):
    """Update metadata if the object has an X-Object-Manifest."""
    # NOTE(review): several lines are missing from this chunk (source
    # truncated): the initialisation of the `hash`/`bytes`/`md5`
    # accumulators, the `for x in objects:` loop header the two `+=`
    # lines belong to, and the md5 update feeding hexdigest() — confirm
    # against the full file.
    if 'X-Object-Manifest' in meta:
        src_container, src_name = split_container_object_string('/' + meta['X-Object-Manifest'])
        # List all parts sharing the manifest prefix; aggregate their sizes/hashes.
        objects = request.backend.list_objects(request.user, v_account,
                                               src_container, prefix=src_name, virtual=False)
        src_meta = request.backend.get_object_meta(request.user,
                                                   v_account, src_container, x[0], x[1])
        hash += src_meta['hash']
        bytes += src_meta['bytes']
        meta['bytes'] = bytes
        meta['hash'] = md5.hexdigest().lower()
def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
    """Fill X-Object-Sharing/Shared-By/Allowed-To meta from permissions.

    NOTE(review): lines are missing from this chunk (source truncated):
    the early return for the None case, the `ret` list initialisation,
    and the `if r:` / `if w:` guards before the appends — confirm
    against the full file.
    """
    if permissions is None:
    allowed, perm_path, perms = permissions
    r = ','.join(perms.get('read', []))
    ret.append('read=' + r)
    w = ','.join(perms.get('write', []))
    ret.append('write=' + w)
    meta['X-Object-Sharing'] = '; '.join(ret)
    # Sharing inherited from a parent path is reported via X-Object-Shared-By.
    if '/'.join((v_account, v_container, v_object)) != perm_path:
        meta['X-Object-Shared-By'] = perm_path
    if request.user != v_account:
        meta['X-Object-Allowed-To'] = allowed
def update_public_meta(public, meta):
    # NOTE(review): the guard lines before this assignment (original lines
    # 238-239, presumably an early return when `public` is falsy) are
    # missing from this chunk — confirm upstream.
    meta['X-Object-Public'] = public
def validate_modification_preconditions(request, meta):
    """Check that the modified timestamp conforms with the preconditions set."""
    if 'modified' not in meta:
        return  # TODO: Always return?

    since = request.META.get('HTTP_IF_MODIFIED_SINCE')
    if since is not None:
        since = parse_http_date_safe(since)
    # parse_http_date_safe yields None on a malformed date; skip the check then.
    if since is not None and int(meta['modified']) <= since:
        raise NotModified('Resource has not been modified')

    until = request.META.get('HTTP_IF_UNMODIFIED_SINCE')
    if until is not None:
        until = parse_http_date_safe(until)
    if until is not None and int(meta['modified']) > until:
        raise PreconditionFailed('Resource has been modified')
def validate_matching_preconditions(request, meta):
    """Check that the ETag conforms with the preconditions set."""
    hash = meta.get('hash', None)

    if_match = request.META.get('HTTP_IF_MATCH')
    if if_match is not None:
        # NOTE(review): the guard preceding this raise (likely
        # `if hash is None:`, original line 267) is missing from this
        # chunk — as shown the raise would be unconditional; confirm
        # against the full file.
        raise PreconditionFailed('Resource does not exist')
        if if_match != '*' and hash not in [x.lower() for x in parse_etags(if_match)]:
            raise PreconditionFailed('Resource ETag does not match')

    if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
    if if_none_match is not None:
        # TODO: If this passes, must ignore If-Modified-Since header.
        if if_none_match == '*' or hash in [x.lower() for x in parse_etags(if_none_match)]:
            # TODO: Continue if an If-Modified-Since header is present.
            if request.method in ('HEAD', 'GET'):
                raise NotModified('Resource ETag matches')
            raise PreconditionFailed('Resource exists or ETag matches')
def split_container_object_string(s):
    """Split '/container/object' into (container, object).

    NOTE(review): the lines between the guard and the return (original
    lines 284-288: presumably a ValueError raise and the computation and
    validation of `pos`, the separator index) are missing from this
    chunk — confirm against the full file.
    """
    if not len(s) > 0 or s[0] != '/':
    return s[:pos], s[(pos + 1):]
def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False):
    """Copy or move an object."""
    # NOTE(review): several lines are missing from this chunk (source
    # truncated): the `if move:` / `else:` split selecting between the two
    # backend calls, the `try:` statements that the `except` clauses below
    # belong to, at least one additional `except` header (before the
    # 'Invalid sharing header' raise), and the final return — confirm
    # every detail against the full file.
    meta, permissions, public = get_object_headers(request)
    src_version = request.META.get('HTTP_X_SOURCE_VERSION')
        version_id = request.backend.move_object(request.user, src_account, src_container, src_name,
                                                 dest_account, dest_container, dest_name,
                                                 meta, False, permissions)
        version_id = request.backend.copy_object(request.user, src_account, src_container, src_name,
                                                 dest_account, dest_container, dest_name,
                                                 meta, False, permissions, src_version)
    except NotAllowedError:
        raise Unauthorized('Access denied')
    except (NameError, IndexError):
        raise ItemNotFound('Container or object does not exist')
        raise BadRequest('Invalid sharing header')
    except AttributeError, e:
        raise Conflict('\n'.join(e.data) + '\n')
    if public is not None:
        request.backend.update_object_public(request.user, dest_account, dest_container, dest_name, public)
        except NotAllowedError:
            raise Unauthorized('Access denied')
        raise ItemNotFound('Object does not exist')
# NOTE(review): body missing from this chunk (source truncated) —
# presumably parses `p` as a non-negative int, returning None on failure
# (see its use in get_content_length below); confirm against the full file.
def get_int_parameter(p):
def get_content_length(request):
    """Return the request's Content-Length as an int.

    Raises LengthRequired when the header is absent or invalid.
    """
    length = get_int_parameter(request.META.get('CONTENT_LENGTH'))
    if length is None:
        raise LengthRequired('Missing or invalid Content-Length header')
    return length
def get_range(request, size):
    """Parse a Range header from the request.

    Either returns None, when the header is not existent or should be ignored,
    or a list of (offset, length) tuples - should be further checked.
    """
    # NOTE(review): many lines are missing from this chunk (source
    # truncated): the early `return None`, the `ret = []` initialisation,
    # the regex match / None check, the int conversions of offset/upto,
    # the branch structure selecting between the three appends, and the
    # final return — confirm against the full file.
    ranges = request.META.get('HTTP_RANGE', '').replace(' ', '')
    if not ranges.startswith('bytes='):
    for r in (x.strip() for x in ranges[6:].split(',')):
        # NOTE(review): compiling this regex on every iteration is
        # wasteful — consider hoisting it to module level.
        p = re.compile('^(?P<offset>\d*)-(?P<upto>\d*)$')
        offset = m.group('offset')
        upto = m.group('upto')
        if offset == '' and upto == '':
        # Explicit start and end: length is inclusive of both bounds.
        ret.append((offset, upto - offset + 1))
        # Open-ended range: read from offset to the end of the object.
        ret.append((offset, size - offset))
        # Suffix range: last `length` bytes of the object.
        ret.append((size - length, length))
def get_content_range(request):
    """Parse a Content-Range header from the request.

    Either returns None, when the header is not existent or should be ignored,
    or an (offset, length, total) tuple - check as length, total may be None.
    Returns (None, None, None) if the provided range is '*/*'.
    """
    # NOTE(review): lines are missing from this chunk (source truncated):
    # the empty-header early return, the regex match and its None check,
    # the conversion of `upto`/`total` to int-or-None, the early returns
    # for the invalid combinations below, and the None-length fallthrough
    # — confirm against the full file.
    ranges = request.META.get('HTTP_CONTENT_RANGE', '')
    p = re.compile('^bytes (?P<offset>\d+)-(?P<upto>\d*)/(?P<total>(\d+|\*))$')
    if ranges == 'bytes */*':
        return (None, None, None)
    offset = int(m.group('offset'))
    upto = m.group('upto')
    total = m.group('total')
    # Reject ranges that are internally inconsistent with the stated total.
    if (upto is not None and offset > upto) or \
       (total is not None and offset >= total) or \
       (total is not None and upto is not None and upto >= total):
    length = upto - offset + 1
    return (offset, length, total)
def get_sharing(request):
    """Parse an X-Object-Sharing header from the request.

    Raises BadRequest on error.
    """
    # NOTE(review): several lines are missing from this chunk (source
    # truncated): the early returns, the `ret = {}` initialisation, the
    # bodies of the `'*'` normalisation branches, the `else:` preceding
    # the final per-perm BadRequest, the duplicate-removal loop header,
    # and the trailing return — confirm against the full file.
    permissions = request.META.get('HTTP_X_OBJECT_SHARING')
    if permissions is None:
    # TODO: Document or remove '~' replacing.
    permissions = permissions.replace('~', '')
    permissions = permissions.replace(' ', '')
    if permissions == '':
    for perm in (x for x in permissions.split(';')):
        if perm.startswith('read='):
            # Deduplicate and lower-case the read grantees.
            ret['read'] = list(set([v.replace(' ','').lower() for v in perm[5:].split(',')]))
            if '' in ret['read']:
                ret['read'].remove('')
            if '*' in ret['read']:
            if len(ret['read']) == 0:
                raise BadRequest('Bad X-Object-Sharing header value')
        elif perm.startswith('write='):
            ret['write'] = list(set([v.replace(' ','').lower() for v in perm[6:].split(',')]))
            if '' in ret['write']:
                ret['write'].remove('')
            if '*' in ret['write']:
            if len(ret['write']) == 0:
                raise BadRequest('Bad X-Object-Sharing header value')
            raise BadRequest('Bad X-Object-Sharing header value')
    # Keep duplicates only in write list.
    dups = [x for x in ret.get('read', []) if x in ret.get('write', []) and x != '*']
    ret['read'].remove(x)
    if len(ret['read']) == 0:
def get_public(request):
    """Parse an X-Object-Public header from the request.

    Raises BadRequest on error.
    """
    # NOTE(review): lines are missing from this chunk (source truncated):
    # the missing-header early return, the 'true' branch and its return,
    # the return for the false/empty case, and the `else:` before the
    # final raise — confirm against the full file.
    public = request.META.get('HTTP_X_OBJECT_PUBLIC')
    public = public.replace(' ', '').lower()
    elif public == 'false' or public == '':
    raise BadRequest('Bad X-Object-Public header value')
def raw_input_socket(request):
    """Return the socket for reading the rest of the request."""
    server_software = request.META.get('SERVER_SOFTWARE')
    if server_software and server_software.startswith('mod_python'):
        # NOTE(review): the mod_python return line (original line 484) is
        # missing from this chunk (source truncated) — confirm upstream.
    if 'wsgi.input' in request.environ:
        return request.environ['wsgi.input']
    raise ServiceUnavailable('Unknown server software')
# Hard cap on a single upload; enforced by socket_read_iterator() below.
MAX_UPLOAD_SIZE = 5 * (1024 * 1024 * 1024) # 5GB
def socket_read_iterator(request, length=0, blocksize=4096):
    """Return a maximum of blocksize data read from the socket in each iteration.

    Read up to 'length'. If 'length' is negative, will attempt a chunked read.
    The maximum amount of data read is controlled by MAX_UPLOAD_SIZE.
    """
    # NOTE(review): many lines are missing from this chunk (source
    # truncated): the `yield` statements, the length accounting, the
    # `else:` fallback for sockets without readline, the try/except around
    # the hex chunk-size parse, the end-of-chunks handling and loop
    # terminators — confirm the structure below against the full file.
    sock = raw_input_socket(request)
    if length < 0: # Chunked transfers
        # Small version (server does the dechunking).
        if request.environ.get('mod_wsgi.input_chunked', None):
            while length < MAX_UPLOAD_SIZE:
                data = sock.read(blocksize)
            raise BadRequest('Maximum size is reached')
        # Long version (do the dechunking).
        while length < MAX_UPLOAD_SIZE:
            # Read the chunk-size line; fall back to byte-by-byte when needed.
            if hasattr(sock, 'readline'):
                chunk_length = sock.readline()
                while chunk_length[-1:] != '\n':
                    chunk_length += sock.read(1)
            # Strip any chunk extension after ';'.
            pos = chunk_length.find(';')
            chunk_length = chunk_length[:pos]
            # Chunk sizes are hexadecimal per the HTTP chunked encoding.
            chunk_length = int(chunk_length, 16)
            raise BadRequest('Bad chunk size') # TODO: Change to something more appropriate.
            if chunk_length == 0:
            # Get the actual data.
            while chunk_length > 0:
                chunk = sock.read(min(chunk_length, blocksize))
                chunk_length -= len(chunk)
                if len(data) >= blocksize:
                    ret = data[:blocksize]
                    data = data[blocksize:]
        raise BadRequest('Maximum size is reached')
    if length > MAX_UPLOAD_SIZE:
        raise BadRequest('Maximum size is reached')
    data = sock.read(min(length, blocksize))
class ObjectWrapper(object):
    """Return the object's data block-per-block in each iteration.

    Read from the object using the offset and length provided in each entry of the range list.

    NOTE(review): this class is heavily truncated in this chunk — missing
    lines include parts of __init__ (e.g. the `ranges`/`sizes`/
    `file_index`/`block_index`/`block_hash` attribute initialisations),
    the generator/yield plumbing of part_iterator(), the header of the
    iterator entry point (likely `def next(self):`) and its `try:`, and
    several branch/return lines. Confirm everything against the full file.
    """

    def __init__(self, backend, ranges, sizes, hashmaps, boundary):
        self.backend = backend
        self.hashmaps = hashmaps
        self.boundary = boundary
        self.size = sum(self.sizes)
        self.range_index = -1
        self.offset, self.length = self.ranges[0]

    def part_iterator(self):
        # Get the file for the current offset.
        file_size = self.sizes[self.file_index]
        while self.offset >= file_size:
            self.offset -= file_size
            file_size = self.sizes[self.file_index]
        # Get the block for the current position.
        self.block_index = int(self.offset / self.backend.block_size)
        # Only fetch from the backend when crossing into a different block.
        if self.block_hash != self.hashmaps[self.file_index][self.block_index]:
            self.block_hash = self.hashmaps[self.file_index][self.block_index]
            self.block = self.backend.get_block(self.block_hash)
            raise ItemNotFound('Block does not exist')
        # Get the data from the block.
        bo = self.offset % self.backend.block_size
        bl = min(self.length, len(self.block) - bo)
        data = self.block[bo:bo + bl]
        # NOTE(review): the lines below belong to the missing iterator
        # entry-point method (multipart/byteranges assembly).
        if len(self.ranges) == 1:
            return self.part_iterator()
        if self.range_index == len(self.ranges):
        if self.range_index == -1:
            return self.part_iterator()
        except StopIteration:
            self.range_index += 1
            if self.range_index < len(self.ranges):
                self.offset, self.length = self.ranges[self.range_index]
                if self.range_index > 0:
                out.append('--' + self.boundary)
                out.append('Content-Range: bytes %d-%d/%d' % (self.offset, self.offset + self.length - 1, self.size))
                out.append('Content-Transfer-Encoding: binary')
                return '\r\n'.join(out)
        # Closing delimiter for the multipart body.
        out.append('--' + self.boundary + '--')
        return '\r\n'.join(out)
def object_data_response(request, sizes, hashmaps, meta, public=False):
    """Get the HttpResponse object for replying with the object's data."""
    # NOTE(review): lines are missing from this chunk (source truncated):
    # the computation of `size`, the whole-object fallback and the choice
    # of status `ret` (200 vs 206), the branch structure of the If-Range
    # handling, the single-range vs multipart `if/else` around the header
    # assignments, and the final `return response` — confirm upstream.
    ranges = get_range(request, size)
    # Validate every requested (offset, length) pair against the object size.
    check = [True for offset, length in ranges if
        length <= 0 or length > size or
        offset < 0 or offset >= size or
        offset + length > size]
    raise RangeNotSatisfiable('Requested range exceeds object limits')
    if_range = request.META.get('HTTP_IF_RANGE')
    # Modification time has passed instead.
    last_modified = parse_http_date(if_range)
    if last_modified != meta['modified']:
    if if_range != meta['hash']:
    if ret == 206 and len(ranges) > 1:
        boundary = uuid.uuid4().hex
    wrapper = ObjectWrapper(request.backend, ranges, sizes, hashmaps, boundary)
    response = HttpResponse(wrapper, status=ret)
    put_object_headers(response, meta, public)
    offset, length = ranges[0]
    response['Content-Length'] = length # Update with the correct length.
    response['Content-Range'] = 'bytes %d-%d/%d' % (offset, offset + length - 1, size)
    # Multipart responses carry per-part lengths instead of a total.
    del(response['Content-Length'])
    response['Content-Type'] = 'multipart/byteranges; boundary=%s' % (boundary,)
def put_object_block(request, hashmap, data, offset):
    """Put one block of data at the given offset."""
    bi = int(offset / request.backend.block_size)
    bo = offset % request.backend.block_size
    bl = min(len(data), request.backend.block_size - bo)
    if bi < len(hashmap):
        # Overwrite part of an existing block in place.
        hashmap[bi] = request.backend.update_block(hashmap[bi], data[:bl], bo)
        # NOTE(review): the `else:` line (original line 694) is missing
        # from this chunk — the append below should be the new-block
        # branch, zero-padding up to the in-block offset; confirm upstream.
        hashmap.append(request.backend.put_block(('\x00' * bo) + data[:bl]))
    return bl # Return amount of data written.
def hashmap_hash(request, hashmap):
    """Produce the root hash, treating the hashmap as a Merkle-like tree."""
    # NOTE(review): lines are missing from this chunk (source truncated):
    # the definition of the `subhash` helper around the `hashlib.new`
    # call, the single-entry return, the computation of the padded size
    # `s` inside the while loop, the fold loop structure and the final
    # return — confirm against the full file.
    h = hashlib.new(request.backend.hash_algorithm)
    if len(hashmap) == 0:
        return hexlify(subhash(''))
    if len(hashmap) == 1:
    while s < len(hashmap):
    # Pad the leaf list out to `s` entries with zero-hashes, then fold pairwise.
    h = [unhexlify(x) for x in hashmap]
    h += [('\x00' * len(h[0]))] * (s - len(hashmap))
    h = [subhash(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
def update_response_headers(request, response):
    """Set Content-Type, Content-Length and Date response headers.

    NOTE(review): original lines 730-731 are missing from this chunk
    (source truncated) — whatever condition guards the `Date` assignment
    below is not visible; confirm against the full file.
    """
    if request.serialization == 'xml':
        response['Content-Type'] = 'application/xml; charset=UTF-8'
    elif request.serialization == 'json':
        response['Content-Type'] = 'application/json; charset=UTF-8'
    elif not response['Content-Type']:
        response['Content-Type'] = 'text/plain; charset=UTF-8'
    # Multipart byterange bodies carry per-part lengths, so skip the total.
    if not response.has_header('Content-Length') and not (response.has_header('Content-Type') and response['Content-Type'].startswith('multipart/byteranges')):
        response['Content-Length'] = len(response.content)
    response['Date'] = format_date_time(time())
def render_fault(request, fault):
    """Build an HttpResponse describing `fault` as plain text.

    NOTE(review): original line 737 (likely the `else:` branch clearing
    fault.details outside DEBUG/TEST) and the final `return response`
    are missing from this chunk — confirm upstream. Also note that
    `format_exc`'s argument is a traceback *limit*, not an exception;
    passing `fault` here looks suspicious — verify intent.
    """
    if settings.DEBUG or settings.TEST:
        fault.details = format_exc(fault)
    request.serialization = 'text'
    data = '\n'.join((fault.message, fault.details)) + '\n'
    response = HttpResponse(data, status=fault.code)
    update_response_headers(request, response)
def request_serialization(request, format_allowed=False):
    """Return the serialization format requested.

    Valid formats are 'text' and 'json', 'xml' if 'format_allowed' is True.
    """
    # NOTE(review): the `return` lines for every branch are missing from
    # this chunk (source truncated) — confirm upstream. Also note that
    # the local `format` shadows the builtin of the same name.
    if not format_allowed:
    # Explicit ?format= query parameter takes precedence.
    format = request.GET.get('format')
    elif format == 'xml':
    # Fall back to content negotiation on the Accept header.
    for item in request.META.get('HTTP_ACCEPT', '').split(','):
        accept, sep, rest = item.strip().partition(';')
        if accept == 'application/json':
        elif accept == 'application/xml' or accept == 'text/xml':
def api_method(http_method=None, format_allowed=False):
    """Decorator function for views that implement an API method."""
    # NOTE(review): this decorator is truncated in this chunk (source
    # missing lines): the inner `def decorator(func):` / `@wraps(func)`
    # lines, the `try:` opening the guarded section, the `except Fault`
    # clause that the render_fault call belongs to, the `finally:` before
    # the backend connection cleanup, and the trailing `return` lines —
    # confirm the structure below against the full file.
    def wrapper(request, *args, **kwargs):
        if http_method and request.method != http_method:
            raise BadRequest('Method not allowed.')
        # The args variable may contain up to (account, container, object).
        if len(args) > 1 and len(args[1]) > 256:
            raise BadRequest('Container name too large.')
        if len(args) > 2 and len(args[2]) > 1024:
            raise BadRequest('Object name too large.')
        # Fill in custom request variables.
        request.serialization = request_serialization(request, format_allowed)
        request.backend = connect_backend()
        response = func(request, *args, **kwargs)
        update_response_headers(request, response)
        return render_fault(request, fault)
        except BaseException, e:
            # Boundary catch: log the unexpected error, reply 503.
            logger.exception('Unexpected error: %s' % e)
            fault = ServiceUnavailable('Unexpected error')
            return render_fault(request, fault)
        request.backend.wrapper.conn.close()