Revision bc4f25c0

b/snf-common/synnefo/settings/test.py
60 60
CLOUDBAR_LOCATION = '/static/im/cloudbar/'
61 61
CLOUDBAR_SERVICES_URL = '/ui/get_services'
62 62
CLOUDBAR_MENU_URL = '/ui/get_menu'
63

  
64
TEST_RUNNER = 'pithos.api.test.PithosTestSuiteRunner'
b/snf-pithos-app/pithos/api/test/__init__.py
40 40
from snf_django.utils.testing import with_settings, astakos_user
41 41

  
42 42
from pithos.api import settings as pithos_settings
43
from pithos.api.test.util import is_date, get_random_data
43
from pithos.api.test.util import is_date, get_random_data, get_random_name
44
from pithos.backends.migrate import initialize_db
44 45

  
45 46
from synnefo.lib.services import get_service_path
46 47
from synnefo.lib import join_urls
47 48

  
48 49
from django.test import TestCase
50
from django.test.simple import DjangoTestSuiteRunner
49 51
from django.conf import settings
50 52
from django.utils.http import urlencode
53
from django.db.backends.creation import TEST_DATABASE_PREFIX
51 54

  
52 55
import django.utils.simplejson as json
53 56

  
54 57
import random
55
import threading
56 58
import functools
57 59

  
58 60

  
......
83 85
           'object': ('name', 'hash', 'bytes', 'content_type',
84 86
                      'content_encoding', 'last_modified',)}
85 87

  
86
return_codes = (400, 401, 403, 404, 503)
87

  
88 88
TEST_BLOCK_SIZE = 1024
89 89
TEST_HASH_ALGORITHM = 'sha256'
90 90

  
91
BACKEND_DB_CONNECTION = None
91
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
92
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
93
print 'update md5:', pithos_settings.UPDATE_MD5
92 94

  
93 95

  
94
def django_to_sqlalchemy():
95
    """Convert the django default database to sqlalchemy connection string"""
96
django_sqlalchemy_engines = {
97
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
98
    'django.db.backends.postgresql': 'postgresql',
99
    'django.db.backends.mysql': '',
100
    'django.db.backends.sqlite3': 'mssql',
101
    'django.db.backends.oracle': 'oracle'}
96 102

  
97
    global BACKEND_DB_CONNECTION
98
    if BACKEND_DB_CONNECTION:
99
        return BACKEND_DB_CONNECTION
100 103

  
101
    # TODO support for more complex configuration
104
def prepate_db_connection():
105
    """Build pithos backend connection string from django default database"""
106

  
102 107
    db = settings.DATABASES['default']
103
    name = db.get('TEST_NAME', 'test_%s' % db['NAME'])
104
    if db['ENGINE'] == 'django.db.backends.sqlite3':
105
        BACKEND_DB_CONNECTION = 'sqlite:///%s' % name
108
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
109

  
110
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
111
        if db['ENGINE'] == 'django.db.backends.sqlite3':
112
            db_connection = 'sqlite:///%s' % name
113
        else:
114
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
115
                     user=db['USER'],
116
                     pwd=db['PASSWORD'],
117
                     host=db['HOST'].lower(),
118
                     port=int(db['PORT']) if db['PORT'] != '' else '',
119
                     name=name)
120
            db_connection = (
121
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
122

  
123
            # initialize pithos database
124
            initialize_db(db_connection)
106 125
    else:
107
        d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
108
                 user=db['USER'],
109
                 pwd=db['PASSWORD'],
110
                 host=db['HOST'].lower(),
111
                 port=int(db['PORT']) if db['PORT'] != '' else '',
112
                 name=name)
