Revision 3a19e99b snf-pithos-app/pithos/api/test/__init__.py

b/snf-pithos-app/pithos/api/test/__init__.py
39 39

  
40 40
from snf_django.utils.testing import with_settings, astakos_user
41 41

  
42
from pithos.backends.random_word import get_random_word
43 42
from pithos.api import settings as pithos_settings
43
from pithos.api.test.util import is_date, get_random_data
44 44

  
45 45
from synnefo.lib.services import get_service_path
46 46
from synnefo.lib import join_urls
......
51 51

  
52 52
import django.utils.simplejson as json
53 53

  
54
import re
55 54
import random
56 55
import threading
57 56
import functools
58 57

  
58

  
59 59
pithos_test_settings = functools.partial(with_settings, pithos_settings)
60 60

  
61 61
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
......
87 87

  
88 88

  
89 89
class PithosAPITest(TestCase):
90
    #TODO unauthorized request
91 90
    def setUp(self):
92 91
        pithos_settings.BACKEND_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93 92
        pithos_settings.BACKEND_DB_CONNECTION = django_to_sqlalchemy()
......
212 211
                if not k.startswith('X-Account-Group-')])
213 212
        return headers
214 213

  
214
    def get_container_info(self, container, until=None):
215
        url = join_urls(self.pithos_path, self.user, container)
216
        if until is not None:
217
            parts = list(urlsplit(url))
218
            parts[3] = urlencode({
219
                'until': until
220
            })
221
            url = urlunsplit(parts)
222
        r = self.head(url)
223
        self.assertEqual(r.status_code, 204)
224
        return r
225

  
226
    def get_container_meta(self, container, until=None):
227
        r = self.get_container_info(container, until=until)
228
        headers = dict(r._headers.values())
229
        map(headers.pop,
230
            [k for k in headers.keys()
231
                if not k.startswith('X-Container-Meta-')])
232
        return headers
233

  
234
    def update_container_meta(self, container, meta):
235
        kwargs = dict(
236
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
237
        url = join_urls(self.pithos_path, self.user, container)
238
        r = self.post('%s?update=' % url, **kwargs)
239
        self.assertEqual(r.status_code, 202)
240
        container_meta = self.get_container_meta(container)
241
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
242
            k in meta.keys())
243
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
244
            k, v in meta.items())
245

  
215 246
    def list_containers(self, format='json', headers={}, **params):
216 247
        _url = join_urls(self.pithos_path, self.user)
217 248
        parts = list(urlsplit(_url))
......
254 285
        self.assertTrue(r.status_code in (202, 201))
255 286
        return r
256 287

  
257
    def upload_object(self, cname, oname=None, **meta):
258
        oname = oname or get_random_word(8)
259
        data = get_random_word(length=random.randint(1, 1024))
288
    def upload_object(self, cname, oname=None, length=1024, verify=True,
289
                      **meta):
290
        oname = oname or get_random_data(8)
291
        length = length or random.randint(1, 1024)
292
        data = get_random_data(length=length)
260 293
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
261 294
                       for k, v in meta.iteritems())
262 295
        url = join_urls(self.pithos_path, self.user, cname, oname)
263 296
        r = self.put(url, data=data, **headers)
264
        self.assertEqual(r.status_code, 201)
297
        if verify:
298
            self.assertEqual(r.status_code, 201)
299
        return oname, data, r
300

  
301
    def update_object_data(self, cname, oname=None, length=None,
302
                           content_type=None, content_range=None,
303
                           verify=True, **meta):
304
        oname = oname or get_random_data(8)
305
        length = length or random.randint(1, 1024)
306
        content_type = content_type or 'application/octet-stream'
307
        data = get_random_data(length=length)
308
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
309
                       for k, v in meta.iteritems())
310
        if content_range:
311
            headers['HTTP_CONTENT_RANGE'] = content_range
312
        url = join_urls(self.pithos_path, self.user, cname, oname)
313
        r = self.post(url, data=data, content_type=content_type, **headers)
314
        if verify:
315
            self.assertEqual(r.status_code, 204)
265 316
        return oname, data, r
266 317

  
267
    def create_folder(self, cname, oname=get_random_word(8), **headers):
318
    def append_object_data(self, cname, oname=None, length=None,
319
                           content_type=None):
320
        return self.update_object_data(cname, oname=oname,
321
                                       length=length,
322
                                       content_type=content_type,
323
                                       content_range='bytes */*')
324

  
325
    def create_folder(self, cname, oname=None, **headers):
326
        oname = oname or get_random_data(8)
268 327
        url = join_urls(self.pithos_path, self.user, cname, oname)
269 328
        r = self.put(url, data='', content_type='application/directory',
270 329
                     **headers)
271 330
        self.assertEqual(r.status_code, 201)
272 331
        return oname, r
