Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / util.py @ 57e09fb4

History | View | Annotate | Download (37.6 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from functools import wraps
35
from time import time
36
from traceback import format_exc
37
from wsgiref.handlers import format_date_time
38
from binascii import hexlify, unhexlify
39
from datetime import datetime, tzinfo, timedelta
40
from urllib import quote, unquote
41

    
42
from django.conf import settings
43
from django.http import HttpResponse
44
from django.template.loader import render_to_string
45
from django.utils import simplejson as json
46
from django.utils.http import http_date, parse_etags
47
from django.utils.encoding import smart_unicode, smart_str
48
from django.core.files.uploadhandler import FileUploadHandler
49
from django.core.files.uploadedfile import UploadedFile
50

    
51
from synnefo.lib.parsedate import parse_http_date_safe, parse_http_date
52
from synnefo.lib.astakos import get_user
53

    
54
from pithos.api.faults import (
55
    Fault, NotModified, BadRequest, Unauthorized, Forbidden, ItemNotFound,
56
    Conflict, LengthRequired, PreconditionFailed, RequestEntityTooLarge,
57
    RangeNotSatisfiable, InternalServerError, NotImplemented)
58
from pithos.api.short_url import encode_url
59
from pithos.api.settings import (BACKEND_DB_MODULE, BACKEND_DB_CONNECTION,
60
                                 BACKEND_BLOCK_MODULE, BACKEND_BLOCK_PATH,
61
                                 BACKEND_BLOCK_UMASK,
62
                                 BACKEND_QUEUE_MODULE, BACKEND_QUEUE_HOSTS,
63
                                 BACKEND_QUEUE_EXCHANGE,
64
                                 PITHOS_QUOTAHOLDER_URL,
65
                                 BACKEND_QUOTA, BACKEND_VERSIONING,
66
                                 AUTHENTICATION_URL, AUTHENTICATION_USERS,
67
                                 SERVICE_TOKEN, COOKIE_NAME)
68

    
69
from pithos.backends import connect_backend
70
from pithos.backends.base import NotAllowedError, QuotaError, ItemNotExists, VersionNotExists
71

    
72
import logging
73
import re
74
import hashlib
75
import uuid
76
import decimal
77

    
78

    
79
# Module-level logger named after this module, per the logging convention.
logger = logging.getLogger(__name__)
80

    
81

    
82
class UTC(tzinfo):
    """Concrete tzinfo implementing Coordinated Universal Time.

    A fixed zone with zero offset and no daylight saving, used to attach
    timezone information to naive datetimes before ISO formatting.
    """

    ZERO = timedelta(0)

    def utcoffset(self, dt):
        """Offset from UTC is always zero."""
        return self.ZERO

    def tzname(self, dt):
        """Canonical name of the zone."""
        return 'UTC'

    def dst(self, dt):
        """UTC observes no daylight saving time."""
        return self.ZERO
91

    
92

    
93
def json_encode_decimal(obj):
    """JSON 'default' hook that serializes Decimal values as strings.

    Any other type is rejected with TypeError, as the json module expects
    from a default serializer.
    """
    if not isinstance(obj, decimal.Decimal):
        raise TypeError(repr(obj) + " is not JSON serializable")
    return str(obj)
97

    
98

    
99
def isoformat(d):
    """Return an ISO8601 date string that includes a timezone."""
    aware = d.replace(tzinfo=UTC())
    return aware.isoformat()
103

    
104

    
105
def rename_meta_key(d, old, new):
    """Move the value stored under key *old* to key *new*, in place.

    A no-op when *old* is absent from *d*.
    """
    try:
        value = d[old]
    except KeyError:
        return
    d[new] = value
    del d[old]
110

    
111

    
112
def printable_header_dict(d):
    """Format a meta dictionary for printing out json/xml.

    Convert all keys to lower case and replace dashes with underscores.
    Format 'last_modified' timestamp.
    """
    ts = d.get('last_modified')
    if ts:
        d['last_modified'] = isoformat(datetime.fromtimestamp(ts))
    return dict((k.lower().replace('-', '_'), v) for k, v in d.iteritems())
123

    
124

    
125
def format_header_key(k):
    """Convert underscores to dashes and capitalize intra-dash strings."""
    parts = k.replace('_', '-').split('-')
    return '-'.join(part.capitalize() for part in parts)
128

    
129

    
130
def get_header_prefix(request, prefix):
    """Get all prefix-* request headers in a dict. Reformat keys with format_header_key()."""
    prefix = 'HTTP_' + prefix.upper().replace('-', '_')
    # TODO: Document or remove '~' replacing.
    headers = {}
    for k, v in request.META.iteritems():
        # k[5:] drops the 'HTTP_' prefix added by the WSGI layer.
        if k.startswith(prefix) and len(k) > len(prefix):
            headers[format_header_key(k[5:])] = v.replace('~', '')
    return headers
136

    
137

    
138
def check_meta_headers(meta):
    """Validate meta header count and entry sizes.

    Raises BadRequest when more than 90 entries are present, a key exceeds
    128 characters, or a value exceeds 256 characters.
    """
    if len(meta) > 90:
        raise BadRequest('Too many headers.')
    for key, value in meta.iteritems():
        if len(key) > 128:
            raise BadRequest('Header name too large.')
        if len(value) > 256:
            raise BadRequest('Header value too large.')
146

    
147

    
148
def get_account_headers(request):
    """Extract account meta and group definitions from request headers.

    Returns (meta, groups); raises BadRequest on bad group names or
    oversized meta headers.
    """
    meta = get_header_prefix(request, 'X-Account-Meta-')
    check_meta_headers(meta)
    groups = {}
    for k, v in get_header_prefix(request, 'X-Account-Group-').iteritems():
        # k[16:] strips the reformatted 'X-Account-Group-' prefix.
        name = k[16:].lower()
        if '-' in name or '_' in name:
            raise BadRequest('Bad characters in group name')
        members = v.replace(' ', '').split(',')
        groups[name] = [m for m in members if m != '']
    return meta, groups
160

    
161

    
162
def put_account_headers(response, meta, groups, policy):
    """Fill *response* with X-Account-* headers from meta, groups and policy."""
    if 'count' in meta:
        response['X-Account-Container-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Account-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    # Copy any stored X-Account-Meta-* entries through verbatim.
    for k in [x for x in meta.keys() if x.startswith('X-Account-Meta-')]:
        response[smart_str(
            k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    if 'until_timestamp' in meta:
        response['X-Account-Until-Timestamp'] = http_date(
            int(meta['until_timestamp']))
    # Emit one X-Account-Group-<name> header per group, members comma-joined.
    for k, v in groups.iteritems():
        k = smart_str(k, strings_only=True)
        k = format_header_key('X-Account-Group-' + k)
        v = smart_str(','.join(v), strings_only=True)
        response[k] = v
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Account-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
181

    
182

    
183
def get_container_headers(request):
    """Extract container meta and policy dicts from request headers."""
    meta = get_header_prefix(request, 'X-Container-Meta-')
    check_meta_headers(meta)
    policy = {}
    # k[19:] strips the reformatted 'X-Container-Policy-' prefix.
    for k, v in get_header_prefix(request, 'X-Container-Policy-').iteritems():
        policy[k[19:].lower()] = v.replace(' ', '')
    return meta, policy
188

    
189

    
190
def put_container_headers(request, response, meta, policy):
    """Fill *response* with X-Container-* headers from meta and policy."""
    if 'count' in meta:
        response['X-Container-Object-Count'] = meta['count']
    if 'bytes' in meta:
        response['X-Container-Bytes-Used'] = meta['bytes']
    response['Last-Modified'] = http_date(int(meta['modified']))
    # Copy any stored X-Container-Meta-* entries through verbatim.
    for k in [x for x in meta.keys() if x.startswith('X-Container-Meta-')]:
        response[smart_str(
            k, strings_only=True)] = smart_str(meta[k], strings_only=True)
    # Advertise which X-Object-Meta-* keys appear on contained objects,
    # with the 14-char 'X-Object-Meta-' prefix stripped, comma-joined.
    l = [smart_str(x, strings_only=True) for x in meta['object_meta']
         if x.startswith('X-Object-Meta-')]
    response['X-Container-Object-Meta'] = ','.join([x[14:] for x in l])
    response['X-Container-Block-Size'] = request.backend.block_size
    response['X-Container-Block-Hash'] = request.backend.hash_algorithm
    if 'until_timestamp' in meta:
        response['X-Container-Until-Timestamp'] = http_date(
            int(meta['until_timestamp']))
    for k, v in policy.iteritems():
        response[smart_str(format_header_key('X-Container-Policy-' + k), strings_only=True)] = smart_str(v, strings_only=True)
209

    
210

    
211
def get_object_headers(request):
    """Extract object-related data from request headers.

    Returns a (content_type, meta, permissions, public) tuple, where
    permissions and public are parsed by get_sharing() / get_public().
    """
    content_type = request.META.get('CONTENT_TYPE', None)
    meta = get_header_prefix(request, 'X-Object-Meta-')
    check_meta_headers(meta)
    # Fold selected standard headers into the meta dict as well.
    if request.META.get('HTTP_CONTENT_ENCODING'):
        meta['Content-Encoding'] = request.META['HTTP_CONTENT_ENCODING']
    if request.META.get('HTTP_CONTENT_DISPOSITION'):
        meta['Content-Disposition'] = request.META['HTTP_CONTENT_DISPOSITION']
    if request.META.get('HTTP_X_OBJECT_MANIFEST'):
        meta['X-Object-Manifest'] = request.META['HTTP_X_OBJECT_MANIFEST']
    return content_type, meta, get_sharing(request), get_public(request)
222

    
223

    
224
def put_object_headers(response, meta, restricted=False):
    """Fill *response* with object headers taken from *meta*.

    With restricted=True only generic HTTP headers plus
    Content-Encoding/Content-Disposition are exposed; the full X-Object-*
    set is withheld.
    """
    response['ETag'] = meta['checksum']
    response['Content-Length'] = meta['bytes']
    response['Content-Type'] = meta.get('type', 'application/octet-stream')
    response['Last-Modified'] = http_date(int(meta['modified']))
    if not restricted:
        response['X-Object-Hash'] = meta['hash']
        response['X-Object-UUID'] = meta['uuid']
        response['X-Object-Modified-By'] = smart_str(
            meta['modified_by'], strings_only=True)
        response['X-Object-Version'] = meta['version']
        response['X-Object-Version-Timestamp'] = http_date(
            int(meta['version_timestamp']))
        # Copy user meta entries through verbatim.
        for k in [x for x in meta.keys() if x.startswith('X-Object-Meta-')]:
            response[smart_str(
                k, strings_only=True)] = smart_str(meta[k], strings_only=True)
        # Pass through the sharing/manifest/public headers when present.
        for k in (
            'Content-Encoding', 'Content-Disposition', 'X-Object-Manifest',
            'X-Object-Sharing', 'X-Object-Shared-By', 'X-Object-Allowed-To',
                'X-Object-Public'):
            if k in meta:
                response[k] = smart_str(meta[k], strings_only=True)
    else:
        for k in ('Content-Encoding', 'Content-Disposition'):
            if k in meta:
                response[k] = smart_str(meta[k], strings_only=True)
250

    
251

    
252
def update_manifest_meta(request, v_account, meta):
    """Update metadata if the object has an X-Object-Manifest.

    For manifest objects the reported size is the sum of all listed parts
    and the checksum is the MD5 of the concatenated part checksums.
    Any failure while reading the parts leaves *meta* untouched.
    """
    if 'X-Object-Manifest' not in meta:
        return
    etag = ''
    # Renamed from 'bytes' so the builtin is not shadowed.
    total_bytes = 0
    try:
        src_container, src_name = split_container_object_string(
            '/' + meta['X-Object-Manifest'])
        objects = request.backend.list_objects(
            request.user_uniq, v_account,
            src_container, prefix=src_name, virtual=False)
        for x in objects:
            src_meta = request.backend.get_object_meta(
                request.user_uniq, v_account, src_container, x[0],
                'pithos', x[1])
            etag += src_meta['checksum']
            total_bytes += src_meta['bytes']
    except Exception:
        # Best-effort: ignore errors and keep the stored meta. (Narrowed
        # from a bare 'except:' so KeyboardInterrupt/SystemExit propagate.)
        return
    meta['bytes'] = total_bytes
    md5 = hashlib.md5()
    md5.update(etag)
    meta['checksum'] = md5.hexdigest().lower()
276

    
277

    
278
def update_sharing_meta(request, permissions, v_account, v_container, v_object, meta):
    """Add X-Object-Sharing/Shared-By/Allowed-To entries to *meta*.

    *permissions* is either None or an (allowed, perm_path, perms) tuple.
    """
    if permissions is None:
        return
    allowed, perm_path, perms = permissions
    if not perms:
        return
    parts = []
    readers = ','.join(perms.get('read', []))
    if readers:
        parts.append('read=' + readers)
    writers = ','.join(perms.get('write', []))
    if writers:
        parts.append('write=' + writers)
    meta['X-Object-Sharing'] = '; '.join(parts)
    # Permissions inherited from another path are reported separately.
    if perm_path != '/'.join((v_account, v_container, v_object)):
        meta['X-Object-Shared-By'] = perm_path
    if request.user_uniq != v_account:
        meta['X-Object-Allowed-To'] = allowed
296

    
297

    
298
def update_public_meta(public, meta):
    """Record the object's public URL path in *meta* when one exists."""
    if public:
        meta['X-Object-Public'] = '/public/' + encode_url(public)
302

    
303

    
304
def validate_modification_preconditions(request, meta):
    """Check that the modified timestamp conforms with the preconditions set."""
    if 'modified' not in meta:
        return  # TODO: Always return?

    modified = int(meta['modified'])

    since = request.META.get('HTTP_IF_MODIFIED_SINCE')
    if since is not None:
        since = parse_http_date_safe(since)
    if since is not None and modified <= since:
        raise NotModified('Resource has not been modified')

    until = request.META.get('HTTP_IF_UNMODIFIED_SINCE')
    if until is not None:
        until = parse_http_date_safe(until)
    if until is not None and modified > until:
        raise PreconditionFailed('Resource has been modified')
321

    
322

    
323
def validate_matching_preconditions(request, meta):
    """Check that the ETag conforms with the preconditions set."""

    etag = meta['checksum']
    if not etag:
        # An empty checksum means no ETag is available for this resource.
        etag = None

    if_match = request.META.get('HTTP_IF_MATCH')
    if if_match is not None:
        if etag is None:
            raise PreconditionFailed('Resource does not exist')
        # Stored checksums are lowercase; compare case-insensitively.
        if if_match != '*' and etag not in [x.lower() for x in parse_etags(if_match)]:
            raise PreconditionFailed('Resource ETag does not match')

    if_none_match = request.META.get('HTTP_IF_NONE_MATCH')
    if if_none_match is not None:
        # TODO: If this passes, must ignore If-Modified-Since header.
        if etag is not None:
            if if_none_match == '*' or etag in [x.lower() for x in parse_etags(if_none_match)]:
                # TODO: Continue if an If-Modified-Since header is present.
                # Safe methods answer 304 Not Modified; others 412.
                if request.method in ('HEAD', 'GET'):
                    raise NotModified('Resource ETag matches')
                raise PreconditionFailed('Resource exists or ETag matches')
346

    
347

    
348
def split_container_object_string(s):
    """Split '/container/object' into a (container, object) pair.

    Raises ValueError unless *s* starts with '/' and the object part is
    non-empty; the object part may itself contain further slashes.
    """
    if not s.startswith('/'):
        raise ValueError
    container, sep, obj = s[1:].partition('/')
    if not sep or obj == '':
        raise ValueError
    return container, obj
356

    
357

    
358
def copy_or_move_object(request, src_account, src_container, src_name, dest_account, dest_container, dest_name, move=False, delimiter=None):
    """Copy or move an object."""

    # With 'ignore_content_type' the request's Content-Type is dropped so
    # the headers parsed below do not override the stored type.
    if 'ignore_content_type' in request.GET and 'CONTENT_TYPE' in request.META:
        del(request.META['CONTENT_TYPE'])
    content_type, meta, permissions, public = get_object_headers(request)
    src_version = request.META.get('HTTP_X_SOURCE_VERSION')
    # Map backend exceptions onto the API fault classes.
    try:
        if move:
            version_id = request.backend.move_object(
                request.user_uniq, src_account, src_container, src_name,
                dest_account, dest_container, dest_name,
                content_type, 'pithos', meta, False, permissions, delimiter)
        else:
            version_id = request.backend.copy_object(
                request.user_uniq, src_account, src_container, src_name,
                dest_account, dest_container, dest_name,
                content_type, 'pithos', meta, False, permissions, src_version, delimiter)
    except NotAllowedError:
        raise Forbidden('Not allowed')
    except (ItemNotExists, VersionNotExists):
        raise ItemNotFound('Container or object does not exist')
    except ValueError:
        raise BadRequest('Invalid sharing header')
    except QuotaError:
        raise RequestEntityTooLarge('Quota exceeded')
    # Apply any X-Object-Public setting to the destination object.
    if public is not None:
        try:
            request.backend.update_object_public(request.user_uniq, dest_account, dest_container, dest_name, public)
        except NotAllowedError:
            raise Forbidden('Not allowed')
        except ItemNotExists:
            raise ItemNotFound('Object does not exist')
    return version_id
392

    
393

    
394
def get_int_parameter(p):
    """Coerce *p* to a non-negative int; None when missing or invalid."""
    if p is None:
        return None
    try:
        value = int(p)
    except ValueError:
        return None
    return value if value >= 0 else None
403

    
404

    
405
def get_content_length(request):
    """Return the request's Content-Length as a non-negative int.

    Raises LengthRequired when the header is missing or invalid.
    """
    length = get_int_parameter(request.META.get('CONTENT_LENGTH'))
    if length is None:
        raise LengthRequired('Missing or invalid Content-Length header')
    return length
410

    
411

    
412
def get_range(request, size):
    """Parse a Range header from the request.

    Either returns None, when the header is not existent or should be ignored,
    or a list of (offset, length) tuples - should be further checked.
    """

    ranges = request.META.get('HTTP_RANGE', '').replace(' ', '')
    if not ranges.startswith('bytes='):
        return None

    # Compile once instead of inside the loop, and use a raw string so the
    # '\d' escapes are not subject to string-literal interpretation.
    p = re.compile(r'^(?P<offset>\d*)-(?P<upto>\d*)$')
    ret = []
    for r in (x.strip() for x in ranges[6:].split(',')):
        m = p.match(r)
        if not m:
            return None
        offset = m.group('offset')
        upto = m.group('upto')
        if offset == '' and upto == '':
            return None

        if offset != '':
            offset = int(offset)
            if upto != '':
                upto = int(upto)
                if offset > upto:
                    # Syntactically invalid range: ignore the whole header.
                    return None
                ret.append((offset, upto - offset + 1))
            else:
                # 'offset-': from offset to the end of the object.
                ret.append((offset, size - offset))
        else:
            # '-length': the final 'length' bytes of the object.
            length = int(upto)
            ret.append((size - length, length))

    return ret
448

    
449

    
450
def get_content_range(request):
    """Parse a Content-Range header from the request.

    Either returns None, when the header is not existent or should be ignored,
    or an (offset, length, total) tuple - check as length, total may be None.
    Returns (None, None, None) if the provided range is '*/*'.
    """

    header = request.META.get('HTTP_CONTENT_RANGE', '')
    if not header:
        return None

    pattern = re.compile(r'^bytes (?P<offset>\d+)-(?P<upto>\d*)/(?P<total>(\d+|\*))$')
    match = pattern.match(header)
    if not match:
        # 'bytes */*' is the only acceptable form the regex rejects.
        return (None, None, None) if header == 'bytes */*' else None

    offset = int(match.group('offset'))
    upto = match.group('upto')
    total = match.group('total')
    upto = int(upto) if upto != '' else None
    total = int(total) if total != '*' else None

    # Reject impossible offset/upto/total combinations.
    if upto is not None and offset > upto:
        return None
    if total is not None and offset >= total:
        return None
    if total is not None and upto is not None and upto >= total:
        return None

    length = None if upto is None else upto - offset + 1
    return (offset, length, total)
489

    
490

    
491
def _parse_sharing_users(value):
    """Parse one comma-separated user list from X-Object-Sharing.

    Lowercases entries, removes duplicates and blanks; a '*' entry
    collapses the whole list to ['*']. Returns None when no users remain.
    """
    users = list(set(v.replace(' ', '').lower() for v in value.split(',')))
    if '' in users:
        users.remove('')
    if '*' in users:
        users = ['*']
    return users or None


def get_sharing(request):
    """Parse an X-Object-Sharing header from the request.

    Raises BadRequest on error.
    """

    permissions = request.META.get('HTTP_X_OBJECT_SHARING')
    if permissions is None:
        return None

    # TODO: Document or remove '~' replacing.
    permissions = permissions.replace('~', '')

    ret = {}
    permissions = permissions.replace(' ', '')
    if permissions == '':
        return ret
    for perm in permissions.split(';'):
        # 'read=' / 'write=' clauses share parsing via _parse_sharing_users.
        if perm.startswith('read='):
            users = _parse_sharing_users(perm[5:])
            if users is None:
                raise BadRequest('Bad X-Object-Sharing header value')
            ret['read'] = users
        elif perm.startswith('write='):
            users = _parse_sharing_users(perm[6:])
            if users is None:
                raise BadRequest('Bad X-Object-Sharing header value')
            ret['write'] = users
        else:
            raise BadRequest('Bad X-Object-Sharing header value')

    # Keep duplicates only in write list.
    dups = [x for x in ret.get(
        'read', []) if x in ret.get('write', []) and x != '*']
    if dups:
        for x in dups:
            ret['read'].remove(x)
        if len(ret['read']) == 0:
            del(ret['read'])

    return ret
540

    
541

    
542
def get_public(request):
    """Parse an X-Object-Public header from the request.

    Raises BadRequest on error.
    """

    public = request.META.get('HTTP_X_OBJECT_PUBLIC')
    if public is None:
        return None

    normalized = public.replace(' ', '').lower()
    if normalized == 'true':
        return True
    if normalized in ('false', ''):
        return False
    raise BadRequest('Bad X-Object-Public header value')
558

    
559

    
560
def raw_input_socket(request):
    """Return the socket for reading the rest of the request."""

    server_software = request.META.get('SERVER_SOFTWARE')
    if server_software and server_software.startswith('mod_python'):
        # mod_python exposes the raw request object directly.
        return request._req
    if 'wsgi.input' in request.environ:
        return request.environ['wsgi.input']
    # NOTE: 'NotImplemented' is the pithos fault class imported from
    # pithos.api.faults, not the Python builtin of the same name.
    raise NotImplemented('Unknown server software')
569

    
570
# Hard ceiling on the total size of a single upload (plain or chunked).
MAX_UPLOAD_SIZE = 5 * (1024 * 1024 * 1024)  # 5GB
571

    
572

    
573
def socket_read_iterator(request, length=0, blocksize=4096):
    """Return a maximum of blocksize data read from the socket in each iteration.

    Read up to 'length'. If 'length' is negative, will attempt a chunked read.
    The maximum ammount of data read is controlled by MAX_UPLOAD_SIZE.
    """

    sock = raw_input_socket(request)
    if length < 0:  # Chunked transfers
        # BUGFIX: track the running total in a separate counter. Previously
        # 'length' itself (negative) was used and never advanced (the long
        # version guarded the increment with 'length > 0', which was never
        # true), so the MAX_UPLOAD_SIZE cap could never fire for chunked
        # uploads.
        size = 0
        # Small version (server does the dechunking).
        if request.environ.get('mod_wsgi.input_chunked', None) or request.META['SERVER_SOFTWARE'].startswith('gunicorn'):
            while size < MAX_UPLOAD_SIZE:
                data = sock.read(blocksize)
                if data == '':
                    return
                size += len(data)
                yield data
            raise BadRequest('Maximum size is reached')

        # Long version (do the dechunking).
        data = ''
        while size < MAX_UPLOAD_SIZE:
            # Get chunk size.
            if hasattr(sock, 'readline'):
                chunk_length = sock.readline()
            else:
                chunk_length = ''
                while chunk_length[-1:] != '\n':
                    chunk_length += sock.read(1)
                # BUGFIX: the result of strip() was previously discarded.
                chunk_length = chunk_length.strip()
            # Drop any chunk extension after ';'.
            pos = chunk_length.find(';')
            if pos >= 0:
                chunk_length = chunk_length[:pos]
            try:
                chunk_length = int(chunk_length, 16)
            except Exception:
                raise BadRequest('Bad chunk size')
                                 # TODO: Change to something more appropriate.
            # Check if done.
            if chunk_length == 0:
                if len(data) > 0:
                    yield data
                return
            # Get the actual data.
            while chunk_length > 0:
                chunk = sock.read(min(chunk_length, blocksize))
                chunk_length -= len(chunk)
                size += len(chunk)
                data += chunk
                # Emit full blocks as soon as they are available.
                if len(data) >= blocksize:
                    ret = data[:blocksize]
                    data = data[blocksize:]
                    yield ret
            sock.read(2)  # CRLF
        raise BadRequest('Maximum size is reached')
    else:
        if length > MAX_UPLOAD_SIZE:
            raise BadRequest('Maximum size is reached')
        while length > 0:
            data = sock.read(min(length, blocksize))
            if not data:
                raise BadRequest()
            length -= len(data)
            yield data
637

    
638

    
639
class SaveToBackendHandler(FileUploadHandler):
    """Handle a file from an HTML form the django way."""

    def __init__(self, request=None):
        super(SaveToBackendHandler, self).__init__(request)
        # Backend used to store blocks as they arrive.
        self.backend = request.backend

    def put_data(self, length):
        """Store one block of buffered data when at least *length* bytes
        are available; records the block hash and feeds the MD5 digest."""
        if len(self.data) >= length:
            block = self.data[:length]
            self.file.hashmap.append(self.backend.put_block(block))
            self.md5.update(block)
            self.data = self.data[length:]

    def new_file(self, field_name, file_name, content_type, content_length, charset=None):
        """Reset per-file state (digest, buffer, hashmap) for a new upload."""
        self.md5 = hashlib.md5()
        self.data = ''
        self.file = UploadedFile(
            name=file_name, content_type=content_type, charset=charset)
        self.file.size = 0
        self.file.hashmap = []

    def receive_data_chunk(self, raw_data, start):
        """Buffer the incoming chunk and flush it block-by-block."""
        self.data += raw_data
        self.file.size += len(raw_data)
        self.put_data(self.request.backend.block_size)
        # Returning None stops later handlers from re-processing the chunk.
        return None

    def file_complete(self, file_size):
        """Flush any remaining data and finalize the file's ETag."""
        l = len(self.data)
        if l > 0:
            self.put_data(l)
        self.file.etag = self.md5.hexdigest().lower()
        return self.file
673

    
674

    
675
class ObjectWrapper(object):
    """Return the object's data block-per-block in each iteration.

    Read from the object using the offset and length provided in each entry of the range list.
    """

    def __init__(self, backend, ranges, sizes, hashmaps, boundary):
        # backend: block-store accessor; sizes/hashmaps: per-file byte sizes
        # and block-hash lists; boundary: MIME boundary for multi-range replies.
        self.backend = backend
        self.ranges = ranges
        self.sizes = sizes
        self.hashmaps = hashmaps
        self.boundary = boundary
        self.size = sum(self.sizes)

        # Current read position within the concatenation of all files.
        self.file_index = 0
        self.block_index = 0
        self.block_hash = -1  # sentinel: no block cached yet
        self.block = ''

        # -1 signals that the first multi-range part header has not yet
        # been emitted (see next()).
        self.range_index = -1
        self.offset, self.length = self.ranges[0]

    def __iter__(self):
        return self

    def part_iterator(self):
        """Return the next piece of the current range.

        Raises StopIteration once the range is exhausted (length <= 0).
        """
        if self.length > 0:
            # Get the file for the current offset.
            file_size = self.sizes[self.file_index]
            while self.offset >= file_size:
                self.offset -= file_size
                self.file_index += 1
                file_size = self.sizes[self.file_index]

            # Get the block for the current position.
            self.block_index = int(self.offset / self.backend.block_size)
            # Fetch only when the needed block differs from the cached one.
            if self.block_hash != self.hashmaps[self.file_index][self.block_index]:
                self.block_hash = self.hashmaps[
                    self.file_index][self.block_index]
                try:
                    self.block = self.backend.get_block(self.block_hash)
                except ItemNotExists:
                    raise ItemNotFound('Block does not exist')

            # Get the data from the block.
            bo = self.offset % self.backend.block_size
            bs = self.backend.block_size
            # The file's final block may be shorter than block_size.
            if (self.block_index == len(self.hashmaps[self.file_index]) - 1 and
                    self.sizes[self.file_index] % self.backend.block_size):
                bs = self.sizes[self.file_index] % self.backend.block_size
            bl = min(self.length, bs - bo)
            data = self.block[bo:bo + bl]
            self.offset += bl
            self.length -= bl
            return data
        else:
            raise StopIteration

    def next(self):
        # Single range: plain body, no MIME framing at all.
        if len(self.ranges) == 1:
            return self.part_iterator()
        if self.range_index == len(self.ranges):
            raise StopIteration
        try:
            if self.range_index == -1:
                # Force emission of the first part header before any data.
                raise StopIteration
            return self.part_iterator()
        except StopIteration:
            # Current range exhausted: advance and emit the next part
            # header, or the closing footer after the last range.
            self.range_index += 1
            out = []
            if self.range_index < len(self.ranges):
                # Part header.
                self.offset, self.length = self.ranges[self.range_index]
                self.file_index = 0
                if self.range_index > 0:
                    out.append('')
                out.append('--' + self.boundary)
                out.append('Content-Range: bytes %d-%d/%d' % (
                    self.offset, self.offset + self.length - 1, self.size))
                out.append('Content-Transfer-Encoding: binary')
                out.append('')
                out.append('')
                return '\r\n'.join(out)
            else:
                # Footer.
                out.append('')
                out.append('--' + self.boundary + '--')
                out.append('')
                return '\r\n'.join(out)
764

    
765

    
766
def object_data_response(request, sizes, hashmaps, meta, public=False):
    """Get the HttpResponse object for replying with the object's data.

    Honors the Range and If-Range request headers: a satisfiable Range
    yields a 206 response (single range, or multipart/byteranges when
    several ranges were requested); no Range, or an If-Range mismatch,
    streams the whole object with a 200.

    Raises RangeNotSatisfiable when a requested range falls outside the
    object.
    """
    total = sum(sizes)
    ranges = get_range(request, total)
    if ranges is None:
        ranges = [(0, total)]
        ret = 200
    else:
        # Reject any range that falls outside the object's extent.
        out_of_bounds = any(length <= 0 or length > total or
                            offset < 0 or offset >= total or
                            offset + length > total
                            for offset, length in ranges)
        if out_of_bounds:
            raise RangeNotSatisfiable('Requested range exceeds object limits')
        ret = 206
        if_range = request.META.get('HTTP_IF_RANGE')
        if if_range:
            try:
                # If-Range may carry an HTTP date...
                last_modified = parse_http_date(if_range)
                if last_modified != meta['modified']:
                    ranges = [(0, total)]
                    ret = 200
            except ValueError:
                # ...or an entity tag (the object's checksum here).
                if if_range != meta['checksum']:
                    ranges = [(0, total)]
                    ret = 200

    # A boundary is only needed for multipart (several ranges) replies.
    multipart = (ret == 206 and len(ranges) > 1)
    boundary = uuid.uuid4().hex if multipart else ''
    wrapper = ObjectWrapper(request.backend, ranges, sizes, hashmaps, boundary)
    response = HttpResponse(wrapper, status=ret)
    put_object_headers(response, meta, public)
    if ret == 206:
        if len(ranges) == 1:
            offset, length = ranges[0]
            # Update with the correct (partial) length.
            response['Content-Length'] = length
            response['Content-Range'] = 'bytes %d-%d/%d' % (
                offset, offset + length - 1, total)
        else:
            # Length is unknown up-front for multipart framing.
            del response['Content-Length']
            response['Content-Type'] = 'multipart/byteranges; boundary=%s' % (
                boundary,)
    return response
815

    
816

    
817
def put_object_block(request, hashmap, data, offset):
    """Put one block of data at the given offset.

    Updates an existing block in the hashmap when the offset falls within
    it, or appends a new block (zero-padded up to the in-block offset) at
    the end.  Returns the number of bytes of 'data' actually written — at
    most one block's worth — so callers loop until all data is consumed.
    """
    bs = request.backend.block_size
    # Bug fix: use floor division instead of int(offset / bs).  Under true
    # division (Python 3 semantics) the intermediate float loses precision
    # for very large offsets; '//' is exact and identical for py2 ints.
    bi = offset // bs           # Index of the block being written.
    bo = offset % bs            # Offset within that block.
    bl = min(len(data), bs - bo)  # How much of 'data' fits in this block.
    if bi < len(hashmap):
        hashmap[bi] = request.backend.update_block(hashmap[bi], data[:bl], bo)
    else:
        # Writing past the current end: pad the gap with NULs.
        hashmap.append(request.backend.put_block(('\x00' * bo) + data[:bl]))
    return bl  # Return amount of data written.
828

    
829

    
830
def hashmap_md5(backend, hashmap, size):
    """Produce the MD5 sum from the data in the hashmap.

    Reads every block from the backend, trimming the (padded) final block
    down to the object's real size, and returns the lowercase hex digest.
    """
    # TODO: Search backend for the MD5 of another object with the same
    # hashmap and size...
    md5 = hashlib.md5()
    bs = backend.block_size
    last = len(hashmap) - 1
    for bi, block_hash in enumerate(hashmap):
        data = backend.get_block(block_hash)  # Blocks come in padded.
        if bi == last:
            # Bug fix: when size is an exact multiple of the block size,
            # size % bs is 0 and the old slice [:0] dropped the whole last
            # block; 'or bs' keeps it intact in that case.
            data = data[:size % bs or bs]
        md5.update(data)
    return md5.hexdigest().lower()
842

    
843

    
844
def simple_list_response(request, l):
    """Render a flat list of strings in the request's serialization format.

    'text' yields newline-separated entries (with a trailing newline),
    'xml' renders the items.xml template, 'json' dumps a JSON array; any
    other serialization yields None.
    """
    fmt = request.serialization
    if fmt == 'json':
        return json.dumps(l)
    elif fmt == 'xml':
        return render_to_string('items.xml', {'items': l})
    elif fmt == 'text':
        return '\n'.join(l) + '\n'
851

    
852

    
853

    
854
def _get_backend():
    """Construct a new, fully-configured Pithos backend instance.

    Wires the backend to the configured database, block store, queue and
    quotaholder, then applies the site-wide default policy.
    """
    options = dict(
        db_module=BACKEND_DB_MODULE,
        db_connection=BACKEND_DB_CONNECTION,
        block_module=BACKEND_BLOCK_MODULE,
        block_path=BACKEND_BLOCK_PATH,
        block_umask=BACKEND_BLOCK_UMASK,
        queue_module=BACKEND_QUEUE_MODULE,
        queue_hosts=BACKEND_QUEUE_HOSTS,
        queue_exchange=BACKEND_QUEUE_EXCHANGE,
        quotaholder_url=PITHOS_QUOTAHOLDER_URL,
    )
    backend = connect_backend(**options)
    # Site-wide defaults for per-account policy.
    backend.default_policy['quota'] = BACKEND_QUOTA
    backend.default_policy['versioning'] = BACKEND_VERSIONING
    return backend
867

    
868

    
869
def _pooled_backend_close(backend):
870
    backend._pool.pool_put(backend)
871

    
872

    
873
from synnefo.lib.pool import ObjectPool
874
from new import instancemethod
875
from select import select
876
from traceback import print_exc
877

    
878
USAGE_LIMIT = 500
879
POOL_SIZE = 5
880

    
881
class PithosBackendPool(ObjectPool):
    """Pool of reusable backend instances.

    Pooled backends get their close() replaced with a method that returns
    them to the pool; the real close() is preserved as _real_close().
    Each backend is reused at most USAGE_LIMIT times before being closed
    for good.
    """

    def _pool_create(self):
        """Create a fresh backend wired up to this pool."""
        backend = _get_backend()
        backend._real_close = backend.close
        # Make backend.close() return the instance to the pool instead.
        backend.close = instancemethod(_pooled_backend_close, backend,
                                       type(backend))
        backend._pool = self
        backend._use_count = USAGE_LIMIT
        return backend

    def _pool_verify(self, backend):
        """Return True if the backend's DB connection is still usable."""
        wrapper = backend.wrapper
        conn = wrapper.conn
        if conn.closed:
            return False

        if conn.in_transaction():
            # A leftover open transaction means the previous user left the
            # connection dirty; drop it rather than reuse it.
            conn.close()
            return False

        try:
            fd = conn.connection.connection.fileno()
            # An idle connection should have nothing to read; readable data
            # (or EOF) means the server closed it or sent something
            # unexpected — discard it either way.
            r, w, x = select([fd], (), (), 0)
            if r:
                conn.close()
                return False
        except Exception:
            # Bug fix: narrowed from a bare 'except:' so KeyboardInterrupt
            # and SystemExit still propagate; any other probing failure
            # invalidates the pooled entry.
            print_exc()
            return False

        return True

    def _pool_cleanup(self, backend):
        """Reset a returned backend; return True to evict it from the pool."""
        c = backend._use_count - 1
        if c < 0:
            # Usage limit exhausted: really close and drop this instance.
            backend._real_close()
            return True

        backend._use_count = c
        wrapper = backend.wrapper
        if wrapper.trans is not None:
            conn = wrapper.conn
            if conn.closed:
                wrapper.trans = None
            else:
                # Roll back any uncommitted work before the next user.
                wrapper.rollback()
        if backend.messages:
            backend.messages = []
        return False
930

    
931
# Process-wide singleton pool; get_backend() hands out entries from it.
_pithos_backend_pool = PithosBackendPool(size=POOL_SIZE)
932

    
933

    
934
def get_backend():
    """Check a backend instance out of the shared process-wide pool."""
    pool = _pithos_backend_pool
    return pool.pool_get()
936

    
937

    
938
def update_request_headers(request):
    """Validate and URL-decode incoming HTTP_* headers in place.

    Rejects headers containing non-ASCII bytes with BadRequest; headers
    containing '%' are replaced in request.META by their unquoted key and
    value.
    """
    # Snapshot the HTTP_* headers so request.META can be mutated safely.
    headers = dict((key, value) for key, value in request.META.iteritems()
                   if key.startswith('HTTP_'))
    for key, value in headers.iteritems():
        try:
            key.decode('ascii')
            value.decode('ascii')
        except UnicodeDecodeError:
            raise BadRequest('Bad character in headers.')
        if '%' in key or '%' in value:
            # Replace the encoded entry with its decoded equivalent.
            del request.META[key]
            request.META[unquote(key)] = smart_unicode(unquote(value),
                                                       strings_only=True)
952

    
953

    
954
def update_response_headers(request, response):
    """Fix up Content-Type/Content-Length and URL-encode header values.

    The Content-Type follows the request's serialization; Content-Length
    is filled in from the body unless already set or the response is a
    multipart/byteranges stream; finally all X-Account-/X-Container-/
    X-Object-/Content- headers get their key and value URL-quoted.
    """
    serialization = request.serialization
    if serialization == 'xml':
        response['Content-Type'] = 'application/xml; charset=UTF-8'
    elif serialization == 'json':
        response['Content-Type'] = 'application/json; charset=UTF-8'
    elif not response['Content-Type']:
        response['Content-Type'] = 'text/plain; charset=UTF-8'

    # Multipart responses stream their parts, so their length is unknown.
    is_multipart = (response.has_header('Content-Type') and
                    response['Content-Type'].startswith(
                        'multipart/byteranges'))
    if not response.has_header('Content-Length') and not is_multipart:
        response['Content-Length'] = len(response.content)

    # URL-encode unicode in headers.
    prefixes = ('X-Account-', 'X-Container-', 'X-Object-', 'Content-')
    for header, value in list(response.items()):
        if header.startswith(prefixes):
            del response[header]
            response[quote(header)] = quote(value, safe='/=,:@; ')
974

    
975

    
976
def render_fault(request, fault):
    """Build a plain-text HTTP error response for the given Fault.

    In DEBUG mode, internal server errors get the current traceback
    attached to their details to aid debugging.
    """
    if isinstance(fault, InternalServerError) and settings.DEBUG:
        # Bug fix: traceback.format_exc() takes an optional traceback
        # *limit*, not an exception object; passing the fault here was
        # wrong.  Called inside the handling of the original exception,
        # format_exc() picks up the active traceback by itself.
        fault.details = format_exc()

    request.serialization = 'text'
    data = fault.message + '\n'
    if fault.details:
        data += '\n' + fault.details
    response = HttpResponse(data, status=fault.code)
    update_response_headers(request, response)
    return response
987

    
988

    
989
def request_serialization(request, format_allowed=False):
    """Return the serialization format requested.

    Valid formats are 'text' and 'json', 'xml' if 'format_allowed' is True.
    """
    if not format_allowed:
        return 'text'

    # An explicit ?format= query parameter wins.
    fmt = request.GET.get('format')
    if fmt in ('json', 'xml'):
        return fmt

    # Otherwise honor the Accept header; first acceptable media type wins.
    for part in request.META.get('HTTP_ACCEPT', '').split(','):
        media_type = part.strip().partition(';')[0]
        if media_type == 'application/json':
            return 'json'
        if media_type in ('application/xml', 'text/xml'):
            return 'xml'

    return 'text'
1012

    
1013

    
1014
def api_method(http_method=None, format_allowed=False, user_required=True):
    """Decorator function for views that implement an API method.

    http_method: if set, requests with any other method are rejected
        with BadRequest.
    format_allowed: forwarded to request_serialization(); allows the
        client to pick json/xml output.
    user_required: when True, resolve the user via get_user() (for
        GET/HEAD a token may also come from the auth cookie, whose value
        has the form "<user>|<token>") and reject unauthenticated
        requests with Unauthorized.

    The wrapper also normalizes request headers, attaches a pooled
    backend as request.backend, converts Fault exceptions (and any
    unexpected error, as InternalServerError) into rendered HTTP error
    responses, and always returns the backend to the pool.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            try:
                if http_method and request.method != http_method:
                    raise BadRequest('Method not allowed.')

                if user_required:
                    token = None
                    # For safe methods, fall back to the token stored in
                    # the auth cookie.
                    if request.method in ('HEAD', 'GET') and COOKIE_NAME in request.COOKIES:
                        cookie_value = unquote(
                            request.COOKIES.get(COOKIE_NAME, ''))
                        if cookie_value and '|' in cookie_value:
                            token = cookie_value.split('|', 1)[1]
                    get_user(request,
                             AUTHENTICATION_URL, AUTHENTICATION_USERS, token)
                    if  getattr(request, 'user', None) is None:
                        raise Unauthorized('Access denied')

                # The args variable may contain up to (account, container, object).
                if len(args) > 1 and len(args[1]) > 256:
                    raise BadRequest('Container name too large.')
                if len(args) > 2 and len(args[2]) > 1024:
                    raise BadRequest('Object name too large.')

                # Format and check headers.
                update_request_headers(request)

                # Fill in custom request variables.
                request.serialization = request_serialization(
                    request, format_allowed)
                request.backend = get_backend()

                response = func(request, *args, **kwargs)
                update_response_headers(request, response)
                return response
            except Fault, fault:
                return render_fault(request, fault)
            except BaseException, e:
                # Anything unexpected becomes a 500 with the error logged.
                logger.exception('Unexpected error: %s' % e)
                fault = InternalServerError('Unexpected error: %s' % e)
                return render_fault(request, fault)
            finally:
                # Always return the pooled backend, even on errors.
                if getattr(request, 'backend', None) is not None:
                    request.backend.close()
        return wrapper
    return decorator