113
        BACKEND_DB_CONNECTION = (
114
            '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
115
    return BACKEND_DB_CONNECTION
126
        db_connection = name
127
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
128

  
129

  
130
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
131
    def setup_databases(self, **kwargs):
132
        old_names, mirrors = super(PithosTestSuiteRunner,
133
                                   self).setup_databases(**kwargs)
134
        prepate_db_connection()
135
        return old_names, mirrors
116 136

  
117 137

  
118 138
class PithosAPITest(TestCase):
119 139
    def setUp(self):
120
        if (pithos_settings.BACKEND_DB_MODULE ==
121
                'pithos.backends.lib.sqlalchemy'):
122
            pithos_settings.BACKEND_DB_CONNECTION = django_to_sqlalchemy()
123
            pithos_settings.BACKEND_POOL_SIZE = 1
124

  
125 140
        # Override default block size to spead up tests
126 141
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
127 142
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
......
318 333

  
319 334
    def upload_object(self, cname, oname=None, length=None, verify=True,
320 335
                      **meta):
321
        oname = oname or get_random_data(8)
336
        oname = oname or get_random_name()
322 337
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
323 338
        data = get_random_data(length=length)
324 339
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
......
332 347
    def update_object_data(self, cname, oname=None, length=None,
333 348
                           content_type=None, content_range=None,
334 349
                           verify=True, **meta):
335
        oname = oname or get_random_data(8)
350
        oname = oname or get_random_name()
336 351
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
337 352
        content_type = content_type or 'application/octet-stream'
338 353
        data = get_random_data(length=length)
......
354 369
                                       content_range='bytes */*')
355 370

  
356 371
    def create_folder(self, cname, oname=None, **headers):
357
        oname = oname or get_random_data(8)
372
        oname = oname or get_random_name()
358 373
        url = join_urls(self.pithos_path, self.user, cname, oname)
359 374
        r = self.put(url, data='', content_type='application/directory',
360 375
                     **headers)
......
408 423
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
409 424
            k, v in meta.items())
410 425

  
411
    def assert_status(self, status, codes):
412
        l = [elem for elem in return_codes]
413
        if isinstance(codes, list):
414
            l.extend(codes)
415
        else:
416
            l.append(codes)
417
        self.assertTrue(status in l)
418

  
419 426
    def assert_extended(self, data, format, type, size=10000):
420 427
        if format == 'xml':
421 428
            self._assert_xml(data, type, size)
......
484 491
        assert('x-object-uuid' in self.map)
485 492
        uuid = map['x-object-uuid']
486 493
        assert(uuid == self.uuid)
487

  
488

  
489
django_sqlalchemy_engines = {
490
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
491
    'django.db.backends.postgresql': 'postgresql',
492
    'django.db.backends.mysql': '',
493
    'django.db.backends.sqlite3': 'mssql',
494
    'django.db.backends.oracle': 'oracle'}
495

  
496

  
497
def test_concurrently(times=2):
498
    """
499
    Add this decorator to small pieces of code that you want to test
500
    concurrently to make sure they don't raise exceptions when run at the
501
    same time.  E.g., some Django views that do a SELECT and then a subsequent
502
    INSERT might fail when the INSERT assumes that the data has not changed
503
    since the SELECT.
504
    """
505
    def test_concurrently_decorator(test_func):
506
        def wrapper(*args, **kwargs):
507
            exceptions = []
508

  
509
            def call_test_func():
510
                try:
511
                    test_func(*args, **kwargs)
512
                except Exception, e:
513
                    exceptions.append(e)
514
                    raise
515

  
516
            threads = []
517
            for i in range(times):
518
                threads.append(threading.Thread())
519
            for t in threads:
520
                t.start()
521
            for t in threads:
522
                t.join()
523
            if exceptions:
524
                raise Exception(
525
                    ('test_concurrently intercepted %s',
526
                     'exceptions: %s') % (len(exceptions), exceptions))
527
        return wrapper
528
    return test_concurrently_decorator
b/snf-pithos-app/pithos/api/test/accounts.py
34 34
# interpreted as representing official policies, either expressed
35 35
# or implied, of GRNET S.A.
36 36

  
37
from pithos.api.test import PithosAPITest, AssertMappingInvariant,\
38
    DATE_FORMATS
37
from pithos.api.test import (PithosAPITest, AssertMappingInvariant,
38
                             DATE_FORMATS)
39 39

  
40 40
from synnefo.lib import join_urls
41 41

  
b/snf-pithos-app/pithos/api/test/containers.py
36 36

  
37 37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38 38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data
39
from pithos.api.test.util import strnextling, get_random_data, get_random_name
40 40

  
41 41
from synnefo.lib import join_urls
42 42

  
......
170 170
        # check folder inheritance
