Revision bc4f25c0
b/snf-common/synnefo/settings/test.py | ||
---|---|---|
60 | 60 |
CLOUDBAR_LOCATION = '/static/im/cloudbar/' |
61 | 61 |
CLOUDBAR_SERVICES_URL = '/ui/get_services' |
62 | 62 |
CLOUDBAR_MENU_URL = '/ui/get_menu' |
63 |
|
|
64 |
TEST_RUNNER = 'pithos.api.test.PithosTestSuiteRunner' |
b/snf-pithos-app/pithos/api/test/__init__.py | ||
---|---|---|
40 | 40 |
from snf_django.utils.testing import with_settings, astakos_user |
41 | 41 |
|
42 | 42 |
from pithos.api import settings as pithos_settings |
43 |
from pithos.api.test.util import is_date, get_random_data |
|
43 |
from pithos.api.test.util import is_date, get_random_data, get_random_name |
|
44 |
from pithos.backends.migrate import initialize_db |
|
44 | 45 |
|
45 | 46 |
from synnefo.lib.services import get_service_path |
46 | 47 |
from synnefo.lib import join_urls |
47 | 48 |
|
48 | 49 |
from django.test import TestCase |
50 |
from django.test.simple import DjangoTestSuiteRunner |
|
49 | 51 |
from django.conf import settings |
50 | 52 |
from django.utils.http import urlencode |
53 |
from django.db.backends.creation import TEST_DATABASE_PREFIX |
|
51 | 54 |
|
52 | 55 |
import django.utils.simplejson as json |
53 | 56 |
|
54 | 57 |
import random |
55 |
import threading |
|
56 | 58 |
import functools |
57 | 59 |
|
58 | 60 |
|
... | ... | |
83 | 85 |
'object': ('name', 'hash', 'bytes', 'content_type', |
84 | 86 |
'content_encoding', 'last_modified',)} |
85 | 87 |
|
86 |
return_codes = (400, 401, 403, 404, 503) |
|
87 |
|
|
88 | 88 |
TEST_BLOCK_SIZE = 1024 |
89 | 89 |
TEST_HASH_ALGORITHM = 'sha256' |
90 | 90 |
|
91 |
BACKEND_DB_CONNECTION = None |
|
91 |
print 'backend module:', pithos_settings.BACKEND_DB_MODULE |
|
92 |
print 'backend database engine:', settings.DATABASES['default']['ENGINE'] |
|
93 |
print 'update md5:', pithos_settings.UPDATE_MD5 |
|
92 | 94 |
|
93 | 95 |
|
94 |
def django_to_sqlalchemy(): |
|
95 |
"""Convert the django default database to sqlalchemy connection string""" |
|
96 |
django_sqlalchemy_engines = { |
|
97 |
'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2', |
|
98 |
'django.db.backends.postgresql': 'postgresql', |
|
99 |
'django.db.backends.mysql': '', |
|
100 |
'django.db.backends.sqlite3': 'mssql', |
|
101 |
'django.db.backends.oracle': 'oracle'} |
|
96 | 102 |
|
97 |
global BACKEND_DB_CONNECTION |
|
98 |
if BACKEND_DB_CONNECTION: |
|
99 |
return BACKEND_DB_CONNECTION |
|
100 | 103 |
|
101 |
# TODO support for more complex configuration |
|
104 |
def prepate_db_connection(): |
|
105 |
"""Build pithos backend connection string from django default database""" |
|
106 |
|
|
102 | 107 |
db = settings.DATABASES['default'] |
103 |
name = db.get('TEST_NAME', 'test_%s' % db['NAME']) |
|
104 |
if db['ENGINE'] == 'django.db.backends.sqlite3': |
|
105 |
BACKEND_DB_CONNECTION = 'sqlite:///%s' % name |
|
108 |
name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME']) |
|
109 |
|
|
110 |
if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'): |
|
111 |
if db['ENGINE'] == 'django.db.backends.sqlite3': |
|
112 |
db_connection = 'sqlite:///%s' % name |
|
113 |
else: |
|
114 |
d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']), |
|
115 |
user=db['USER'], |
|
116 |
pwd=db['PASSWORD'], |
|
117 |
host=db['HOST'].lower(), |
|
118 |
port=int(db['PORT']) if db['PORT'] != '' else '', |
|
119 |
name=name) |
|
120 |
db_connection = ( |
|
121 |
'%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d) |
|
122 |
|
|
123 |
# initialize pithos database |
|
124 |
initialize_db(db_connection) |
|
106 | 125 |
else: |
107 |
d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']), |
|
108 |
user=db['USER'], |
|
109 |
pwd=db['PASSWORD'], |
|
110 |
host=db['HOST'].lower(), |
|
111 |
port=int(db['PORT']) if db['PORT'] != '' else '', |
|
112 |
name=name) |
|
113 |
BACKEND_DB_CONNECTION = ( |
|
114 |
'%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d) |
|
115 |
return BACKEND_DB_CONNECTION |
|
126 |
db_connection = name |
|
127 |
pithos_settings.BACKEND_DB_CONNECTION = db_connection |
|
128 |
|
|
129 |
|
|
130 |
class PithosTestSuiteRunner(DjangoTestSuiteRunner): |
|
131 |
def setup_databases(self, **kwargs): |
|
132 |
old_names, mirrors = super(PithosTestSuiteRunner, |
|
133 |
self).setup_databases(**kwargs) |
|
134 |
prepate_db_connection() |
|
135 |
return old_names, mirrors |
|
116 | 136 |
|
117 | 137 |
|
118 | 138 |
class PithosAPITest(TestCase): |
119 | 139 |
def setUp(self): |
120 |
if (pithos_settings.BACKEND_DB_MODULE == |
|
121 |
'pithos.backends.lib.sqlalchemy'): |
|
122 |
pithos_settings.BACKEND_DB_CONNECTION = django_to_sqlalchemy() |
|
123 |
pithos_settings.BACKEND_POOL_SIZE = 1 |
|
124 |
|
|
125 | 140 |
# Override default block size to spead up tests |
126 | 141 |
pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE |
127 | 142 |
pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM |
... | ... | |
318 | 333 |
|
319 | 334 |
def upload_object(self, cname, oname=None, length=None, verify=True, |
320 | 335 |
**meta): |
321 |
oname = oname or get_random_data(8)
|
|
336 |
oname = oname or get_random_name()
|
|
322 | 337 |
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE) |
323 | 338 |
data = get_random_data(length=length) |
324 | 339 |
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v) |
... | ... | |
332 | 347 |
def update_object_data(self, cname, oname=None, length=None, |
333 | 348 |
content_type=None, content_range=None, |
334 | 349 |
verify=True, **meta): |
335 |
oname = oname or get_random_data(8)
|
|
350 |
oname = oname or get_random_name()
|
|
336 | 351 |
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE) |
337 | 352 |
content_type = content_type or 'application/octet-stream' |
338 | 353 |
data = get_random_data(length=length) |
... | ... | |
354 | 369 |
content_range='bytes */*') |
355 | 370 |
|
356 | 371 |
def create_folder(self, cname, oname=None, **headers): |
357 |
oname = oname or get_random_data(8)
|
|
372 |
oname = oname or get_random_name()
|
|
358 | 373 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
359 | 374 |
r = self.put(url, data='', content_type='application/directory', |
360 | 375 |
**headers) |
... | ... | |
408 | 423 |
(self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for |
409 | 424 |
k, v in meta.items()) |
410 | 425 |
|
411 |
def assert_status(self, status, codes): |
|
412 |
l = [elem for elem in return_codes] |
|
413 |
if isinstance(codes, list): |
|
414 |
l.extend(codes) |
|
415 |
else: |
|
416 |
l.append(codes) |
|
417 |
self.assertTrue(status in l) |
|
418 |
|
|
419 | 426 |
def assert_extended(self, data, format, type, size=10000): |
420 | 427 |
if format == 'xml': |
421 | 428 |
self._assert_xml(data, type, size) |
... | ... | |
484 | 491 |
assert('x-object-uuid' in self.map) |
485 | 492 |
uuid = map['x-object-uuid'] |
486 | 493 |
assert(uuid == self.uuid) |
487 |
|
|
488 |
|
|
489 |
django_sqlalchemy_engines = { |
|
490 |
'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2', |
|
491 |
'django.db.backends.postgresql': 'postgresql', |
|
492 |
'django.db.backends.mysql': '', |
|
493 |
'django.db.backends.sqlite3': 'mssql', |
|
494 |
'django.db.backends.oracle': 'oracle'} |
|
495 |
|
|
496 |
|
|
497 |
def test_concurrently(times=2): |
|
498 |
""" |
|
499 |
Add this decorator to small pieces of code that you want to test |
|
500 |
concurrently to make sure they don't raise exceptions when run at the |
|
501 |
same time. E.g., some Django views that do a SELECT and then a subsequent |
|
502 |
INSERT might fail when the INSERT assumes that the data has not changed |
|
503 |
since the SELECT. |
|
504 |
""" |
|
505 |
def test_concurrently_decorator(test_func): |
|
506 |
def wrapper(*args, **kwargs): |
|
507 |
exceptions = [] |
|
508 |
|
|
509 |
def call_test_func(): |
|
510 |
try: |
|
511 |
test_func(*args, **kwargs) |
|
512 |
except Exception, e: |
|
513 |
exceptions.append(e) |
|
514 |
raise |
|
515 |
|
|
516 |
threads = [] |
|
517 |
for i in range(times): |
|
518 |
threads.append(threading.Thread()) |
|
519 |
for t in threads: |
|
520 |
t.start() |
|
521 |
for t in threads: |
|
522 |
t.join() |
|
523 |
if exceptions: |
|
524 |
raise Exception( |
|
525 |
('test_concurrently intercepted %s', |
|
526 |
'exceptions: %s') % (len(exceptions), exceptions)) |
|
527 |
return wrapper |
|
528 |
return test_concurrently_decorator |
b/snf-pithos-app/pithos/api/test/accounts.py | ||
---|---|---|
34 | 34 |
# interpreted as representing official policies, either expressed |
35 | 35 |
# or implied, of GRNET S.A. |
36 | 36 |
|
37 |
from pithos.api.test import PithosAPITest, AssertMappingInvariant,\
|
|
38 |
DATE_FORMATS
|
|
37 |
from pithos.api.test import (PithosAPITest, AssertMappingInvariant,
|
|
38 |
DATE_FORMATS)
|
|
39 | 39 |
|
40 | 40 |
from synnefo.lib import join_urls |
41 | 41 |
|
b/snf-pithos-app/pithos/api/test/containers.py | ||
---|---|---|
36 | 36 |
|
37 | 37 |
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names, |
38 | 38 |
pithos_settings, pithos_test_settings) |
39 |
from pithos.api.test.util import strnextling, get_random_data |
|
39 |
from pithos.api.test.util import strnextling, get_random_data, get_random_name
|
|
40 | 40 |
|
41 | 41 |
from synnefo.lib import join_urls |
42 | 42 |
|
... | ... | |
170 | 170 |
# check folder inheritance |
171 | 171 |
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*') |
172 | 172 |
# create child object |
173 |
descendant = '%s/%s' % (oname, get_random_data(8))
|
|
173 |
descendant = '%s/%s' % (oname, get_random_name())
|
|
174 | 174 |
self.upload_object(cname, descendant) |
175 | 175 |
# request shared |
176 | 176 |
url = join_urls(self.pithos_path, self.user, cname) |
... | ... | |
261 | 261 |
# test folder inheritance |
262 | 262 |
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true') |
263 | 263 |
# create child object |
264 |
descendant = '%s/%s' % (oname, get_random_data(8))
|
|
264 |
descendant = '%s/%s' % (oname, get_random_name())
|
|
265 | 265 |
self.upload_object(cname, descendant) |
266 | 266 |
# request public |
267 | 267 |
r = self.get('%s?public=' % url) |
... | ... | |
322 | 322 |
# test folder inheritance |
323 | 323 |
oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true') |
324 | 324 |
# create child object |
325 |
descendant = '%s/%s' % (oname, get_random_data(8))
|
|
325 |
descendant = '%s/%s' % (oname, get_random_name())
|
|
326 | 326 |
self.upload_object(cname, descendant) |
327 | 327 |
# request public |
328 | 328 |
r = self.get('%s?shared=&public=' % container_url) |
b/snf-pithos-app/pithos/api/test/objects.py | ||
---|---|---|
42 | 42 |
AssertMappingInvariant, AssertUUidInvariant, |
43 | 43 |
TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM, |
44 | 44 |
DATE_FORMATS) |
45 |
from pithos.api.test.util import md5_hash, merkle, strnextling, get_random_data |
|
45 |
from pithos.api.test.util import (md5_hash, merkle, strnextling, |
|
46 |
get_random_data, get_random_name) |
|
46 | 47 |
|
47 | 48 |
from synnefo.lib import join_urls |
48 | 49 |
|
... | ... | |
128 | 129 |
self.assertEqual(r.status_code, 404) |
129 | 130 |
|
130 | 131 |
# upload object with trailing space |
131 |
oname = self.upload_object('c1', quote('%s ' % get_random_data(8)))[0]
|
|
132 |
oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
|
|
132 | 133 |
|
133 | 134 |
url = join_urls(self.pithos_path, self.user, 'c1', oname) |
134 | 135 |
r = self.get(url) |
... | ... | |
314 | 315 |
|
315 | 316 |
# perform get with If-Match |
316 | 317 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
317 |
r = self.get(url, HTTP_IF_MATCH=get_random_data(8))
|
|
318 |
r = self.get(url, HTTP_IF_MATCH=get_random_name())
|
|
318 | 319 |
self.assertEqual(r.status_code, 412) |
319 | 320 |
|
320 | 321 |
def test_if_none_match(self): |
... | ... | |
494 | 495 |
class ObjectPut(PithosAPITest): |
495 | 496 |
def setUp(self): |
496 | 497 |
PithosAPITest.setUp(self) |
497 |
self.container = get_random_data(8)
|
|
498 |
self.container = get_random_name()
|
|
498 | 499 |
self.create_container(self.container) |
499 | 500 |
|
500 | 501 |
def test_upload(self): |
501 | 502 |
cname = self.container |
502 |
oname = get_random_data(8)
|
|
503 |
oname = get_random_name()
|
|
503 | 504 |
data = get_random_data() |
504 | 505 |
meta = {'test': 'test1'} |
505 | 506 |
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v) |
... | ... | |
526 | 527 |
|
527 | 528 |
def test_maximum_upload_size_exceeds(self): |
528 | 529 |
cname = self.container |
529 |
oname = get_random_data(8)
|
|
530 |
oname = get_random_name()
|
|
530 | 531 |
|
531 | 532 |
# set container quota to 100 |
532 | 533 |
url = join_urls(self.pithos_path, self.user, cname) |
... | ... | |
543 | 544 |
|
544 | 545 |
def test_upload_with_name_containing_slash(self): |
545 | 546 |
cname = self.container |
546 |
oname = '/%s' % get_random_data(8)
|
|
547 |
oname = '/%s' % get_random_name()
|
|
547 | 548 |
data = get_random_data() |
548 | 549 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
549 | 550 |
r = self.put(url, data=data) |
... | ... | |
557 | 558 |
|
558 | 559 |
def test_upload_unprocessable_entity(self): |
559 | 560 |
cname = self.container |
560 |
oname = get_random_data(8)
|
|
561 |
oname = get_random_name()
|
|
561 | 562 |
data = get_random_data() |
562 | 563 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
563 | 564 |
r = self.put(url, data=data, HTTP_ETAG='123') |
... | ... | |
565 | 566 |
|
566 | 567 |
# def test_chunked_transfer(self): |
567 | 568 |
# cname = self.container |
568 |
# oname = '/%s' % get_random_data(8)
|
|
569 |
# oname = '/%s' % get_random_name()
|
|
569 | 570 |
# data = get_random_data() |
570 | 571 |
# url = join_urls(self.pithos_path, self.user, cname, oname) |
571 | 572 |
# r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked') |
... | ... | |
582 | 583 |
data += self.upload_object(cname, oname=part)[1] |
583 | 584 |
|
584 | 585 |
manifest = '%s/%s' % (cname, prefix) |
585 |
oname = get_random_data(8)
|
|
586 |
oname = get_random_name()
|
|
586 | 587 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
587 | 588 |
r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest) |
588 | 589 |
self.assertEqual(r.status_code, 201) |
... | ... | |
595 | 596 |
self.assertEqual(r.content, data) |
596 | 597 |
|
597 | 598 |
# invalid manifestation |
598 |
invalid_manifestation = '%s/%s' % (cname, get_random_data(8))
|
|
599 |
invalid_manifestation = '%s/%s' % (cname, get_random_name())
|
|
599 | 600 |
self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation) |
600 | 601 |
r = self.get(url) |
601 | 602 |
self.assertEqual(r.content, '') |
602 | 603 |
|
603 | 604 |
def test_create_zero_length_object(self): |
604 | 605 |
cname = self.container |
605 |
oname = get_random_data(8)
|
|
606 |
oname = get_random_name()
|
|
606 | 607 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
607 | 608 |
r = self.put(url, data='') |
608 | 609 |
self.assertEqual(r.status_code, 201) |
... | ... | |
629 | 630 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
630 | 631 |
r = self.get('%s?hashmap=&format=json' % url) |
631 | 632 |
|
632 |
oname = get_random_data(8)
|
|
633 |
oname = get_random_name()
|
|
633 | 634 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
634 | 635 |
r = self.put('%s?hashmap=' % url, data=r.content) |
635 | 636 |
self.assertEqual(r.status_code, 201) |
... | ... | |
655 | 656 |
with AssertMappingInvariant(self.get_object_info, self.container, |
656 | 657 |
self.object): |
657 | 658 |
# copy object |
658 |
oname = get_random_data(8)
|
|
659 |
oname = get_random_name()
|
|
659 | 660 |
url = join_urls(self.pithos_path, self.user, self.container, oname) |
660 | 661 |
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy', |
661 | 662 |
HTTP_X_COPY_FROM='/%s/%s' % ( |
... | ... | |
679 | 680 |
self.create_container(cname) |
680 | 681 |
with AssertMappingInvariant(self.get_object_info, self.container, |
681 | 682 |
self.object): |
682 |
oname = get_random_data(8)
|
|
683 |
oname = get_random_name()
|
|
683 | 684 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
684 | 685 |
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy', |
685 | 686 |
HTTP_X_COPY_FROM='/%s/%s' % ( |
... | ... | |
700 | 701 |
|
701 | 702 |
def test_copy_invalid(self): |
702 | 703 |
# copy from non-existent object |
703 |
oname = get_random_data(8)
|
|
704 |
oname = get_random_name()
|
|
704 | 705 |
url = join_urls(self.pithos_path, self.user, self.container, oname) |
705 | 706 |
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy', |
706 | 707 |
HTTP_X_COPY_FROM='/%s/%s' % ( |
707 |
self.container, get_random_data(8)))
|
|
708 |
self.container, get_random_name()))
|
|
708 | 709 |
self.assertEqual(r.status_code, 404) |
709 | 710 |
|
710 | 711 |
# copy from non-existent container |
711 |
oname = get_random_data(8)
|
|
712 |
oname = get_random_name()
|
|
712 | 713 |
url = join_urls(self.pithos_path, self.user, self.container, oname) |
713 | 714 |
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy', |
714 | 715 |
HTTP_X_COPY_FROM='/%s/%s' % ( |
715 |
get_random_data(8), self.object))
|
|
716 |
get_random_name(), self.object))
|
|
716 | 717 |
self.assertEqual(r.status_code, 404) |
717 | 718 |
|
718 | 719 |
def test_copy_dir(self): |
719 | 720 |
folder = self.create_folder(self.container)[0] |
720 | 721 |
subfolder = self.create_folder( |
721 |
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
|
|
722 |
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
|
|
722 | 723 |
objects = [subfolder] |
723 | 724 |
append = objects.append |
724 | 725 |
append(self.upload_object(self.container, |
725 |
'%s/%s' % (folder, get_random_data(8)),
|
|
726 |
'%s/%s' % (folder, get_random_name()),
|
|
726 | 727 |
HTTP_X_OBJECT_META_DEPTH='1')[0]) |
727 | 728 |
append(self.upload_object(self.container, |
728 |
'%s/%s' % (subfolder, get_random_data(8)),
|
|
729 |
'%s/%s' % (subfolder, get_random_name()),
|
|
729 | 730 |
HTTP_X_OBJECT_META_DEPTH='2')[0]) |
730 | 731 |
other = self.upload_object(self.container, strnextling(folder))[0] |
731 | 732 |
|
... | ... | |
772 | 773 |
|
773 | 774 |
def test_move(self): |
774 | 775 |
# move object |
775 |
oname = get_random_data(8)
|
|
776 |
oname = get_random_name()
|
|
776 | 777 |
url = join_urls(self.pithos_path, self.user, self.container, oname) |
777 | 778 |
r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy', |
778 | 779 |
HTTP_X_MOVE_FROM='/%s/%s' % ( |
... | ... | |
799 | 800 |
def test_move_dir(self): |
800 | 801 |
folder = self.create_folder(self.container)[0] |
801 | 802 |
subfolder = self.create_folder( |
802 |
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
|
|
803 |
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
|
|
803 | 804 |
objects = [subfolder] |
804 | 805 |
append = objects.append |
805 | 806 |
meta = {} |
806 | 807 |
meta[objects[0]] = {} |
807 | 808 |
append(self.upload_object(self.container, |
808 |
'%s/%s' % (folder, get_random_data(8)),
|
|
809 |
'%s/%s' % (folder, get_random_name()),
|
|
809 | 810 |
HTTP_X_OBJECT_META_DEPTH='1')[0]) |
810 | 811 |
meta[objects[1]] = {'X-Object-Meta-Depth': '1'} |
811 | 812 |
append(self.upload_object(self.container, |
812 |
'%s/%s' % (subfolder, get_random_data(8)),
|
|
813 |
'%s/%s' % (subfolder, get_random_name()),
|
|
813 | 814 |
HTTP_X_OBJECT_META_DEPTH='2')[0]) |
814 | 815 |
meta[objects[1]] = {'X-Object-Meta-Depth': '2'} |
815 | 816 |
other = self.upload_object(self.container, strnextling(folder))[0] |
... | ... | |
1081 | 1082 |
|
1082 | 1083 |
def test_update_from_other_object(self): |
1083 | 1084 |
src = self.object |
1084 |
dest = get_random_data(8)
|
|
1085 |
dest = get_random_name()
|
|
1085 | 1086 |
|
1086 | 1087 |
url = join_urls(self.pithos_path, self.user, self.container, src) |
1087 | 1088 |
r = self.get(url) |
... | ... | |
1111 | 1112 |
|
1112 | 1113 |
def test_update_range_from_other_object(self): |
1113 | 1114 |
src = self.object |
1114 |
dest = get_random_data(8)
|
|
1115 |
dest = get_random_name()
|
|
1115 | 1116 |
|
1116 | 1117 |
url = join_urls(self.pithos_path, self.user, self.container, src) |
1117 | 1118 |
r = self.get(url) |
... | ... | |
1156 | 1157 |
|
1157 | 1158 |
def test_delete_non_existent(self): |
1158 | 1159 |
url = join_urls(self.pithos_path, self.user, self.container, |
1159 |
get_random_data(8))
|
|
1160 |
get_random_name())
|
|
1160 | 1161 |
r = self.delete(url) |
1161 | 1162 |
self.assertEqual(r.status_code, 404) |
1162 | 1163 |
|
1163 | 1164 |
def test_delete_dir(self): |
1164 | 1165 |
folder = self.create_folder(self.container)[0] |
1165 | 1166 |
subfolder = self.create_folder( |
1166 |
self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
|
|
1167 |
self.container, oname='%s/%s' % (folder, get_random_name()))[0]
|
|
1167 | 1168 |
objects = [subfolder] |
1168 | 1169 |
append = objects.append |
1169 | 1170 |
meta = {} |
1170 | 1171 |
meta[objects[0]] = {} |
1171 | 1172 |
append(self.upload_object(self.container, |
1172 |
'%s/%s' % (folder, get_random_data(8)),
|
|
1173 |
'%s/%s' % (folder, get_random_name()),
|
|
1173 | 1174 |
HTTP_X_OBJECT_META_DEPTH='1')[0]) |
1174 | 1175 |
meta[objects[1]] = {'X-Object-Meta-Depth': '1'} |
1175 | 1176 |
append(self.upload_object(self.container, |
1176 |
'%s/%s' % (subfolder, get_random_data(8)),
|
|
1177 |
'%s/%s' % (subfolder, get_random_name()),
|
|
1177 | 1178 |
HTTP_X_OBJECT_META_DEPTH='2')[0]) |
1178 | 1179 |
meta[objects[1]] = {'X-Object-Meta-Depth': '2'} |
1179 | 1180 |
other = self.upload_object(self.container, strnextling(folder))[0] |
b/snf-pithos-app/pithos/api/test/permissions.py | ||
---|---|---|
35 | 35 |
# or implied, of GRNET S.A. |
36 | 36 |
|
37 | 37 |
from pithos.api.test import PithosAPITest |
38 |
from pithos.api.test.util import get_random_data |
|
38 |
from pithos.api.test.util import get_random_data, get_random_name
|
|
39 | 39 |
|
40 | 40 |
from synnefo.lib import join_urls |
41 | 41 |
|
... | ... | |
53 | 53 |
r = self.post(url, **kwargs) |
54 | 54 |
self.assertEqual(r.status_code, 202) |
55 | 55 |
|
56 |
self.container = get_random_data(8)
|
|
56 |
self.container = get_random_name()
|
|
57 | 57 |
self.create_container(self.container) |
58 | 58 |
self.object = self.upload_object(self.container)[0] |
59 | 59 |
l = [self.object + '/', self.object + '/a', self.object + 'a', |
b/snf-pithos-app/pithos/api/test/public.py | ||
---|---|---|
1 |
# Copyright 2011 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
import random |
|
35 |
import datetime |
|
36 |
import time as _time |
|
37 |
|
|
38 |
from synnefo.lib import join_urls |
|
39 |
|
|
40 |
import django.utils.simplejson as json |
|
41 |
|
|
42 |
from pithos.api.test import PithosAPITest |
|
43 |
from pithos.api.test.util import get_random_name |
|
44 |
from pithos.api import settings as pithos_settings |
|
45 |
|
|
46 |
|
|
47 |
class TestPublic(PithosAPITest): |
|
48 |
def _assert_not_public_object(self, cname, oname): |
|
49 |
info = self.get_object_info(cname, oname) |
|
50 |
self.assertTrue('X-Object-Public' not in info) |
|
51 |
|
|
52 |
def _assert_public_object(self, cname, oname, odata): |
|
53 |
info = self.get_object_info(cname, oname) |
|
54 |
self.assertTrue('X-Object-Public' in info) |
|
55 |
public = info['X-Object-Public'] |
|
56 |
|
|
57 |
self.assertTrue(len(public) >= pithos_settings.PUBLIC_URL_SECURITY) |
|
58 |
(self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for |
|
59 |
l in public) |
|
60 |
|
|
61 |
r = self.get(public, user='user2') |
|
62 |
self.assertEqual(r.status_code, 200) |
|
63 |
self.assertTrue('X-Object-Public' not in r) |
|
64 |
|
|
65 |
self.assertEqual(r.content, odata) |
|
66 |
|
|
67 |
# assert other users cannot access the object using the priavate path |
|
68 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
69 |
r = self.head(url, user='user2') |
|
70 |
self.assertEqual(r.status_code, 403) |
|
71 |
|
|
72 |
r = self.get(url, user='user2') |
|
73 |
self.assertEqual(r.status_code, 403) |
|
74 |
|
|
75 |
return public |
|
76 |
|
|
77 |
def test_set_object_public(self): |
|
78 |
cname = get_random_name() |
|
79 |
self.create_container(cname) |
|
80 |
oname, odata = self.upload_object(cname)[:-1] |
|
81 |
self._assert_not_public_object(cname, oname) |
|
82 |
|
|
83 |
# set public |
|
84 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
85 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
86 |
self.assertEqual(r.status_code, 202) |
|
87 |
|
|
88 |
self._assert_public_object(cname, oname, odata) |
|
89 |
|
|
90 |
def test_set_twice(self): |
|
91 |
cname = get_random_name() |
|
92 |
self.create_container(cname) |
|
93 |
oname, odata = self.upload_object(cname)[:-1] |
|
94 |
self._assert_not_public_object(cname, oname) |
|
95 |
|
|
96 |
# set public |
|
97 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
98 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
99 |
self.assertEqual(r.status_code, 202) |
|
100 |
|
|
101 |
public = self._assert_public_object(cname, oname, odata) |
|
102 |
|
|
103 |
# set public |
|
104 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
105 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
106 |
self.assertEqual(r.status_code, 202) |
|
107 |
|
|
108 |
public2 = self._assert_public_object(cname, oname, odata) |
|
109 |
|
|
110 |
self.assertEqual(public, public2) |
|
111 |
|
|
112 |
def test_set_unset_set(self): |
|
113 |
cname = get_random_name() |
|
114 |
self.create_container(cname) |
|
115 |
oname, odata = self.upload_object(cname)[:-1] |
|
116 |
self._assert_not_public_object(cname, oname) |
|
117 |
|
|
118 |
# set public |
|
119 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
120 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
121 |
self.assertEqual(r.status_code, 202) |
|
122 |
|
|
123 |
public = self._assert_public_object(cname, oname, odata) |
|
124 |
|
|
125 |
# unset public |
|
126 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
127 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false') |
|
128 |
self.assertEqual(r.status_code, 202) |
|
129 |
|
|
130 |
self._assert_not_public_object(cname, oname) |
|
131 |
|
|
132 |
# set public |
|
133 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
134 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
135 |
self.assertEqual(r.status_code, 202) |
|
136 |
|
|
137 |
public2 = self._assert_public_object(cname, oname, odata) |
|
138 |
|
|
139 |
self.assertTrue(public != public2) |
|
140 |
|
|
141 |
# unset public |
|
142 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
143 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false') |
|
144 |
self.assertEqual(r.status_code, 202) |
|
145 |
|
|
146 |
self._assert_not_public_object(cname, oname) |
|
147 |
|
|
148 |
def test_update_public_object(self): |
|
149 |
cname = get_random_name() |
|
150 |
self.create_container(cname) |
|
151 |
oname, odata = self.upload_object(cname)[:-1] |
|
152 |
self._assert_not_public_object(cname, oname) |
|
153 |
|
|
154 |
# set public |
|
155 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
156 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
157 |
self.assertEqual(r.status_code, 202) |
|
158 |
|
|
159 |
public = self._assert_public_object(cname, oname, odata) |
|
160 |
|
|
161 |
odata2 = self.append_object_data(cname, oname)[1] |
|
162 |
|
|
163 |
public2 = self._assert_public_object(cname, oname, odata + odata2) |
|
164 |
|
|
165 |
self.assertTrue(public == public2) |
|
166 |
|
|
167 |
def test_delete_public_object(self): |
|
168 |
cname = get_random_name() |
|
169 |
self.create_container(cname) |
|
170 |
oname, odata = self.upload_object(cname)[:-1] |
|
171 |
self._assert_not_public_object(cname, oname) |
|
172 |
|
|
173 |
# set public |
|
174 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
175 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
176 |
self.assertEqual(r.status_code, 202) |
|
177 |
public = self._assert_public_object(cname, oname, odata) |
|
178 |
|
|
179 |
# delete object |
|
180 |
r = self.delete(url) |
|
181 |
self.assertEqual(r.status_code, 204) |
|
182 |
r = self.get(url) |
|
183 |
self.assertEqual(r.status_code, 404) |
|
184 |
r = self.get(public) |
|
185 |
self.assertEqual(r.status_code, 404) |
|
186 |
|
|
187 |
def test_delete_public_object_history(self): |
|
188 |
cname = get_random_name() |
|
189 |
self.create_container(cname) |
|
190 |
oname, odata = self.upload_object(cname)[:-1] |
|
191 |
self._assert_not_public_object(cname, oname) |
|
192 |
|
|
193 |
# set public |
|
194 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
195 |
r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
|
196 |
self.assertEqual(r.status_code, 202) |
|
197 |
public = self._assert_public_object(cname, oname, odata) |
|
198 |
|
|
199 |
for _ in range(random.randint(1, 10)): |
|
200 |
odata += self.append_object_data(cname, oname)[1] |
|
201 |
_time.sleep(1) |
|
202 |
|
|
203 |
# get object versions |
|
204 |
url = join_urls(self.pithos_path, self.user, cname, oname) |
|
205 |
r = self.get('%s?version=list&format=json' % url) |
|
206 |
version_list = json.loads(r.content)['versions'] |
|
207 |
mtime = [int(t[1]) for t in version_list] |
|
208 |
|
|
209 |
# delete object history |
|
210 |
i = random.randrange(len(mtime)) |
|
211 |
self.delete('%s?until=%d' % (url, mtime[i])) |
|
212 |
public2 = self._assert_public_object(cname, oname, odata) |
|
213 |
self.assertEqual(public, public2) |
|
214 |
|
|
215 |
# delete object histoy until now |
|
216 |
_time.sleep(1) |
|
217 |
t = datetime.datetime.utcnow() |
|
218 |
now = int(_time.mktime(t.timetuple())) |
|
219 |
r = self.delete('%s?intil=%d' % (url, now)) |
|
220 |
self.assertEqual(r.status_code, 204) |
|
221 |
r = self.get(url) |
|
222 |
self.assertEqual(r.status_code, 404) |
|
223 |
r = self.get(public) |
|
224 |
self.assertEqual(r.status_code, 404) |
b/snf-pithos-app/pithos/api/test/util/__init__.py | ||
---|---|---|
94 | 94 |
return get_random_word(length)[:length] |
95 | 95 |
|
96 | 96 |
|
97 |
def get_random_name(length=8): |
|
98 |
return get_random_data(length) |
|
99 |
|
|
100 |
|
|
97 | 101 |
def md5_hash(data): |
98 | 102 |
md5 = hashlib.md5() |
99 | 103 |
md5.update(data) |
b/snf-pithos-app/pithos/api/tests.py | ||
---|---|---|
36 | 36 |
from pithos.api.test.containers import * |
37 | 37 |
from pithos.api.test.objects import * |
38 | 38 |
from pithos.api.test.permissions import * |
39 |
from pithos.api.test.public import * |
b/snf-pithos-backend/pithos/backends/migrate.py | ||
---|---|---|
50 | 50 |
from alembic import context, command |
51 | 51 |
|
52 | 52 |
from pithos.backends.lib import sqlalchemy as sqlalchemy_backend |
53 |
from pithos.backends.lib.sqlalchemy import node, groups, public, xfeatures |
|
53 |
from pithos.backends.lib.sqlalchemy import (node, groups, public, xfeatures, |
|
54 |
quotaholder_serials) |
|
54 | 55 |
|
55 | 56 |
os.environ['DJANGO_SETTINGS_MODULE'] = 'synnefo.settings' |
56 | 57 |
|
... | ... | |
72 | 73 |
'alembic.ini') |
73 | 74 |
|
74 | 75 |
|
75 |
def initialize_db(): |
|
76 |
def initialize_db(dbconnection):
|
|
76 | 77 |
alembic_cfg = Config(DEFAULT_ALEMBIC_INI_PATH) |
77 | 78 |
|
78 |
db = alembic_cfg.get_main_option( |
|
79 |
"sqlalchemy.url", PITHOS_BACKEND_DB_CONNECTION) |
|
79 |
db = alembic_cfg.get_main_option("sqlalchemy.url", dbconnection) |
|
80 | 80 |
alembic_cfg.set_main_option("sqlalchemy.url", db) |
81 | 81 |
|
82 | 82 |
engine = sa.engine_from_config( |
... | ... | |
87 | 87 |
groups.create_tables(engine) |
88 | 88 |
public.create_tables(engine) |
89 | 89 |
xfeatures.create_tables(engine) |
90 |
quotaholder_serials.create_tables(engine) |
|
90 | 91 |
|
91 | 92 |
# then, load the Alembic configuration and generate the |
92 | 93 |
# version table, "stamping" it with the most recent rev: |
... | ... | |
102 | 103 |
|
103 | 104 |
if len(argv) >= 1 and argv[0] == 'initdb': |
104 | 105 |
print "Initializing db." |
105 |
initialize_db() |
|
106 |
initialize_db(PITHOS_BACKEND_DB_CONNECTION)
|
|
106 | 107 |
print "DB initialized." |
107 | 108 |
exit(1) |
108 | 109 |
|
Also available in: Unified diff