# snf-pithos-app/pithos/api/util.py @ 761c2b3c
# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from functools import wraps
from time import time
from traceback import format_exc
from wsgiref.handlers import format_date_time
from binascii import hexlify, unhexlify
from datetime import datetime, tzinfo, timedelta
from urllib import quote, unquote

from django.conf import settings
from django.http import HttpResponse
from django.template.loader import render_to_string
from django.utils import simplejson as json
from django.utils.http import http_date, parse_etags
from django.utils.encoding import smart_unicode, smart_str
from django.core.files.uploadhandler import FileUploadHandler
from django.core.files.uploadedfile import UploadedFile

from synnefo.lib.parsedate import parse_http_date_safe, parse_http_date
from synnefo.lib.astakos import get_user

from pithos.api.faults import (Fault, NotModified, BadRequest, Unauthorized, Forbidden, ItemNotFound,
                               Conflict, LengthRequired, PreconditionFailed, RequestEntityTooLarge,
                               RangeNotSatisfiable, InternalServerError, NotImplemented)
from pithos.api.short_url import encode_url
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
                                 BACKEND_BLOCK_UMASK,
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
                                 BACKEND_QUEUE_EXCHANGE,
                                 PITHOS_QUOTAHOLDER_URL,
                                 BACKEND_QUOTA, BACKEND_VERSIONING,
                                 AUTHENTICATION_URL, AUTHENTICATION_USERS,
                                 SERVICE_TOKEN, COOKIE_NAME)

from pithos.backends import connect_backend
from pithos.backends.base import NotAllowedError, QuotaError, ItemNotExists, VersionNotExists

import logging
import re
import hashlib
import uuid
import decimal


logger = logging.getLogger(__name__)


class UTC(tzinfo):
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return 'UTC'

    def dst(self, dt):
        return timedelta(0)


def json_encode_decimal(obj):
    if isinstance(obj, decimal.Decimal):
        return str(obj)
    raise TypeError(repr(obj) + " is not JSON serializable")
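
# Illustrative usage (not part of the original module; values are hypothetical):
#   >>> json.dumps({'quota': decimal.Decimal('1073741824')}, default=json_encode_decimal)
#   '{"quota": "1073741824"}'
# Decimal values are emitted as strings; any other unserializable type still raises TypeError.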

def isoformat(d):
    """Return an ISO8601 date string that includes a timezone."""

    return d.replace(tzinfo=UTC()).isoformat()
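
# Illustrative example (not part of the original module; the timestamp is hypothetical):
#   >>> isoformat(datetime(2012, 5, 1, 12, 30, 0))
#   '2012-05-01T12:30:00+00:00'
# The naive datetime is labelled as UTC rather than converted.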

def rename_meta_key(d, old, new):
    if old not in d:
        return
    d[new] = d[old]
    del(d[old])

def printable_header_dict(d):
    """Format a meta dictionary for printing out json/xml.

    Convert all keys to lower case and replace dashes with underscores.
    Format 'last_modified' timestamp.
    """

    if 'last_modified' in d and d['last_modified']:
        d['last_modified'] = isoformat(datetime.fromtimestamp(d['last_modified']))
    return dict([(k.lower().replace('-', '_'), v) for k, v in d.iteritems()])

def format_header_key(k):
    """Convert underscores to dashes and capitalize intra-dash strings."""
    return '-'.join([x.capitalize() for x in k.replace('_', '-').split('-')])

def get_header_prefix(request, prefix):
    """Get all prefix-* request headers in a dict. Reformat keys with format_header_key()."""

    prefix = 'HTTP_' + prefix.upper().replace('-', '_')
    # TODO: Document or remove '~' replacing.
    return dict([(format_header_key(k[5:]), v.replace('~', '')) for k, v in request.META.iteritems() if k.startswith(prefix) and len(k) > len(prefix)])
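
# Illustrative example (not part of the original module; header names and values are hypothetical):
# a request carrying the header 'X-Account-Meta-Color: blue' appears in request.META as
# 'HTTP_X_ACCOUNT_META_COLOR', so get_header_prefix(request, 'X-Account-Meta-') would
# return {'X-Account-Meta-Color': 'blue'}; format_header_key() restores the dashed,
# capitalized form and any '~' characters are stripped from the value.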

def check_meta_headers(meta):
    if len(meta) > 90:
        raise BadRequest('Too many headers.')
    for k, v in meta.iteritems():
        if len(k) > 128:
            raise BadRequest('Header name too large.')
        if len(v) > 256:
            raise BadRequest('Header value too large.')

def get_account_headers(request):
    meta = get_header_prefix(request, 'X-Account-Meta-')
    check_meta_headers(meta)
    groups = {}
    for k, v in get_header_prefix(request, 'X-Account-Group-').iteritems():
        n = k[16:].lower()
        if '-' in n or '_' in n:
            raise BadRequest('Bad characters in group name')
        groups[n] = v.replace(' ', '').split(',')
        while '' in groups[n]:
            groups[n].remove('')
    return meta, groups

def put_account_headers(response, meta, groups, policy):
    if 'count' in meta:
        response['X-Account-Container-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Account-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    for k in [x for x in meta.keys() if x.startswith('X-Account-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    if 'until_timestamp' in meta:
        response['X-Account-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in groups.iteritems():
        k = smart_str(k, strings_only=True)
        k = format_header_key('X-Account-Group-' + k)
        v = smart_str(','.join(v), strings_only=True)
        response[k] = v
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Account-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)

def get_container_headers(request):
    meta = get_header_prefix(request, 'X-Container-Meta-')
    check_meta_headers(meta)
    policy = dict([(k[19:].lower(), v.replace(' ', '')) for k, v in get_header_prefix(request, 'X-Container-Policy-').iteritems()])
    return meta, policy

def put_container_headers(request, response, meta, policy):
    if 'count' in meta:
        response['X-Container-Object-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Container-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    for k in [x for x in meta.keys() if x.startswith('X-Container-Meta-')]:
        response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    l = [smart_str(x, strings_only=True) for x in meta['object_meta'] if x.startswith('X-Object-Meta-')]
    response['X-Container-Object-Meta'] = ','.join([x[14:] for x in l])
    response['X-Container-Block-Size'] = request.backend.block_size
    response['X-Container-Block-Hash'] = request.backend.hash_algorithm
    if 'until_timestamp' in meta:
        response['X-Container-Until-Timestamp'] = http_date(int(meta['until_timestamp']))
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Container-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)

def get_object_headers(request):
    content_type = request.META.get('CONTENT_TYPE', None)
    meta = get_header_prefix(request, 'X-Object-Meta-')
    check_meta_headers(meta)
    if request.META.get('HTTP_CONTENT_ENCODING'):
        meta['Content-Encoding'] = request.META['HTTP_CONTENT_ENCODING']
    if request.META.get('HTTP_CONTENT_DISPOSITION'):
        meta['Content-Disposition'] = request.META['HTTP_CONTENT_DISPOSITION']
    if request.META.get('HTTP_X_OBJECT_MANIFEST'):
        meta['X-Object-Manifest'] = request.META['HTTP_X_OBJECT_MANIFEST']
    return content_type, meta, get_sharing(request), get_public(request)

def put_object_headers(response, meta, restricted=False):
    response['ETag'] = meta['checksum']
    response['Content-Length'] = meta['bytes']
    response['Content-Type'] = meta.get('type', 'application/octet-stream')
    response['Last-Modified'] = http_date(int(meta['modified']))
    if not restricted:
        response['X-Object-Hash'] = meta['hash']
        response['X-Object-UUID'] = meta['uuid']
        response['X-Object-Modified-By'] = smart_str(meta['modified_by'], strings_only=True)
        response['X-Object-Version'] = meta['version']
        response['X-Object-Version-Timestamp'] = http_date(int(meta['version_timestamp']))
        for k in [x for x in meta.keys() if x.startswith('X-Object-Meta-')]:
            response[smart_str(k, strings_only=True)] = smart_str(meta[k], strings_only=True)
        for k in ('Content-Encoding', 'Content-Disposition', 'X-Object-Manifest',
                  'X-Object-Sharing', 'X-Object-Shared-By', 'X-Object-Allowed-To',
                  'X-Object-Public'):
            if k in meta:
                response[k] = smart_str(meta[k], strings_only=True)
    else:
        for k in ('Content-Encoding', 'Content-Disposition'):
            if k in meta:
                response[k] = smart_str(meta[k], strings_only=True)

def update_manifest_meta(request, v_account, meta):
    """Update metadata if the object has an X-Object-Manifest."""

    if 'X-Object-Manifest' in meta:
        etag = ''
        bytes = 0
        try:
            src_container, src_name = split_container_object_string('/' + meta['X-Object-Manifest'])
            objects = request.backend.list_objects(request.user_uniq, v_account,
                                src_container, prefix=src_name, virtual=False)
            for x in objects:
                src_meta = request.backend.get_object_meta(request.user_uniq,
                                        v_account, src_container, x[0], 'pithos', x[1])
                etag += src_meta['checksum']
                bytes += src_meta['bytes']
        except:
            # Ignore errors.
            return
        meta['bytes'] = bytes
        md5 = hashlib.md5()
        md5.update(etag)
        meta['checksum'] = md5.hexdigest().lower()

def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
    if permissions is None:
        return
    allowed, perm_path, perms = permissions
    if len(perms) == 0:
        return
    ret = []
    r = ','.join(perms.get('read', []))
    if r:
        ret.append('read=' + r)
    w = ','.join(perms.get('write', []))
    if w:
        ret.append('write=' + w)
    meta['X-Object-Sharing'] = '; '.join(ret)
    if '/'.join((v_account, v_container, v_object)) != perm_path:
        meta['X-Object-Shared-By'] = perm_path
    if request.user_uniq != v_account:
        meta['X-Object-Allowed-To'] = allowed

def update_public_meta(public, meta):
    if not public:
        return
    meta['X-Object-Public'] = '/public/' + encode_url(public)

def validate_modification_preconditions(request, meta):
    """Check that the modified timestamp conforms with the preconditions set."""

    if 'modified' not in meta:
        return # TODO: Always return?

    if_modified_since = request.META.get('HTTP_IF_MODIFIED_SINCE')
    if if_modified_since is not None:
        if_modified_since = parse_http_date_safe(if_modified_since)
    if if_modified_since is not None and int(meta['modified']) <= if_modified_since:
        raise NotModified('Resource has not been modified')

    if_unmodified_since = request.META.get('HTTP_IF_UNMODIFIED_SINCE')
    if if_unmodified_since is not None:
        if_unmodified_since = parse_http_date_safe(if_unmodified_since)
    if if_unmodified_since is not None and int(meta['modified']) > if_unmodified_since:
        raise PreconditionFailed('Resource has been modified')

def validate_matching_preconditions(request, meta):
    """Check that the ETag conforms with the preconditions set."""

    etag = meta['checksum']
    if not etag:
        etag = None

    if_match = request.META.get('HTTP_IF_MATCH')
    if if_match is not None:
        if etag is None:
            raise PreconditionFailed('Resource does not exist')
        if if_match != '*' and etag not in [x.lower() for x in parse_etags(if_match)]:
            raise PreconditionFailed('Resource ETag does not match')

    if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
    if if_none_match is not None:
        # TODO: If this passes, must ignore If-Modified-Since header.
        if etag is not None:
            if if_none_match == '*' or etag in [x.lower() for x in parse_etags(if_none_match)]:
                # TODO: Continue if an If-Modified-Since header is present.
                if request.method in ('HEAD', 'GET'):
                    raise NotModified('Resource ETag matches')
                raise PreconditionFailed('Resource exists or ETag matches')

def split_container_object_string(s):
    if not len(s) > 0 or s[0] != '/':
        raise ValueError
    s = s[1:]
    pos = s.find('/')
    if pos == -1 or pos == len(s) - 1:
        raise ValueError
    return s[:pos], s[(pos + 1):]
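
# Illustrative example (not part of the original module; the path is hypothetical):
#   >>> split_container_object_string('/images/logos/pithos.png')
#   ('images', 'logos/pithos.png')
# A string without a leading '/' or without an object part raises ValueError.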

def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False, delimiter=None):
    """Copy or move an object."""

    if 'ignore_content_type' in request.GET and 'CONTENT_TYPE' in request.META:
        del(request.META['CONTENT_TYPE'])
    content_type, meta, permissions, public = get_object_headers(request)
    src_version = request.META.get('HTTP_X_SOURCE_VERSION')
    try:
        if move:
            version_id = request.backend.move_object(request.user_uniq, src_account, src_container, src_name,
                                                        dest_account, dest_container, dest_name,
                                                        content_type, 'pithos', meta, False, permissions, delimiter)
        else:
            version_id = request.backend.copy_object(request.user_uniq, src_account, src_container, src_name,
                                                        dest_account, dest_container, dest_name,
                                                        content_type, 'pithos', meta, False, permissions, src_version, delimiter)
    except NotAllowedError:
        raise Forbidden('Not allowed')
    except (ItemNotExists, VersionNotExists):
        raise ItemNotFound('Container or object does not exist')
    except ValueError:
        raise BadRequest('Invalid sharing header')
    except QuotaError:
        raise RequestEntityTooLarge('Quota exceeded')
    if public is not None:
        try:
            request.backend.update_object_public(request.user_uniq, dest_account, dest_container, dest_name, public)
        except NotAllowedError:
            raise Forbidden('Not allowed')
        except ItemNotExists:
            raise ItemNotFound('Object does not exist')
    return version_id

def get_int_parameter(p):
    if p is not None:
        try:
            p = int(p)
        except ValueError:
            return None
        if p < 0:
            return None
    return p

def get_content_length(request):
    content_length = get_int_parameter(request.META.get('CONTENT_LENGTH'))
    if content_length is None:
        raise LengthRequired('Missing or invalid Content-Length header')
    return content_length

def get_range(request, size):
    """Parse a Range header from the request.

    Either returns None, when the header is not present or should be ignored,
    or a list of (offset, length) tuples - these should be further checked.
    """

    ranges = request.META.get('HTTP_RANGE', '').replace(' ', '')
    if not ranges.startswith('bytes='):
        return None

    ret = []
    for r in (x.strip() for x in ranges[6:].split(',')):
        p = re.compile(r'^(?P<offset>\d*)-(?P<upto>\d*)$')
        m = p.match(r)
        if not m:
            return None
        offset = m.group('offset')
        upto = m.group('upto')
        if offset == '' and upto == '':
            return None

        if offset != '':
            offset = int(offset)
            if upto != '':
                upto = int(upto)
                if offset > upto:
                    return None
                ret.append((offset, upto - offset + 1))
            else:
                ret.append((offset, size - offset))
        else:
            length = int(upto)
            ret.append((size - length, length))

    return ret
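
# Illustrative examples (not part of the original module; sizes and headers are hypothetical):
#   >>> get_range(request, 1000)   # with 'Range: bytes=0-499,900-'
#   [(0, 500), (900, 100)]
#   >>> get_range(request, 1000)   # with 'Range: bytes=-200'
#   [(800, 200)]
# A missing or malformed header yields None, in which case the caller serves the whole object.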

def get_content_range(request):
    """Parse a Content-Range header from the request.

    Either returns None, when the header is not present or should be ignored,
    or an (offset, length, total) tuple - check further, as length and total may be None.
    Returns (None, None, None) if the provided range is '*/*'.
    """

    ranges = request.META.get('HTTP_CONTENT_RANGE', '')
    if not ranges:
        return None

    p = re.compile(r'^bytes (?P<offset>\d+)-(?P<upto>\d*)/(?P<total>(\d+|\*))$')
    m = p.match(ranges)
    if not m:
        if ranges == 'bytes */*':
            return (None, None, None)
        return None
    offset = int(m.group('offset'))
    upto = m.group('upto')
    total = m.group('total')
    if upto != '':
        upto = int(upto)
    else:
        upto = None
    if total != '*':
        total = int(total)
    else:
        total = None
    if (upto is not None and offset > upto) or \
        (total is not None and offset >= total) or \
        (total is not None and upto is not None and upto >= total):
        return None

    if upto is None:
        length = None
    else:
        length = upto - offset + 1
    return (offset, length, total)
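
# Illustrative examples (not part of the original module; values are hypothetical):
#   >>> get_content_range(request)   # with 'Content-Range: bytes 0-499/1234'
#   (0, 500, 1234)
#   >>> get_content_range(request)   # with 'Content-Range: bytes */*'
#   (None, None, None)
# Inconsistent ranges (e.g. an offset beyond the total) yield None.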

def get_sharing(request):
    """Parse an X-Object-Sharing header from the request.

    Raises BadRequest on error.
    """

    permissions = request.META.get('HTTP_X_OBJECT_SHARING')
    if permissions is None:
        return None

    # TODO: Document or remove '~' replacing.
    permissions = permissions.replace('~', '')

    ret = {}
    permissions = permissions.replace(' ', '')
    if permissions == '':
        return ret
    for perm in (x for x in permissions.split(';')):
        if perm.startswith('read='):
            ret['read'] = list(set([v.replace(' ', '').lower() for v in perm[5:].split(',')]))
            if '' in ret['read']:
                ret['read'].remove('')
            if '*' in ret['read']:
                ret['read'] = ['*']
            if len(ret['read']) == 0:
                raise BadRequest('Bad X-Object-Sharing header value')
        elif perm.startswith('write='):
            ret['write'] = list(set([v.replace(' ', '').lower() for v in perm[6:].split(',')]))
            if '' in ret['write']:
                ret['write'].remove('')
            if '*' in ret['write']:
                ret['write'] = ['*']
            if len(ret['write']) == 0:
                raise BadRequest('Bad X-Object-Sharing header value')
        else:
            raise BadRequest('Bad X-Object-Sharing header value')

    # Keep duplicates only in write list.
    dups = [x for x in ret.get('read', []) if x in ret.get('write', []) and x != '*']
    if dups:
        for x in dups:
            ret['read'].remove(x)
        if len(ret['read']) == 0:
            del(ret['read'])

    return ret
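
# Illustrative example (not part of the original module; the user names are hypothetical):
# an 'X-Object-Sharing: read=alice,bob; write=bob' header would produce
# {'read': ['alice'], 'write': ['bob']} - duplicate entries are kept only in the
# write list, and a '*' entry collapses the corresponding list to ['*'].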

def get_public(request):
    """Parse an X-Object-Public header from the request.

    Raises BadRequest on error.
    """

    public = request.META.get('HTTP_X_OBJECT_PUBLIC')
    if public is None:
        return None

    public = public.replace(' ', '').lower()
    if public == 'true':
        return True
    elif public == 'false' or public == '':
        return False
    raise BadRequest('Bad X-Object-Public header value')

def raw_input_socket(request):
    """Return the socket for reading the rest of the request."""

    server_software = request.META.get('SERVER_SOFTWARE')
    if server_software and server_software.startswith('mod_python'):
        return request._req
    if 'wsgi.input' in request.environ:
        return request.environ['wsgi.input']
    raise NotImplemented('Unknown server software')

MAX_UPLOAD_SIZE = 5 * (1024 * 1024 * 1024) # 5GB

def socket_read_iterator(request, length=0, blocksize=4096):
    """Return a maximum of blocksize data read from the socket in each iteration.

    Read up to 'length'. If 'length' is negative, will attempt a chunked read.
    The maximum amount of data read is controlled by MAX_UPLOAD_SIZE.
    """

    sock = raw_input_socket(request)
    if length < 0: # Chunked transfers
        # Small version (server does the dechunking).
        if request.environ.get('mod_wsgi.input_chunked', None) or request.META['SERVER_SOFTWARE'].startswith('gunicorn'):
            while length < MAX_UPLOAD_SIZE:
                data = sock.read(blocksize)
                if data == '':
                    return
                length += len(data) # Count towards MAX_UPLOAD_SIZE.
                yield data
            raise BadRequest('Maximum size is reached')

        # Long version (do the dechunking).
        data = ''
        while length < MAX_UPLOAD_SIZE:
            # Get chunk size.
            if hasattr(sock, 'readline'):
                chunk_length = sock.readline()
            else:
                chunk_length = ''
                while chunk_length[-1:] != '\n':
                    chunk_length += sock.read(1)
                chunk_length = chunk_length.strip()
            pos = chunk_length.find(';')
            if pos >= 0:
                chunk_length = chunk_length[:pos]
            try:
                chunk_length = int(chunk_length, 16)
            except Exception:
                raise BadRequest('Bad chunk size') # TODO: Change to something more appropriate.
            # Check if done.
            if chunk_length == 0:
                if len(data) > 0:
                    yield data
                return
            # Get the actual data.
            while chunk_length > 0:
                chunk = sock.read(min(chunk_length, blocksize))
                chunk_length -= len(chunk)
                length += len(chunk) # Count towards MAX_UPLOAD_SIZE.
                data += chunk
                if len(data) >= blocksize:
                    ret = data[:blocksize]
                    data = data[blocksize:]
                    yield ret
            sock.read(2) # CRLF
        raise BadRequest('Maximum size is reached')
    else:
        if length > MAX_UPLOAD_SIZE:
            raise BadRequest('Maximum size is reached')
        while length > 0:
            data = sock.read(min(length, blocksize))
            if not data:
                raise BadRequest()
            length -= len(data)
            yield data

class SaveToBackendHandler(FileUploadHandler):
    """Handle a file from an HTML form the Django way."""

    def __init__(self, request=None):
        super(SaveToBackendHandler, self).__init__(request)
        self.backend = request.backend

    def put_data(self, length):
        if len(self.data) >= length:
            block = self.data[:length]
            self.file.hashmap.append(self.backend.put_block(block))
            self.md5.update(block)
            self.data = self.data[length:]

    def new_file(self, field_name, file_name, content_type, content_length, charset=None):
        self.md5 = hashlib.md5()
        self.data = ''
        self.file = UploadedFile(name=file_name, content_type=content_type, charset=charset)
        self.file.size = 0
        self.file.hashmap = []

    def receive_data_chunk(self, raw_data, start):
        self.data += raw_data
        self.file.size += len(raw_data)
        self.put_data(self.request.backend.block_size)
        return None

    def file_complete(self, file_size):
        l = len(self.data)
        if l > 0:
            self.put_data(l)
        self.file.etag = self.md5.hexdigest().lower()
        return self.file

class ObjectWrapper(object):
    """Return the object's data block-per-block in each iteration.

    Read from the object using the offset and length provided in each entry of the range list.
    """

    def __init__(self, backend, ranges, sizes, hashmaps, boundary):
        self.backend = backend
        self.ranges = ranges
        self.sizes = sizes
        self.hashmaps = hashmaps
        self.boundary = boundary
        self.size = sum(self.sizes)

        self.file_index = 0
        self.block_index = 0
        self.block_hash = -1
        self.block = ''

        self.range_index = -1
        self.offset, self.length = self.ranges[0]

    def __iter__(self):
        return self

    def part_iterator(self):
        if self.length > 0:
            # Get the file for the current offset.
            file_size = self.sizes[self.file_index]
            while self.offset >= file_size:
                self.offset -= file_size
                self.file_index += 1
                file_size = self.sizes[self.file_index]

            # Get the block for the current position.
            self.block_index = int(self.offset / self.backend.block_size)
            if self.block_hash != self.hashmaps[self.file_index][self.block_index]:
                self.block_hash = self.hashmaps[self.file_index][self.block_index]
                try:
                    self.block = self.backend.get_block(self.block_hash)
                except ItemNotExists:
                    raise ItemNotFound('Block does not exist')

            # Get the data from the block.
            bo = self.offset % self.backend.block_size
            bs = self.backend.block_size
            if (self.block_index == len(self.hashmaps[self.file_index]) - 1 and
                self.sizes[self.file_index] % self.backend.block_size):
                bs = self.sizes[self.file_index] % self.backend.block_size
            bl = min(self.length, bs - bo)
            data = self.block[bo:bo + bl]
            self.offset += bl
            self.length -= bl
            return data
        else:
            raise StopIteration

    def next(self):
        if len(self.ranges) == 1:
            return self.part_iterator()
        if self.range_index == len(self.ranges):
            raise StopIteration
        try:
            if self.range_index == -1:
                raise StopIteration
            return self.part_iterator()
        except StopIteration:
            self.range_index += 1
            out = []
            if self.range_index < len(self.ranges):
                # Part header.
                self.offset, self.length = self.ranges[self.range_index]
                self.file_index = 0
                if self.range_index > 0:
                    out.append('')
                out.append('--' + self.boundary)
                out.append('Content-Range: bytes %d-%d/%d' % (self.offset, self.offset + self.length - 1, self.size))
                out.append('Content-Transfer-Encoding: binary')
                out.append('')
                out.append('')
                return '\r\n'.join(out)
            else:
                # Footer.
                out.append('')
                out.append('--' + self.boundary + '--')
                out.append('')
                return '\r\n'.join(out)

def object_data_response(request, sizes, hashmaps, meta, public=False):
    """Get the HttpResponse object for replying with the object's data."""

    # Range handling.
    size = sum(sizes)
    ranges = get_range(request, size)
    if ranges is None:
        ranges = [(0, size)]
        ret = 200
    else:
        check = [True for offset, length in ranges if
                    length <= 0 or length > size or
                    offset < 0 or offset >= size or
                    offset + length > size]
        if len(check) > 0:
            raise RangeNotSatisfiable('Requested range exceeds object limits')
        ret = 206
        if_range = request.META.get('HTTP_IF_RANGE')
        if if_range:
            try:
                # A modification time was passed instead of an ETag.
                last_modified = parse_http_date(if_range)
                if last_modified != meta['modified']:
                    ranges = [(0, size)]
                    ret = 200
            except ValueError:
                if if_range != meta['checksum']:
                    ranges = [(0, size)]
                    ret = 200

    if ret == 206 and len(ranges) > 1:
        boundary = uuid.uuid4().hex
    else:
        boundary = ''
    wrapper = ObjectWrapper(request.backend, ranges, sizes, hashmaps, boundary)
    response = HttpResponse(wrapper, status=ret)
    put_object_headers(response, meta, public)
    if ret == 206:
        if len(ranges) == 1:
            offset, length = ranges[0]
            response['Content-Length'] = length # Update with the correct length.
            response['Content-Range'] = 'bytes %d-%d/%d' % (offset, offset + length - 1, size)
        else:
            del(response['Content-Length'])
            response['Content-Type'] = 'multipart/byteranges; boundary=%s' % (boundary,)
    return response

def put_object_block(request, hashmap, data, offset):
    """Put one block of data at the given offset."""

    bi = int(offset / request.backend.block_size)
    bo = offset % request.backend.block_size
    bl = min(len(data), request.backend.block_size - bo)
    if bi < len(hashmap):
        hashmap[bi] = request.backend.update_block(hashmap[bi], data[:bl], bo)
    else:
        hashmap.append(request.backend.put_block(('\x00' * bo) + data[:bl]))
    return bl # Return amount of data written.

def hashmap_md5(backend, hashmap, size):
    """Produce the MD5 sum from the data in the hashmap."""

    # TODO: Search backend for the MD5 of another object with the same hashmap and size...
    md5 = hashlib.md5()
    bs = backend.block_size
    for bi, hash in enumerate(hashmap):
        data = backend.get_block(hash) # Blocks come in padded.
        if bi == len(hashmap) - 1 and size % bs:
            data = data[:size % bs] # Trim padding off the final, partial block.
        md5.update(data)
    return md5.hexdigest().lower()

def simple_list_response(request, l):
    if request.serialization == 'text':
        return '\n'.join(l) + '\n'
    if request.serialization == 'xml':
        return render_to_string('items.xml', {'items': l})
    if request.serialization == 'json':
        return json.dumps(l)


def _get_backend():
    backend = connect_backend(db_module=BACKEND_DB_MODULE,
                              db_connection=BACKEND_DB_CONNECTION,
                              block_module=BACKEND_BLOCK_MODULE,
                              block_path=BACKEND_BLOCK_PATH,
                              block_umask=BACKEND_BLOCK_UMASK,
                              queue_module=BACKEND_QUEUE_MODULE,
                              # NOTE: queue connection parameters assumed to match
                              # the settings imported from pithos.api.settings above.
                              queue_hosts=BACKEND_QUEUE_HOSTS,
                              queue_exchange=BACKEND_QUEUE_EXCHANGE)
    backend.default_policy['quota'] = BACKEND_QUOTA
    backend.default_policy['versioning'] = BACKEND_VERSIONING
    return backend


def _pooled_backend_close(backend):
    backend._pool.pool_put(backend)


from synnefo.lib.pool import ObjectPool
from new import instancemethod
from select import select
from traceback import print_exc

USAGE_LIMIT = 500
POOL_SIZE = 5

class PithosBackendPool(ObjectPool):
    def _pool_create(self):
        backend = _get_backend()
        backend._real_close = backend.close
        backend.close = instancemethod(_pooled_backend_close, backend,
                                       type(backend))
        backend._pool = self
        backend._use_count = USAGE_LIMIT
        return backend

    def _pool_verify(self, backend):
        wrapper = backend.wrapper
        conn = wrapper.conn
        if conn.closed:
            return False

        if conn.in_transaction():
            conn.close()
            return False

        try:
            fd = conn.connection.connection.fileno()
            r, w, x = select([fd], (), (), 0)
            if r:
                conn.close()
                return False
        except:
            print_exc()
            return False

        return True

    def _pool_cleanup(self, backend):
        c = backend._use_count - 1
        if c < 0:
            backend._real_close()
            return True

        backend._use_count = c
        wrapper = backend.wrapper
        if wrapper.trans is not None:
            conn = wrapper.conn
            if conn.closed:
                wrapper.trans = None
            else:
                wrapper.rollback()
        if backend.messages:
            backend.messages = []
        return False

_pithos_backend_pool = PithosBackendPool(size=POOL_SIZE)


def get_backend():
    return _pithos_backend_pool.pool_get()


def update_request_headers(request):
    # Handle URL-encoded keys and values.
    meta = dict([(k, v) for k, v in request.META.iteritems() if k.startswith('HTTP_')])
    for k, v in meta.iteritems():
        try:
            k.decode('ascii')
            v.decode('ascii')
        except UnicodeDecodeError:
            raise BadRequest('Bad character in headers.')
        if '%' in k or '%' in v:
            del(request.META[k])
            request.META[unquote(k)] = smart_unicode(unquote(v), strings_only=True)

def update_response_headers(request, response):
    if request.serialization == 'xml':
        response['Content-Type'] = 'application/xml; charset=UTF-8'
    elif request.serialization == 'json':
        response['Content-Type'] = 'application/json; charset=UTF-8'
    elif not response['Content-Type']:
        response['Content-Type'] = 'text/plain; charset=UTF-8'

    if (not response.has_header('Content-Length') and
        not (response.has_header('Content-Type') and
             response['Content-Type'].startswith('multipart/byteranges'))):
        response['Content-Length'] = len(response.content)

    # URL-encode unicode in headers.
    meta = response.items()
    for k, v in meta:
        if (k.startswith('X-Account-') or k.startswith('X-Container-') or
            k.startswith('X-Object-') or k.startswith('Content-')):
            del(response[k])
            response[quote(k)] = quote(v, safe='/=,:@; ')

def render_fault(request, fault):
    if isinstance(fault, InternalServerError) and settings.DEBUG:
        fault.details = format_exc()

    request.serialization = 'text'
    data = fault.message + '\n'
    if fault.details:
        data += '\n' + fault.details
    response = HttpResponse(data, status=fault.code)
    update_response_headers(request, response)
    return response

def request_serialization(request, format_allowed=False):
    """Return the serialization format requested.

    Valid formats are 'text' and 'json'; 'xml' is also valid if 'format_allowed' is True.
    """

    if not format_allowed:
        return 'text'

    format = request.GET.get('format')
    if format == 'json':
        return 'json'
    elif format == 'xml':
        return 'xml'

    for item in request.META.get('HTTP_ACCEPT', '').split(','):
        accept, sep, rest = item.strip().partition(';')
        if accept == 'application/json':
            return 'json'
        elif accept == 'application/xml' or accept == 'text/xml':
            return 'xml'

    return 'text'


def api_method(http_method=None, format_allowed=False, user_required=True):
    """Decorator function for views that implement an API method."""

    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            try:
                if http_method and request.method != http_method:
                    raise BadRequest('Method not allowed.')

                if user_required:
                    token = None
                    if request.method in ('HEAD', 'GET') and COOKIE_NAME in request.COOKIES:
                        cookie_value = unquote(request.COOKIES.get(COOKIE_NAME, ''))
                        if cookie_value and '|' in cookie_value:
                            token = cookie_value.split('|', 1)[1]
                    get_user(request, AUTHENTICATION_URL, AUTHENTICATION_USERS, token)
                    if getattr(request, 'user', None) is None:
                        raise Unauthorized('Access denied')

                # The args variable may contain up to (account, container, object).
                if len(args) > 1 and len(args[1]) > 256:
                    raise BadRequest('Container name too large.')
                if len(args) > 2 and len(args[2]) > 1024:
                    raise BadRequest('Object name too large.')

                # Format and check headers.
                update_request_headers(request)

                # Fill in custom request variables.
                request.serialization = request_serialization(request, format_allowed)
                request.backend = get_backend()

                response = func(request, *args, **kwargs)
                update_response_headers(request, response)
                return response
            except Fault, fault:
                return render_fault(request, fault)
            except BaseException, e:
                logger.exception('Unexpected error: %s' % e)
                fault = InternalServerError('Unexpected error: %s' % e)
                return render_fault(request, fault)
            finally:
                if getattr(request, 'backend', None) is not None:
                    request.backend.close()
        return wrapper
    return decorator
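
# Illustrative usage sketch (not part of the original module; the view name is hypothetical):
#
#   @api_method('GET', format_allowed=True)
#   def account_list(request):
#       ...
#
# The decorator enforces the HTTP method, resolves the user via get_user(), attaches a
# pooled backend to the request, and renders any Fault raised by the view through
# render_fault(), closing the backend in all cases.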