171 171
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
172 172
        # create child object
173
        descendant = '%s/%s' % (oname, get_random_data(8))
173
        descendant = '%s/%s' % (oname, get_random_name())
174 174
        self.upload_object(cname, descendant)
175 175
        # request shared
176 176
        url = join_urls(self.pithos_path, self.user, cname)
......
261 261
        # test folder inheritance
262 262
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
263 263
        # create child object
264
        descendant = '%s/%s' % (oname, get_random_data(8))
264
        descendant = '%s/%s' % (oname, get_random_name())
265 265
        self.upload_object(cname, descendant)
266 266
        # request public
267 267
        r = self.get('%s?public=' % url)
......
322 322
        # test folder inheritance
323 323
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
324 324
        # create child object
325
        descendant = '%s/%s' % (oname, get_random_data(8))
325
        descendant = '%s/%s' % (oname, get_random_name())
326 326
        self.upload_object(cname, descendant)
327 327
        # request public
328 328
        r = self.get('%s?shared=&public=' % container_url)
b/snf-pithos-app/pithos/api/test/objects.py
42 42
                             AssertMappingInvariant, AssertUUidInvariant,
43 43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44 44
                             DATE_FORMATS)
45
from pithos.api.test.util import md5_hash, merkle, strnextling, get_random_data
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
46 47

  
47 48
from synnefo.lib import join_urls
48 49

  
......
128 129
        self.assertEqual(r.status_code, 404)
129 130

  
130 131
        # upload object with trailing space
131
        oname = self.upload_object('c1', quote('%s ' % get_random_data(8)))[0]
132
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
132 133

  
133 134
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
134 135
        r = self.get(url)
......
314 315

  
315 316
        # perform get with If-Match
316 317
        url = join_urls(self.pithos_path, self.user, cname, oname)
317
        r = self.get(url, HTTP_IF_MATCH=get_random_data(8))
318
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
318 319
        self.assertEqual(r.status_code, 412)
319 320

  
320 321
    def test_if_none_match(self):
......
494 495
class ObjectPut(PithosAPITest):
495 496
    def setUp(self):
496 497
        PithosAPITest.setUp(self)
497
        self.container = get_random_data(8)
498
        self.container = get_random_name()
498 499
        self.create_container(self.container)
499 500

  
500 501
    def test_upload(self):
501 502
        cname = self.container
502
        oname = get_random_data(8)
503
        oname = get_random_name()
503 504
        data = get_random_data()
504 505
        meta = {'test': 'test1'}
505 506
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
......
526 527

  
527 528
    def test_maximum_upload_size_exceeds(self):
528 529
        cname = self.container
529
        oname = get_random_data(8)
530
        oname = get_random_name()
530 531

  
531 532
        # set container quota to 100
532 533
        url = join_urls(self.pithos_path, self.user, cname)
......
543 544

  
544 545
    def test_upload_with_name_containing_slash(self):
545 546
        cname = self.container
546
        oname = '/%s' % get_random_data(8)
547
        oname = '/%s' % get_random_name()
547 548
        data = get_random_data()
548 549
        url = join_urls(self.pithos_path, self.user, cname, oname)
549 550
        r = self.put(url, data=data)
......
557 558

  
558 559
    def test_upload_unprocessable_entity(self):
559 560
        cname = self.container
560
        oname = get_random_data(8)
561
        oname = get_random_name()
561 562
        data = get_random_data()
562 563
        url = join_urls(self.pithos_path, self.user, cname, oname)
563 564
        r = self.put(url, data=data, HTTP_ETAG='123')
......
565 566

  
566 567
#    def test_chunked_transfer(self):
567 568
#        cname = self.container
568
#        oname = '/%s' % get_random_data(8)
569
#        oname = '/%s' % get_random_name()
569 570
#        data = get_random_data()
570 571
#        url = join_urls(self.pithos_path, self.user, cname, oname)
571 572
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
......
582 583
            data += self.upload_object(cname, oname=part)[1]
583 584

  
584 585
        manifest = '%s/%s' % (cname, prefix)
585
        oname = get_random_data(8)
586
        oname = get_random_name()
586 587
        url = join_urls(self.pithos_path, self.user, cname, oname)
