root / snf-pithos-app / pithos / api / test / __init__.py @ 1a720e84
History | View | Annotate | Download (26.3 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
#coding=utf8
|
3 |
|
4 |
# Copyright 2011-2013 GRNET S.A. All rights reserved.
|
5 |
#
|
6 |
# Redistribution and use in source and binary forms, with or
|
7 |
# without modification, are permitted provided that the following
|
8 |
# conditions are met:
|
9 |
#
|
10 |
# 1. Redistributions of source code must retain the above
|
11 |
# copyright notice, this list of conditions and the following
|
12 |
# disclaimer.
|
13 |
#
|
14 |
# 2. Redistributions in binary form must reproduce the above
|
15 |
# copyright notice, this list of conditions and the following
|
16 |
# disclaimer in the documentation and/or other materials
|
17 |
# provided with the distribution.
|
18 |
#
|
19 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
20 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
21 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
22 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
23 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
24 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
25 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
26 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
27 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
28 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
29 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
30 |
# POSSIBILITY OF SUCH DAMAGE.
|
31 |
#
|
32 |
# The views and conclusions contained in the software and
|
33 |
# documentation are those of the authors and should not be
|
34 |
# interpreted as representing official policies, either expressed
|
35 |
# or implied, of GRNET S.A.
|
36 |
|
37 |
from urlparse import urlunsplit, urlsplit, urlparse |
38 |
from xml.dom import minidom |
39 |
from urllib import quote, unquote |
40 |
from mock import patch, PropertyMock |
41 |
|
42 |
from snf_django.utils.testing import with_settings, astakos_user |
43 |
|
44 |
from pithos.api import settings as pithos_settings |
45 |
from pithos.api.test.util import is_date, get_random_data, get_random_name |
46 |
from pithos.backends.migrate import initialize_db |
47 |
|
48 |
from synnefo.lib.services import get_service_path |
49 |
from synnefo.lib import join_urls |
50 |
from synnefo.util import text |
51 |
|
52 |
from django.test import TestCase |
53 |
from django.test.client import Client, MULTIPART_CONTENT, FakePayload |
54 |
from django.test.simple import DjangoTestSuiteRunner |
55 |
from django.conf import settings |
56 |
from django.utils.http import urlencode |
57 |
from django.db.backends.creation import TEST_DATABASE_PREFIX |
58 |
|
59 |
import django.utils.simplejson as json |
60 |
|
61 |
|
62 |
import sys |
63 |
import random |
64 |
import functools |
65 |
|
66 |
|
67 |
pithos_test_settings = functools.partial(with_settings, pithos_settings) |
68 |
|
69 |
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
|
70 |
"%A, %d-%b-%y %H:%M:%S GMT",
|
71 |
"%a, %d %b %Y %H:%M:%S GMT"]
|
72 |
|
73 |
o_names = ['kate.jpg',
|
74 |
'kate_beckinsale.jpg',
|
75 |
'How To Win Friends And Influence People.pdf',
|
76 |
'moms_birthday.jpg',
|
77 |
'poodle_strut.mov',
|
78 |
'Disturbed - Down With The Sickness.mp3',
|
79 |
'army_of_darkness.avi',
|
80 |
'the_mad.avi',
|
81 |
'photos/animals/dogs/poodle.jpg',
|
82 |
'photos/animals/dogs/terrier.jpg',
|
83 |
'photos/animals/cats/persian.jpg',
|
84 |
'photos/animals/cats/siamese.jpg',
|
85 |
'photos/plants/fern.jpg',
|
86 |
'photos/plants/rose.jpg',
|
87 |
'photos/me.jpg']
|
88 |
|
89 |
details = {'container': ('name', 'count', 'bytes', 'last_modified', |
90 |
'x_container_policy'),
|
91 |
'object': ('name', 'hash', 'bytes', 'content_type', |
92 |
'content_encoding', 'last_modified',)} |
93 |
|
94 |
TEST_BLOCK_SIZE = 1024
|
95 |
TEST_HASH_ALGORITHM = 'sha256'
|
96 |
|
97 |
print 'backend module:', pithos_settings.BACKEND_DB_MODULE |
98 |
print 'backend database engine:', settings.DATABASES['default']['ENGINE'] |
99 |
print 'update md5:', pithos_settings.UPDATE_MD5 |
100 |
|
101 |
|
102 |
django_sqlalchemy_engines = { |
103 |
'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2', |
104 |
'django.db.backends.postgresql': 'postgresql', |
105 |
'django.db.backends.mysql': '', |
106 |
'django.db.backends.sqlite3': 'mssql', |
107 |
'django.db.backends.oracle': 'oracle'} |
108 |
|
109 |
|
110 |
def prepare_db_connection(): |
111 |
"""Build pithos backend connection string from django default database"""
|
112 |
|
113 |
db = settings.DATABASES['default']
|
114 |
name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME']) |
115 |
|
116 |
if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'): |
117 |
if db['ENGINE'] == 'django.db.backends.sqlite3': |
118 |
db_connection = 'sqlite:///%s' % name
|
119 |
else:
|
120 |
d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']), |
121 |
user=db['USER'],
|
122 |
pwd=db['PASSWORD'],
|
123 |
host=db['HOST'].lower(),
|
124 |
port=int(db['PORT']) if db['PORT'] != '' else '', |
125 |
name=name) |
126 |
db_connection = ( |
127 |
'%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
|
128 |
|
129 |
# initialize pithos database
|
130 |
initialize_db(db_connection) |
131 |
else:
|
132 |
db_connection = name |
133 |
pithos_settings.BACKEND_DB_CONNECTION = db_connection |
134 |
|
135 |
|
136 |
def filter_headers(headers, prefix): |
137 |
meta = {} |
138 |
for k, v in headers.iteritems(): |
139 |
if not k.startswith(prefix): |
140 |
continue
|
141 |
meta[unquote(k[len(prefix):])] = unquote(v)
|
142 |
return meta
|
143 |
|
144 |
|
145 |
class PithosTestSuiteRunner(DjangoTestSuiteRunner): |
146 |
def setup_databases(self, **kwargs): |
147 |
old_names, mirrors = super(PithosTestSuiteRunner,
|
148 |
self).setup_databases(**kwargs)
|
149 |
prepare_db_connection() |
150 |
return old_names, mirrors
|
151 |
|
152 |
def teardown_databases(self, old_config, **kwargs): |
153 |
from pithos.api.util import _pithos_backend_pool |
154 |
_pithos_backend_pool.shutdown() |
155 |
try:
|
156 |
super(PithosTestSuiteRunner, self).teardown_databases(old_config, |
157 |
**kwargs) |
158 |
except Exception as e: |
159 |
sys.stderr.write("FAILED to teardown databases: %s\n" % str(e)) |
160 |
|
161 |
|
162 |
class PithosTestClient(Client): |
163 |
def _get_path(self, parsed): |
164 |
# If there are parameters, add them
|
165 |
if parsed[3]: |
166 |
return unquote(parsed[2] + ";" + parsed[3]) |
167 |
else:
|
168 |
return unquote(parsed[2]) |
169 |
|
170 |
def copy(self, path, data={}, content_type=MULTIPART_CONTENT, |
171 |
follow=False, **extra):
|
172 |
"""
|
173 |
Send a resource to the server using COPY.
|
174 |
"""
|
175 |
parsed = urlparse(path) |
176 |
r = { |
177 |
'CONTENT_TYPE': 'text/html; charset=utf-8', |
178 |
'PATH_INFO': self._get_path(parsed), |
179 |
'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], |
180 |
'REQUEST_METHOD': 'COPY', |
181 |
'wsgi.input': FakePayload('') |
182 |
} |
183 |
r.update(extra) |
184 |
|
185 |
response = self.request(**r)
|
186 |
if follow:
|
187 |
response = self._handle_redirects(response, **extra)
|
188 |
return response
|
189 |
|
190 |
def move(self, path, data={}, content_type=MULTIPART_CONTENT, |
191 |
follow=False, **extra):
|
192 |
"""
|
193 |
Send a resource to the server using MOVE.
|
194 |
"""
|
195 |
parsed = urlparse(path) |
196 |
r = { |
197 |
'CONTENT_TYPE': 'text/html; charset=utf-8', |
198 |
'PATH_INFO': self._get_path(parsed), |
199 |
'QUERY_STRING': urlencode(data, doseq=True) or parsed[4], |
200 |
'REQUEST_METHOD': 'MOVE', |
201 |
'wsgi.input': FakePayload('') |
202 |
} |
203 |
r.update(extra) |
204 |
|
205 |
response = self.request(**r)
|
206 |
if follow:
|
207 |
response = self._handle_redirects(response, **extra)
|
208 |
return response
|
209 |
|
210 |
|
211 |
class PithosAPITest(TestCase): |
212 |
def create_patch(self, name, new_callable=None): |
213 |
patcher = patch(name, new_callable=new_callable) |
214 |
thing = patcher.start() |
215 |
self.addCleanup(patcher.stop)
|
216 |
return thing
|
217 |
|
218 |
def setUp(self): |
219 |
self.client = PithosTestClient()
|
220 |
|
221 |
# Override default block size to spead up tests
|
222 |
pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE |
223 |
pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM |
224 |
|
225 |
self.user = 'user' |
226 |
self.pithos_path = join_urls(get_service_path(
|
227 |
pithos_settings.pithos_services, 'object-store'))
|
228 |
|
229 |
# patch astakosclient.AstakosClient.validate_token
|
230 |
mock_validate_token = self.create_patch(
|
231 |
'astakosclient.AstakosClient.validate_token')
|
232 |
mock_validate_token.return_value = { |
233 |
'access': {'user': {'id': text.udec(self.user, 'utf8')}}} |
234 |
|
235 |
# patch astakosclient.AstakosClient.get_token
|
236 |
mock_get_token = self.create_patch(
|
237 |
'astakosclient.AstakosClient.get_token')
|
238 |
mock_get_token.return_value = {'access_token': 'valid_token'} |
239 |
|
240 |
# patch astakosclient.AstakosClient.api_oa2_auth
|
241 |
mock_api_oauth2_auth = self.create_patch(
|
242 |
'astakosclient.AstakosClient.oauth2_url',
|
243 |
new_callable=PropertyMock) |
244 |
mock_api_oauth2_auth.return_value = '/astakos/oauth2/'
|
245 |
|
246 |
mock_service_get_quotas = self.create_patch(
|
247 |
'astakosclient.AstakosClient.service_get_quotas')
|
248 |
mock_service_get_quotas.return_value = { |
249 |
self.user: {
|
250 |
"system": {
|
251 |
"pithos.diskspace": {
|
252 |
"usage": 0, |
253 |
"limit": 1073741824, # 1GB |
254 |
"pending": 0}}}} |
255 |
|
256 |
def tearDown(self): |
257 |
#delete additionally created metadata
|
258 |
meta = self.get_account_meta()
|
259 |
self.delete_account_meta(meta)
|
260 |
|
261 |
#delete additionally created groups
|
262 |
groups = self.get_account_groups()
|
263 |
self.delete_account_groups(groups)
|
264 |
|
265 |
self._clean_account()
|
266 |
|
267 |
def _clean_account(self): |
268 |
for c in self.list_containers(): |
269 |
self.delete_container_content(c['name']) |
270 |
self.delete_container(c['name']) |
271 |
|
272 |
def head(self, url, user='user', token='DummyToken', data={}, follow=False, |
273 |
**extra): |
274 |
with astakos_user(user):
|
275 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
276 |
if token:
|
277 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
278 |
response = self.client.head(url, data, follow, **extra)
|
279 |
return response
|
280 |
|
281 |
def get(self, url, user='user', token='DummyToken', data={}, follow=False, |
282 |
**extra): |
283 |
with astakos_user(user):
|
284 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
285 |
if token:
|
286 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
287 |
response = self.client.get(url, data, follow, **extra)
|
288 |
return response
|
289 |
|
290 |
def delete(self, url, user='user', token='DummyToken', data={}, |
291 |
follow=False, **extra):
|
292 |
with astakos_user(user):
|
293 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
294 |
if token:
|
295 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
296 |
response = self.client.delete(url, data, follow, **extra)
|
297 |
return response
|
298 |
|
299 |
def post(self, url, user='user', token='DummyToken', data={}, |
300 |
content_type='application/octet-stream', follow=False, **extra): |
301 |
with astakos_user(user):
|
302 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
303 |
if token:
|
304 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
305 |
response = self.client.post(url, data, content_type, follow,
|
306 |
**extra) |
307 |
return response
|
308 |
|
309 |
def put(self, url, user='user', token='DummyToken', data={}, |
310 |
content_type='application/octet-stream', follow=False, **extra): |
311 |
with astakos_user(user):
|
312 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
313 |
if token:
|
314 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
315 |
response = self.client.put(url, data, content_type, follow,
|
316 |
**extra) |
317 |
return response
|
318 |
|
319 |
def copy(self, url, user='user', token='DummyToken', data={}, |
320 |
content_type='application/octet-stream', follow=False, **extra): |
321 |
with astakos_user(user):
|
322 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
323 |
if token:
|
324 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
325 |
response = self.client.copy(url, data, content_type, follow,
|
326 |
**extra) |
327 |
return response
|
328 |
|
329 |
def move(self, url, user='user', token='DummyToken', data={}, |
330 |
content_type='application/octet-stream', follow=False, **extra): |
331 |
with astakos_user(user):
|
332 |
extra = dict((quote(k), quote(v)) for k, v in extra.items()) |
333 |
if token:
|
334 |
extra['HTTP_X_AUTH_TOKEN'] = token
|
335 |
response = self.client.move(url, data, content_type, follow,
|
336 |
**extra) |
337 |
return response
|
338 |
|
339 |
def update_account_meta(self, meta, user=None, verify_status=True): |
340 |
user = user or self.user |
341 |
kwargs = dict(
|
342 |
('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items()) |
343 |
url = join_urls(self.pithos_path, user)
|
344 |
r = self.post('%s?update=' % url, user=user, **kwargs) |
345 |
if verify_status:
|
346 |
self.assertEqual(r.status_code, 202) |
347 |
account_meta = self.get_account_meta(user=user)
|
348 |
(self.assertTrue('X-Account-Meta-%s' % k in account_meta) for |
349 |
k in meta.keys())
|
350 |
(self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for |
351 |
k, v in meta.items())
|
352 |
|
353 |
def reset_account_meta(self, meta, user=None, verify_status=True): |
354 |
user = user or self.user |
355 |
kwargs = dict(
|
356 |
('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items()) |
357 |
url = join_urls(self.pithos_path, user)
|
358 |
r = self.post(url, user=user, **kwargs)
|
359 |
if verify_status:
|
360 |
self.assertEqual(r.status_code, 202) |
361 |
account_meta = self.get_account_meta(user=user)
|
362 |
(self.assertTrue('X-Account-Meta-%s' % k in account_meta) for |
363 |
k in meta.keys())
|
364 |
(self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for |
365 |
k, v in meta.items())
|
366 |
|
367 |
def delete_account_meta(self, meta, user=None, verify_status=True): |
368 |
user = user or self.user |
369 |
transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper() |
370 |
kwargs = dict((transform(k), '') for k, v in meta.items()) |
371 |
url = join_urls(self.pithos_path, user)
|
372 |
r = self.post('%s?update=' % url, user=user, **kwargs) |
373 |
if verify_status:
|
374 |
self.assertEqual(r.status_code, 202) |
375 |
account_meta = self.get_account_meta(user=user)
|
376 |
(self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for |
377 |
k in meta.keys())
|
378 |
return r
|
379 |
|
380 |
def delete_account_groups(self, groups, user=None, verify_status=True): |
381 |
user = user or self.user |
382 |
url = join_urls(self.pithos_path, user)
|
383 |
r = self.post('%s?update=' % url, user=user, **groups) |
384 |
if verify_status:
|
385 |
self.assertEqual(r.status_code, 202) |
386 |
account_groups = self.get_account_groups()
|
387 |
(self.assertTrue(k not in account_groups) for k in groups.keys()) |
388 |
return r
|
389 |
|
390 |
def get_account_info(self, until=None, user=None, verify_status=True): |
391 |
user = user or self.user |
392 |
url = join_urls(self.pithos_path, user)
|
393 |
if until is not None: |
394 |
parts = list(urlsplit(url))
|
395 |
parts[3] = urlencode({
|
396 |
'until': until
|
397 |
}) |
398 |
url = urlunsplit(parts) |
399 |
r = self.head(url, user=user)
|
400 |
if verify_status:
|
401 |
self.assertEqual(r.status_code, 204) |
402 |
return r
|
403 |
|
404 |
def get_account_meta(self, until=None, user=None): |
405 |
prefix = 'X-Account-Meta-'
|
406 |
r = self.get_account_info(until=until, user=user)
|
407 |
headers = dict(r._headers.values())
|
408 |
return filter_headers(headers, prefix)
|
409 |
|
410 |
def get_account_groups(self, until=None, user=None): |
411 |
prefix = 'X-Account-Group-'
|
412 |
r = self.get_account_info(until=until, user=user)
|
413 |
headers = dict(r._headers.values())
|
414 |
return filter_headers(headers, prefix)
|
415 |
|
416 |
def get_container_info(self, container, until=None, user=None, |
417 |
verify_status=True):
|
418 |
user = user or self.user |
419 |
url = join_urls(self.pithos_path, user, container)
|
420 |
if until is not None: |
421 |
parts = list(urlsplit(url))
|
422 |
parts[3] = urlencode({
|
423 |
'until': until
|
424 |
}) |
425 |
url = urlunsplit(parts) |
426 |
r = self.head(url, user=user)
|
427 |
if verify_status:
|
428 |
self.assertEqual(r.status_code, 204) |
429 |
return r
|
430 |
|
431 |
def get_container_meta(self, container, until=None, user=None): |
432 |
prefix = 'X-Container-Meta-'
|
433 |
r = self.get_container_info(container, until=until, user=user)
|
434 |
headers = dict(r._headers.values())
|
435 |
return filter_headers(headers, prefix)
|
436 |
|
437 |
def update_container_meta(self, container, meta=None, user=None, |
438 |
verify_status=True):
|
439 |
user = user or self.user |
440 |
meta = meta or {get_random_name(): get_random_name()}
|
441 |
kwargs = dict(
|
442 |
('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items()) |
443 |
url = join_urls(self.pithos_path, user, container)
|
444 |
r = self.post('%s?update=' % url, user=user, **kwargs) |
445 |
if verify_status:
|
446 |
self.assertEqual(r.status_code, 202) |
447 |
container_meta = self.get_container_meta(container, user=user)
|
448 |
(self.assertTrue('X-Container-Meta-%s' % k in container_meta) for |
449 |
k in meta.keys())
|
450 |
(self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for |
451 |
k, v in meta.items())
|
452 |
return r
|
453 |
|
454 |
def list_containers(self, format='json', headers={}, user=None, **params): |
455 |
user = user or self.user |
456 |
_url = join_urls(self.pithos_path, user)
|
457 |
parts = list(urlsplit(_url))
|
458 |
params['format'] = format
|
459 |
parts[3] = urlencode(params)
|
460 |
url = urlunsplit(parts) |
461 |
_headers = dict(('HTTP_%s' % k.upper(), str(v)) |
462 |
for k, v in headers.items()) |
463 |
r = self.get(url, user=user, **_headers)
|
464 |
|
465 |
if format is None: |
466 |
containers = r.content.split('\n')
|
467 |
if '' in containers: |
468 |
containers.remove('')
|
469 |
return containers
|
470 |
elif format == 'json': |
471 |
try:
|
472 |
containers = json.loads(r.content) |
473 |
except:
|
474 |
self.fail('json format expected') |
475 |
return containers
|
476 |
elif format == 'xml': |
477 |
return minidom.parseString(r.content)
|
478 |
|
479 |
def delete_container_content(self, cname, user=None, verify_status=True): |
480 |
user = user or self.user |
481 |
url = join_urls(self.pithos_path, user, cname)
|
482 |
r = self.delete('%s?delimiter=/' % url, user=user) |
483 |
if verify_status:
|
484 |
self.assertEqual(r.status_code, 204) |
485 |
return r
|
486 |
|
487 |
def delete_container(self, cname, user=None, verify_status=True): |
488 |
user = user or self.user |
489 |
url = join_urls(self.pithos_path, user, cname)
|
490 |
r = self.delete(url, user=user)
|
491 |
if verify_status:
|
492 |
self.assertEqual(r.status_code, 204) |
493 |
return r
|
494 |
|
495 |
def delete_object(self, cname, oname, user=None, verify_status=True): |
496 |
user = user or self.user |
497 |
url = join_urls(self.pithos_path, user, cname, oname)
|
498 |
r = self.delete(url, user=user)
|
499 |
if verify_status:
|
500 |
self.assertEqual(r.status_code, 204) |
501 |
return r
|
502 |
|
503 |
def create_container(self, cname=None, user=None, verify_status=True): |
504 |
cname = cname or get_random_name()
|
505 |
user = user or self.user |
506 |
url = join_urls(self.pithos_path, user, cname)
|
507 |
r = self.put(url, user=user, data='') |
508 |
if verify_status:
|
509 |
self.assertTrue(r.status_code in (202, 201)) |
510 |
return cname, r
|
511 |
|
512 |
def upload_object(self, cname, oname=None, length=None, verify_status=True, |
513 |
user=None, **meta):
|
514 |
oname = oname or get_random_name()
|
515 |
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE) |
516 |
user = user or self.user |
517 |
data = get_random_data(length=length) |
518 |
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v) |
519 |
for k, v in meta.iteritems()) |
520 |
url = join_urls(self.pithos_path, user, cname, oname)
|
521 |
r = self.put(url, user=user, data=data, **headers)
|
522 |
if verify_status:
|
523 |
self.assertEqual(r.status_code, 201) |
524 |
return oname, data, r
|
525 |
|
526 |
def update_object_data(self, cname, oname=None, length=None, |
527 |
content_type=None, content_range=None, |
528 |
verify_status=True, user=None, **meta): |
529 |
oname = oname or get_random_name()
|
530 |
length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE) |
531 |
content_type = content_type or 'application/octet-stream' |
532 |
user = user or self.user |
533 |
data = get_random_data(length=length) |
534 |
headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v) |
535 |
for k, v in meta.iteritems()) |
536 |
if content_range:
|
537 |
headers['HTTP_CONTENT_RANGE'] = content_range
|
538 |
url = join_urls(self.pithos_path, user, cname, oname)
|
539 |
r = self.post(url, user=user, data=data, content_type=content_type,
|
540 |
**headers) |
541 |
if verify_status:
|
542 |
self.assertEqual(r.status_code, 204) |
543 |
return oname, data, r
|
544 |
|
545 |
def append_object_data(self, cname, oname=None, length=None, |
546 |
content_type=None, user=None): |
547 |
return self.update_object_data(cname, oname=oname, |
548 |
length=length, |
549 |
content_type=content_type, |
550 |
content_range='bytes */*',
|
551 |
user=user) |
552 |
|
553 |
def create_folder(self, cname, oname=None, user=None, verify_status=True, |
554 |
**headers): |
555 |
user = user or self.user |
556 |
oname = oname or get_random_name()
|
557 |
url = join_urls(self.pithos_path, user, cname, oname)
|
558 |
r = self.put(url, user=user, data='', |
559 |
content_type='application/directory', **headers)
|
560 |
if verify_status:
|
561 |
self.assertEqual(r.status_code, 201) |
562 |
return oname, r
|
563 |
|
564 |
def list_objects(self, cname, prefix=None, user=None, verify_status=True): |
565 |
user = user or self.user |
566 |
url = join_urls(self.pithos_path, user, cname)
|
567 |
path = '%s?format=json' % url
|
568 |
if prefix is not None: |
569 |
path = '%s&prefix=%s' % (path, prefix)
|
570 |
r = self.get(path, user=user)
|
571 |
if verify_status:
|
572 |
self.assertTrue(r.status_code in (200, 204)) |
573 |
try:
|
574 |
objects = json.loads(r.content) |
575 |
except:
|
576 |
self.fail('json format expected') |
577 |
return objects
|
578 |
|
579 |
def get_object_info(self, container, object, version=None, until=None, |
580 |
user=None, verify_status=True): |
581 |
user = user or self.user |
582 |
url = join_urls(self.pithos_path, user, container, object) |
583 |
if until is not None: |
584 |
parts = list(urlsplit(url))
|
585 |
parts[3] = urlencode({
|
586 |
'until': until
|
587 |
}) |
588 |
url = urlunsplit(parts) |
589 |
if version:
|
590 |
url = '%s?version=%s' % (url, version)
|
591 |
r = self.head(url, user=user)
|
592 |
if verify_status:
|
593 |
self.assertEqual(r.status_code, 200) |
594 |
return r
|
595 |
|
596 |
def get_object_meta(self, container, object, version=None, until=None, |
597 |
user=None):
|
598 |
prefix = 'X-Object-Meta-'
|
599 |
user = user or self.user |
600 |
r = self.get_object_info(container, object, version, until=until, |
601 |
user=user) |
602 |
headers = dict(r._headers.values())
|
603 |
return filter_headers(headers, prefix)
|
604 |
|
605 |
def update_object_meta(self, container, object, meta=None, user=None, |
606 |
verify_status=True):
|
607 |
user = user or self.user |
608 |
meta = meta or {get_random_name(): get_random_name()}
|
609 |
kwargs = dict(
|
610 |
('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items()) |
611 |
url = join_urls(self.pithos_path, user, container, object) |
612 |
r = self.post('%s?update=' % url, user=user, content_type='', **kwargs) |
613 |
if verify_status:
|
614 |
self.assertEqual(r.status_code, 202) |
615 |
object_meta = self.get_object_meta(container, object, user=user) |
616 |
(self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for |
617 |
k in meta.keys())
|
618 |
(self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for |
619 |
k, v in meta.items())
|
620 |
return r
|
621 |
|
622 |
def assert_extended(self, data, format, type, size=10000): |
623 |
if format == 'xml': |
624 |
self._assert_xml(data, type, size) |
625 |
elif format == 'json': |
626 |
self._assert_json(data, type, size) |
627 |
|
628 |
def _assert_json(self, data, type, size): |
629 |
convert = lambda s: s.lower()
|
630 |
info = [convert(elem) for elem in details[type]] |
631 |
self.assertTrue(len(data) <= size) |
632 |
for item in info: |
633 |
for i in data: |
634 |
if 'subdir' in i.keys(): |
635 |
continue
|
636 |
self.assertTrue(item in i.keys()) |
637 |
|
638 |
def _assert_xml(self, data, type, size): |
639 |
convert = lambda s: s.lower()
|
640 |
info = [convert(elem) for elem in details[type]] |
641 |
try:
|
642 |
info.remove('content_encoding')
|
643 |
except ValueError: |
644 |
pass
|
645 |
xml = data |
646 |
entities = xml.getElementsByTagName(type)
|
647 |
self.assertTrue(len(entities) <= size) |
648 |
for e in entities: |
649 |
for item in info: |
650 |
self.assertTrue(e.getElementsByTagName(item))
|
651 |
|
652 |
|
653 |
class AssertMappingInvariant(object): |
654 |
def __init__(self, callable, *args, **kwargs): |
655 |
self.callable = callable |
656 |
self.args = args
|
657 |
self.kwargs = kwargs
|
658 |
|
659 |
def __enter__(self): |
660 |
self.map = self.callable(*self.args, **self.kwargs) |
661 |
return self.map |
662 |
|
663 |
def __exit__(self, type, value, tb): |
664 |
map = self.callable(*self.args, **self.kwargs) |
665 |
for k, v in self.map.items(): |
666 |
if is_date(v):
|
667 |
continue
|
668 |
|
669 |
assert(k in map), '%s not in map' % k |
670 |
assert v == map[k] |
671 |
|
672 |
|
673 |
class AssertUUidInvariant(object): |
674 |
def __init__(self, callable, *args, **kwargs): |
675 |
self.callable = callable |
676 |
self.args = args
|
677 |
self.kwargs = kwargs
|
678 |
|
679 |
def __enter__(self): |
680 |
self.map = self.callable(*self.args, **self.kwargs) |
681 |
assert('x-object-uuid' in self.map) |
682 |
self.uuid = self.map['x-object-uuid'] |
683 |
return self.map |
684 |
|
685 |
def __exit__(self, type, value, tb): |
686 |
map = self.callable(*self.args, **self.kwargs) |
687 |
assert('x-object-uuid' in self.map) |
688 |
uuid = map['x-object-uuid'] |
689 |
assert(uuid == self.uuid) |