273 332

  
274
    def list_objects(self, cname):
333
    def list_objects(self, cname, prefix=None):
275 334
        url = join_urls(self.pithos_path, self.user, cname)
276
        r = self.get('%s?format=json' % url)
335
        path = '%s?format=json' % url
336
        if prefix is not None:
337
            path = '%s&prefix=%s' % (path, prefix)
338
        r = self.get(path)
277 339
        self.assertTrue(r.status_code in (200, 204))
278 340
        try:
279 341
            objects = json.loads(r.content)
......
281 343
            self.fail('json format expected')
282 344
        return objects
283 345

  
346
    def get_object_info(self, container, object, version=None, until=None):
347
        url = join_urls(self.pithos_path, self.user, container, object)
348
        if until is not None:
349
            parts = list(urlsplit(url))
350
            parts[3] = urlencode({
351
                'until': until
352
            })
353
            url = urlunsplit(parts)
354
        if version:
355
            url = '%s?version=%s' % (url, version)
356
        r = self.head(url)
357
        self.assertEqual(r.status_code, 200)
358
        return r
359

  
360
    def get_object_meta(self, container, object, version=None, until=None):
361
        r = self.get_object_info(container, object, version, until=until)
362
        headers = dict(r._headers.values())
363
        map(headers.pop,
364
            [k for k in headers.keys()
365
                if not k.startswith('X-Object-Meta-')])
366
        return headers
367

  
368
    def update_object_meta(self, container, object, meta):
369
        kwargs = dict(
370
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
371
        url = join_urls(self.pithos_path, self.user, container, object)
372
        r = self.post('%s?update=' % url, content_type='', **kwargs)
373
        self.assertEqual(r.status_code, 202)
374
        object_meta = self.get_object_meta(container, object)
375
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
376
            k in meta.keys())
377
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
378
            k, v in meta.items())
379

  
284 380
    def assert_status(self, status, codes):
285 381
        l = [elem for elem in return_codes]
286 382
        if isinstance(codes, list):
......
339 435
            assert(k in map), '%s not in map' % k
340 436
            assert v == map[k]
341 437

  
438

  
439
class AssertUUidInvariant(object):
440
    def __init__(self, callable, *args, **kwargs):
441
        self.callable = callable
442
        self.args = args
443
        self.kwargs = kwargs
444

  
445
    def __enter__(self):
446
        self.map = self.callable(*self.args, **self.kwargs)
447
        assert('x-object-uuid' in self.map)
448
        self.uuid = self.map['x-object-uuid']
449
        return self.map
450

  
451
    def __exit__(self, type, value, tb):
452
        map = self.callable(*self.args, **self.kwargs)
453
        assert('x-object-uuid' in self.map)
454
        uuid = map['x-object-uuid']
455
        assert(uuid == self.uuid)
456

  
457

  
342 458
django_sqlalchemy_engines = {
343 459
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
344 460
    'django.db.backends.postgresql': 'postgresql',
......
365 481
        return '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d
366 482

  
367 483

  
368
def is_date(date):
369
    __D = r'(?P<day>\d{2})'
370
    __D2 = r'(?P<day>[ \d]\d)'
371
    __M = r'(?P<mon>\w{3})'
372
    __Y = r'(?P<year>\d{4})'
373
    __Y2 = r'(?P<year>\d{2})'
374
    __T = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})'
375
    RFC1123_DATE = re.compile(r'^\w{3}, %s %s %s %s GMT$' % (
376
        __D, __M, __Y, __T))
377
    RFC850_DATE = re.compile(r'^\w{6,9}, %s-%s-%s %s GMT$' % (
378
        __D, __M, __Y2, __T))
379
    ASCTIME_DATE = re.compile(r'^\w{3} %s %s %s %s$' % (
380
        __M, __D2, __T, __Y))
381
    for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
382
        m = regex.match(date)
383
        if m is not None:
384
            return True
385
    return False
386

  
387

  
388
def strnextling(prefix):
389
    """Return the first unicode string
390
       greater than but not starting with given prefix.
391
       strnextling('hello') -> 'hellp'
392
    """
393
    if not prefix:
394
        ## all strings start with the null string,
395
        ## therefore we have to approximate strnextling('')
396
        ## with the last unicode character supported by python
397
        ## 0x10ffff for wide (32-bit unicode) python builds
398
        ## 0x00ffff for narrow (16-bit unicode) python builds
399
        ## We will not autodetect. 0xffff is safe enough.
400
        return unichr(0xffff)
401
    s = prefix[:-1]
402
    c = ord(prefix[-1])
403
    if c >= 0xffff:
404
        raise RuntimeError
405
    s += unichr(c + 1)
406
    return s
407

  
408

  
409 484
def test_concurrently(times=2):
410 485
    """
411 486
    Add this decorator to small pieces of code that you want to test

Also available in: Unified diff