587 588
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
588 589
        self.assertEqual(r.status_code, 201)
......
595 596
        self.assertEqual(r.content, data)
596 597

  
597 598
        # invalid manifestation
598
        invalid_manifestation = '%s/%s' % (cname, get_random_data(8))
599
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
599 600
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
600 601
        r = self.get(url)
601 602
        self.assertEqual(r.content, '')
602 603

  
603 604
    def test_create_zero_length_object(self):
604 605
        cname = self.container
605
        oname = get_random_data(8)
606
        oname = get_random_name()
606 607
        url = join_urls(self.pithos_path, self.user, cname, oname)
607 608
        r = self.put(url, data='')
608 609
        self.assertEqual(r.status_code, 201)
......
629 630
        url = join_urls(self.pithos_path, self.user, cname, oname)
630 631
        r = self.get('%s?hashmap=&format=json' % url)
631 632

  
632
        oname = get_random_data(8)
633
        oname = get_random_name()
633 634
        url = join_urls(self.pithos_path, self.user, cname, oname)
634 635
        r = self.put('%s?hashmap=' % url, data=r.content)
635 636
        self.assertEqual(r.status_code, 201)
......
655 656
        with AssertMappingInvariant(self.get_object_info, self.container,
656 657
                                    self.object):
657 658
            # copy object
658
            oname = get_random_data(8)
659
            oname = get_random_name()
659 660
            url = join_urls(self.pithos_path, self.user, self.container, oname)
660 661
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
661 662
                         HTTP_X_COPY_FROM='/%s/%s' % (
......
679 680
        self.create_container(cname)
680 681
        with AssertMappingInvariant(self.get_object_info, self.container,
681 682
                                    self.object):
682
            oname = get_random_data(8)
683
            oname = get_random_name()
683 684
            url = join_urls(self.pithos_path, self.user, cname, oname)
684 685
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
685 686
                         HTTP_X_COPY_FROM='/%s/%s' % (
......
700 701

  
701 702
    def test_copy_invalid(self):
702 703
        # copy from non-existent object
703
        oname = get_random_data(8)
704
        oname = get_random_name()
704 705
        url = join_urls(self.pithos_path, self.user, self.container, oname)
705 706
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
706 707
                     HTTP_X_COPY_FROM='/%s/%s' % (
707
                         self.container, get_random_data(8)))
708
                         self.container, get_random_name()))
708 709
        self.assertEqual(r.status_code, 404)
709 710

  
710 711
        # copy from non-existent container
711
        oname = get_random_data(8)
712
        oname = get_random_name()
712 713
        url = join_urls(self.pithos_path, self.user, self.container, oname)
713 714
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
714 715
                     HTTP_X_COPY_FROM='/%s/%s' % (
715
                         get_random_data(8), self.object))
716
                         get_random_name(), self.object))
716 717
        self.assertEqual(r.status_code, 404)
717 718

  
718 719
    def test_copy_dir(self):
719 720
        folder = self.create_folder(self.container)[0]
720 721
        subfolder = self.create_folder(
721
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
722
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
722 723
        objects = [subfolder]
723 724
        append = objects.append
724 725
        append(self.upload_object(self.container,
725
                                  '%s/%s' % (folder, get_random_data(8)),
726
                                  '%s/%s' % (folder, get_random_name()),
726 727
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
727 728
        append(self.upload_object(self.container,
728
                                  '%s/%s' % (subfolder, get_random_data(8)),
729
                                  '%s/%s' % (subfolder, get_random_name()),
729 730
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
730 731
        other = self.upload_object(self.container, strnextling(folder))[0]
731 732

  
......
772 773

  
773 774
    def test_move(self):
774 775
        # move object
775
        oname = get_random_data(8)
776
        oname = get_random_name()
776 777
        url = join_urls(self.pithos_path, self.user, self.container, oname)
777 778
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
778 779
                     HTTP_X_MOVE_FROM='/%s/%s' % (
......
799 800
    def test_move_dir(self):
800 801
        folder = self.create_folder(self.container)[0]
801 802
        subfolder = self.create_folder(
802
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
803
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
803 804
        objects = [subfolder]
804 805
        append = objects.append
805 806
        meta = {}
806 807
        meta[objects[0]] = {}
807 808
        append(self.upload_object(self.container,
808
                                  '%s/%s' % (folder, get_random_data(8)),
809
                                  '%s/%s' % (folder, get_random_name()),
809 810
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
810 811
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
811 812
        append(self.upload_object(self.container,
812
                                  '%s/%s' % (subfolder, get_random_data(8)),
813
                                  '%s/%s' % (subfolder, get_random_name()),
813 814
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
814 815
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
815 816
        other = self.upload_object(self.container, strnextling(folder))[0]
......
1081 1082

  
1082 1083
    def test_update_from_other_object(self):
1083 1084
        src = self.object
1084
        dest = get_random_data(8)
1085
        dest = get_random_name()
1085 1086

  
1086 1087
        url = join_urls(self.pithos_path, self.user, self.container, src)
1087 1088
        r = self.get(url)
......
1111 1112

  
1112 1113
    def test_update_range_from_other_object(self):
1113 1114
        src = self.object
1114
        dest = get_random_data(8)
1115
        dest = get_random_name()
1115 1116

  
1116 1117
        url = join_urls(self.pithos_path, self.user, self.container, src)
1117 1118
        r = self.get(url)
......
1156 1157

  
1157 1158
    def test_delete_non_existent(self):
1158 1159
        url = join_urls(self.pithos_path, self.user, self.container,
1159
                        get_random_data(8))
1160
                        get_random_name())
1160 1161
        r = self.delete(url)
1161 1162
        self.assertEqual(r.status_code, 404)
1162 1163

  
1163 1164
    def test_delete_dir(self):
1164 1165
        folder = self.create_folder(self.container)[0]
1165 1166
        subfolder = self.create_folder(
1166
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
1167
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1167 1168
        objects = [subfolder]
1168 1169
        append = objects.append
1169 1170
        meta = {}
1170 1171
        meta[objects[0]] = {}
1171 1172
        append(self.upload_object(self.container,
1172
                                  '%s/%s' % (folder, get_random_data(8)),
1173
                                  '%s/%s' % (folder, get_random_name()),
1173 1174
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
1174 1175
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
1175 1176
        append(self.upload_object(self.container,
1176
                                  '%s/%s' % (subfolder, get_random_data(8)),
1177
                                  '%s/%s' % (subfolder, get_random_name()),
1177 1178
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
1178 1179
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
1179 1180
        other = self.upload_object(self.container, strnextling(folder))[0]
b/snf-pithos-app/pithos/api/test/permissions.py
35 35
# or implied, of GRNET S.A.
36 36

  
37 37
from pithos.api.test import PithosAPITest
38
from pithos.api.test.util import get_random_data
38
from pithos.api.test.util import get_random_data, get_random_name
39 39

  
40 40
from synnefo.lib import join_urls
41 41

  
......
53 53
        r = self.post(url, **kwargs)
54 54
        self.assertEqual(r.status_code, 202)
55 55

  
56
        self.container = get_random_data(8)
56
        self.container = get_random_name()
57 57
        self.create_container(self.container)
58 58
        self.object = self.upload_object(self.container)[0]
59 59
        l = [self.object + '/', self.object + '/a', self.object + 'a',
b/snf-pithos-app/pithos/api/test/public.py
1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
import random
35
import datetime
36
import time as _time
37

  
38
from synnefo.lib import join_urls
39

  
40
import django.utils.simplejson as json
41

  
42
from pithos.api.test import PithosAPITest
43
from pithos.api.test.util import get_random_name
44
from pithos.api import settings as pithos_settings
45

  
46

  
47
class TestPublic(PithosAPITest):
48
    def _assert_not_public_object(self, cname, oname):
49
        info = self.get_object_info(cname, oname)
50
        self.assertTrue('X-Object-Public' not in info)
51

  
52
    def _assert_public_object(self, cname, oname, odata):
53
        info = self.get_object_info(cname, oname)
54
        self.assertTrue('X-Object-Public' in info)
55
        public = info['X-Object-Public']
56

  
57
        self.assertTrue(len(public) >= pithos_settings.PUBLIC_URL_SECURITY)
58
        (self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for
59
         l in public)
60

  
61
        r = self.get(public, user='user2')
62
        self.assertEqual(r.status_code, 200)
63
        self.assertTrue('X-Object-Public' not in r)
64

  
65
        self.assertEqual(r.content, odata)
66

  
67
        # assert other users cannot access the object using the priavate path
68
        url = join_urls(self.pithos_path, self.user, cname, oname)
69
        r = self.head(url, user='user2')
70
        self.assertEqual(r.status_code, 403)
71

  
72
        r = self.get(url, user='user2')
73
        self.assertEqual(r.status_code, 403)
74

  
75
        return public
76

  
77
    def test_set_object_public(self):
78
        cname = get_random_name()
79
        self.create_container(cname)
80
        oname, odata = self.upload_object(cname)[:-1]
81
        self._assert_not_public_object(cname, oname)
82

  
83
        # set public
84
        url = join_urls(self.pithos_path, self.user, cname, oname)
85
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
86
        self.assertEqual(r.status_code, 202)
87

  
88
        self._assert_public_object(cname, oname, odata)
89

  
90
    def test_set_twice(self):
91
        cname = get_random_name()
92
        self.create_container(cname)
93
        oname, odata = self.upload_object(cname)[:-1]
94
        self._assert_not_public_object(cname, oname)
95

  
96
        # set public
97
        url = join_urls(self.pithos_path, self.user, cname, oname)
98
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
99
        self.assertEqual(r.status_code, 202)
100

  
101
        public = self._assert_public_object(cname, oname, odata)
102

  
103
        # set public
104
        url = join_urls(self.pithos_path, self.user, cname, oname)
105
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
106
        self.assertEqual(r.status_code, 202)
107

  
108
        public2 = self._assert_public_object(cname, oname, odata)
109

  
110
        self.assertEqual(public, public2)
111

  
112
    def test_set_unset_set(self):
113
        cname = get_random_name()
114
        self.create_container(cname)
115
        oname, odata = self.upload_object(cname)[:-1]
116
        self._assert_not_public_object(cname, oname)
117

  
118
        # set public
119
        url = join_urls(self.pithos_path, self.user, cname, oname)
120
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
121
        self.assertEqual(r.status_code, 202)
122

  
123
        public = self._assert_public_object(cname, oname, odata)
124

  
125
        # unset public
126
        url = join_urls(self.pithos_path, self.user, cname, oname)
127
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
128
        self.assertEqual(r.status_code, 202)
129

  
130
        self._assert_not_public_object(cname, oname)
131

  
132
        # set public
133
        url = join_urls(self.pithos_path, self.user, cname, oname)
134
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
135
        self.assertEqual(r.status_code, 202)
136

  
137
        public2 = self._assert_public_object(cname, oname, odata)
138

  
139
        self.assertTrue(public != public2)
140

  
141
        # unset public
142
        url = join_urls(self.pithos_path, self.user, cname, oname)
143
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
144
        self.assertEqual(r.status_code, 202)
145

  
146
        self._assert_not_public_object(cname, oname)
147

  
148
    def test_update_public_object(self):
149
        cname = get_random_name()
150
        self.create_container(cname)
151
        oname, odata = self.upload_object(cname)[:-1]
152
        self._assert_not_public_object(cname, oname)
153

  
154
        # set public
155
        url = join_urls(self.pithos_path, self.user, cname, oname)
156
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
157
        self.assertEqual(r.status_code, 202)
158

  
159
        public = self._assert_public_object(cname, oname, odata)
160

  
161
        odata2 = self.append_object_data(cname, oname)[1]
162

  
163
        public2 = self._assert_public_object(cname, oname, odata + odata2)
164

  
165
        self.assertTrue(public == public2)
166

  
167
    def test_delete_public_object(self):
168
        cname = get_random_name()
169
        self.create_container(cname)
170
        oname, odata = self.upload_object(cname)[:-1]
171
        self._assert_not_public_object(cname, oname)
172

  
173
        # set public
174
        url = join_urls(self.pithos_path, self.user, cname, oname)
175
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
176
        self.assertEqual(r.status_code, 202)
177
        public = self._assert_public_object(cname, oname, odata)
178

  
179
        # delete object
180
        r = self.delete(url)
181
        self.assertEqual(r.status_code, 204)
182
        r = self.get(url)
183
        self.assertEqual(r.status_code, 404)
184
        r = self.get(public)
185
        self.assertEqual(r.status_code, 404)
186

  
187
    def test_delete_public_object_history(self):
188
        cname = get_random_name()
189
        self.create_container(cname)
190
        oname, odata = self.upload_object(cname)[:-1]
191
        self._assert_not_public_object(cname, oname)
192

  
193
        # set public
194
        url = join_urls(self.pithos_path, self.user, cname, oname)
195
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
196
        self.assertEqual(r.status_code, 202)
197
        public = self._assert_public_object(cname, oname, odata)
198

  
199
        for _ in range(random.randint(1, 10)):
200
            odata += self.append_object_data(cname, oname)[1]
201
            _time.sleep(1)
202

  
203
        # get object versions
204
        url = join_urls(self.pithos_path, self.user, cname, oname)
205
        r = self.get('%s?version=list&format=json' % url)
206
        version_list = json.loads(r.content)['versions']
207
        mtime = [int(t[1]) for t in version_list]
208

  
209
        # delete object history
210
        i = random.randrange(len(mtime))
211
        self.delete('%s?until=%d' % (url, mtime[i]))
212
        public2 = self._assert_public_object(cname, oname, odata)
213
        self.assertEqual(public, public2)
214

  
215
        # delete object histoy until now
216
        _time.sleep(1)
217
        t = datetime.datetime.utcnow()
218
        now = int(_time.mktime(t.timetuple()))
219
        r = self.delete('%s?intil=%d' % (url, now))
220
        self.assertEqual(r.status_code, 204)
221
        r = self.get(url)
222
        self.assertEqual(r.status_code, 404)
223
        r = self.get(public)
224
        self.assertEqual(r.status_code, 404)
b/snf-pithos-app/pithos/api/test/util/__init__.py
94 94
    return get_random_word(length)[:length]
95 95

  
96 96

  
97
def get_random_name(length=8):
98
    return get_random_data(length)
99

  
100

  
97 101
def md5_hash(data):
98 102
    md5 = hashlib.md5()
99 103
    md5.update(data)
b/snf-pithos-app/pithos/api/tests.py
36 36
from pithos.api.test.containers import *
37 37
from pithos.api.test.objects import *
38 38
from pithos.api.test.permissions import *
39
from pithos.api.test.public import *
b/snf-pithos-backend/pithos/backends/migrate.py
50 50
from alembic import context, command
51 51

  
52 52
from pithos.backends.lib import sqlalchemy as sqlalchemy_backend
53
from pithos.backends.lib.sqlalchemy import node, groups, public, xfeatures
53
from pithos.backends.lib.sqlalchemy import (node, groups, public, xfeatures,
54
                                            quotaholder_serials)
54 55

  
55 56
os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings'
56 57

  
......
72 73
    'alembic.ini')
73 74

  
74 75

  
75
def initialize_db():
76
def initialize_db(dbconnection):
76 77
    alembic_cfg = Config(DEFAULT_ALEMBIC_INI_PATH)
77 78

  
78
    db = alembic_cfg.get_main_option(
79
        "sqlalchemy.url", PITHOS_BACKEND_DB_CONNECTION)
79
    db = alembic_cfg.get_main_option("sqlalchemy.url", dbconnection)
80 80
    alembic_cfg.set_main_option("sqlalchemy.url", db)
81 81

  
82 82
    engine = sa.engine_from_config(
......
87 87
    groups.create_tables(engine)
88 88
    public.create_tables(engine)
89 89
    xfeatures.create_tables(engine)
90
    quotaholder_serials.create_tables(engine)
90 91

  
91 92
    # then, load the Alembic configuration and generate the
92 93
    # version table, "stamping" it with the most recent rev:
......
102 103

  
103 104
    if len(argv) >= 1 and argv[0] == 'initdb':
104 105
        print "Initializing db."
105
        initialize_db()
106
        initialize_db(PITHOS_BACKEND_DB_CONNECTION)
106 107
        print "DB initialized."
107 108
        exit(1)
108 109

  

Also available in: Unified diff