Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 3b8f938b

History | View | Annotate | Download (26.2 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from urlparse import urlunsplit, urlsplit, urlparse
38
from xml.dom import minidom
39
from urllib import quote, unquote
40
from mock import patch, PropertyMock
41

    
42
from snf_django.utils.testing import with_settings, astakos_user
43

    
44
from pithos.api import settings as pithos_settings
45
from pithos.api.test.util import is_date, get_random_data, get_random_name
46
from pithos.backends.migrate import initialize_db
47

    
48
from synnefo.lib.services import get_service_path
49
from synnefo.lib import join_urls
50
from synnefo.util import text
51

    
52
from django.test import TestCase
53
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
54
from django.test.simple import DjangoTestSuiteRunner
55
from django.conf import settings
56
from django.utils.http import urlencode
57
from django.db.backends.creation import TEST_DATABASE_PREFIX
58

    
59
import django.utils.simplejson as json
60

    
61

    
62
import random
63
import functools
64

    
65

    
66
pithos_test_settings = functools.partial(with_settings, pithos_settings)
67

    
68
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
69
                "%A, %d-%b-%y %H:%M:%S GMT",
70
                "%a, %d %b %Y %H:%M:%S GMT"]
71

    
72
o_names = ['kate.jpg',
73
           'kate_beckinsale.jpg',
74
           'How To Win Friends And Influence People.pdf',
75
           'moms_birthday.jpg',
76
           'poodle_strut.mov',
77
           'Disturbed - Down With The Sickness.mp3',
78
           'army_of_darkness.avi',
79
           'the_mad.avi',
80
           'photos/animals/dogs/poodle.jpg',
81
           'photos/animals/dogs/terrier.jpg',
82
           'photos/animals/cats/persian.jpg',
83
           'photos/animals/cats/siamese.jpg',
84
           'photos/plants/fern.jpg',
85
           'photos/plants/rose.jpg',
86
           'photos/me.jpg']
87

    
88
details = {'container': ('name', 'count', 'bytes', 'last_modified',
89
                         'x_container_policy'),
90
           'object': ('name', 'hash', 'bytes', 'content_type',
91
                      'content_encoding', 'last_modified',)}
92

    
93
TEST_BLOCK_SIZE = 1024
94
TEST_HASH_ALGORITHM = 'sha256'
95

    
96
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
97
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
98
print 'update md5:', pithos_settings.UPDATE_MD5
99

    
100

    
101
django_sqlalchemy_engines = {
102
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
103
    'django.db.backends.postgresql': 'postgresql',
104
    'django.db.backends.mysql': '',
105
    'django.db.backends.sqlite3': 'mssql',
106
    'django.db.backends.oracle': 'oracle'}
107

    
108

    
109
def prepare_db_connection():
110
    """Build pithos backend connection string from django default database"""
111

    
112
    db = settings.DATABASES['default']
113
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
114

    
115
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
116
        if db['ENGINE'] == 'django.db.backends.sqlite3':
117
            db_connection = 'sqlite:///%s' % name
118
        else:
119
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
120
                     user=db['USER'],
121
                     pwd=db['PASSWORD'],
122
                     host=db['HOST'].lower(),
123
                     port=int(db['PORT']) if db['PORT'] != '' else '',
124
                     name=name)
125
            db_connection = (
126
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
127

    
128
            # initialize pithos database
129
            initialize_db(db_connection)
130
    else:
131
        db_connection = name
132
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
133

    
134

    
135
def filter_headers(headers, prefix):
136
    meta = {}
137
    for k, v in headers.iteritems():
138
        if not k.startswith(prefix):
139
            continue
140
        meta[unquote(k[len(prefix):])] = unquote(v)
141
    return meta
142

    
143

    
144
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
145
    def setup_databases(self, **kwargs):
146
        old_names, mirrors = super(PithosTestSuiteRunner,
147
                                   self).setup_databases(**kwargs)
148
        prepare_db_connection()
149
        return old_names, mirrors
150

    
151
    def teardown_databases(self, old_config, **kwargs):
152
        from pithos.api.util import _pithos_backend_pool
153
        _pithos_backend_pool.shutdown()
154
        super(PithosTestSuiteRunner, self).teardown_databases(old_config,
155
                                                              **kwargs)
156

    
157

    
158
class PithosTestClient(Client):
159
    def _get_path(self, parsed):
160
        # If there are parameters, add them
161
        if parsed[3]:
162
            return unquote(parsed[2] + ";" + parsed[3])
163
        else:
164
            return unquote(parsed[2])
165

    
166
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
167
             follow=False, **extra):
168
        """
169
        Send a resource to the server using COPY.
170
        """
171
        parsed = urlparse(path)
172
        r = {
173
            'CONTENT_TYPE':    'text/html; charset=utf-8',
174
            'PATH_INFO':       self._get_path(parsed),
175
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
176
            'REQUEST_METHOD': 'COPY',
177
            'wsgi.input':      FakePayload('')
178
        }
179
        r.update(extra)
180

    
181
        response = self.request(**r)
182
        if follow:
183
            response = self._handle_redirects(response, **extra)
184
        return response
185

    
186
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
187
             follow=False, **extra):
188
        """
189
        Send a resource to the server using MOVE.
190
        """
191
        parsed = urlparse(path)
192
        r = {
193
            'CONTENT_TYPE':    'text/html; charset=utf-8',
194
            'PATH_INFO':       self._get_path(parsed),
195
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
196
            'REQUEST_METHOD': 'MOVE',
197
            'wsgi.input':      FakePayload('')
198
        }
199
        r.update(extra)
200

    
201
        response = self.request(**r)
202
        if follow:
203
            response = self._handle_redirects(response, **extra)
204
        return response
205

    
206

    
207
class PithosAPITest(TestCase):
208
    def create_patch(self, name, new_callable=None):
209
        patcher = patch(name, new_callable=new_callable)
210
        thing = patcher.start()
211
        self.addCleanup(patcher.stop)
212
        return thing
213

    
214
    def setUp(self):
215
        self.client = PithosTestClient()
216

    
217
        # Override default block size to spead up tests
218
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
219
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
220

    
221
        self.user = 'user'
222
        self.pithos_path = join_urls(get_service_path(
223
            pithos_settings.pithos_services, 'object-store'))
224

    
225
        # patch astakosclient.AstakosClient.validate_token
226
        mock_validate_token = self.create_patch(
227
            'astakosclient.AstakosClient.validate_token')
228
        mock_validate_token.return_value = {
229
            'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
230

    
231
        # patch astakosclient.AstakosClient.get_token
232
        mock_get_token = self.create_patch(
233
            'astakosclient.AstakosClient.get_token')
234
        mock_get_token.return_value = {'access_token': 'valid_token'}
235

    
236
        # patch astakosclient.AstakosClient.api_oa2_auth
237
        mock_api_oauth2_auth = self.create_patch(
238
            'astakosclient.AstakosClient.oauth2_url',
239
            new_callable=PropertyMock)
240
        mock_api_oauth2_auth.return_value = '/astakos/oauth2/'
241

    
242
        mock_service_get_quotas = self.create_patch(
243
            'astakosclient.AstakosClient.service_get_quotas')
244
        mock_service_get_quotas.return_value = {
245
            self.user: {
246
                "system": {
247
                    "pithos.diskspace": {
248
                        "usage": 0,
249
                        "limit": 1073741824,  # 1GB
250
                        "pending": 0}}}}
251

    
252
    def tearDown(self):
253
        #delete additionally created metadata
254
        meta = self.get_account_meta()
255
        self.delete_account_meta(meta)
256

    
257
        #delete additionally created groups
258
        groups = self.get_account_groups()
259
        self.delete_account_groups(groups)
260

    
261
        self._clean_account()
262

    
263
    def _clean_account(self):
264
        for c in self.list_containers():
265
            self.delete_container_content(c['name'])
266
            self.delete_container(c['name'])
267

    
268
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
269
             **extra):
270
        with astakos_user(user):
271
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
272
            if token:
273
                extra['HTTP_X_AUTH_TOKEN'] = token
274
            response = self.client.head(url, data, follow, **extra)
275
        return response
276

    
277
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
278
            **extra):
279
        with astakos_user(user):
280
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
281
            if token:
282
                extra['HTTP_X_AUTH_TOKEN'] = token
283
            response = self.client.get(url, data, follow, **extra)
284
        return response
285

    
286
    def delete(self, url, user='user', token='DummyToken', data={},
287
               follow=False, **extra):
288
        with astakos_user(user):
289
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
290
            if token:
291
                extra['HTTP_X_AUTH_TOKEN'] = token
292
            response = self.client.delete(url, data, follow, **extra)
293
        return response
294

    
295
    def post(self, url, user='user', token='DummyToken', data={},
296
             content_type='application/octet-stream', follow=False, **extra):
297
        with astakos_user(user):
298
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
299
            if token:
300
                extra['HTTP_X_AUTH_TOKEN'] = token
301
            response = self.client.post(url, data, content_type, follow,
302
                                        **extra)
303
        return response
304

    
305
    def put(self, url, user='user', token='DummyToken', data={},
306
            content_type='application/octet-stream', follow=False, **extra):
307
        with astakos_user(user):
308
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
309
            if token:
310
                extra['HTTP_X_AUTH_TOKEN'] = token
311
            response = self.client.put(url, data, content_type, follow,
312
                                       **extra)
313
        return response
314

    
315
    def copy(self, url, user='user', token='DummyToken', data={},
316
             content_type='application/octet-stream', follow=False, **extra):
317
        with astakos_user(user):
318
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
319
            if token:
320
                extra['HTTP_X_AUTH_TOKEN'] = token
321
            response = self.client.copy(url, data, content_type, follow,
322
                                        **extra)
323
        return response
324

    
325
    def move(self, url, user='user', token='DummyToken', data={},
326
             content_type='application/octet-stream', follow=False, **extra):
327
        with astakos_user(user):
328
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
329
            if token:
330
                extra['HTTP_X_AUTH_TOKEN'] = token
331
            response = self.client.move(url, data, content_type, follow,
332
                                        **extra)
333
        return response
334

    
335
    def update_account_meta(self, meta, user=None, verify_status=True):
336
        user = user or self.user
337
        kwargs = dict(
338
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
339
        url = join_urls(self.pithos_path, user)
340
        r = self.post('%s?update=' % url, user=user, **kwargs)
341
        if verify_status:
342
            self.assertEqual(r.status_code, 202)
343
        account_meta = self.get_account_meta(user=user)
344
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
345
            k in meta.keys())
346
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
347
            k, v in meta.items())
348

    
349
    def reset_account_meta(self, meta, user=None, verify_status=True):
350
        user = user or self.user
351
        kwargs = dict(
352
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
353
        url = join_urls(self.pithos_path, user)
354
        r = self.post(url, user=user, **kwargs)
355
        if verify_status:
356
            self.assertEqual(r.status_code, 202)
357
        account_meta = self.get_account_meta(user=user)
358
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
359
            k in meta.keys())
360
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
361
            k, v in meta.items())
362

    
363
    def delete_account_meta(self, meta, user=None, verify_status=True):
364
        user = user or self.user
365
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
366
        kwargs = dict((transform(k), '') for k, v in meta.items())
367
        url = join_urls(self.pithos_path, user)
368
        r = self.post('%s?update=' % url, user=user, **kwargs)
369
        if verify_status:
370
            self.assertEqual(r.status_code, 202)
371
        account_meta = self.get_account_meta(user=user)
372
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
373
            k in meta.keys())
374
        return r
375

    
376
    def delete_account_groups(self, groups, user=None, verify_status=True):
377
        user = user or self.user
378
        url = join_urls(self.pithos_path, user)
379
        r = self.post('%s?update=' % url, user=user, **groups)
380
        if verify_status:
381
            self.assertEqual(r.status_code, 202)
382
        account_groups = self.get_account_groups()
383
        (self.assertTrue(k not in account_groups) for k in groups.keys())
384
        return r
385

    
386
    def get_account_info(self, until=None, user=None, verify_status=True):
387
        user = user or self.user
388
        url = join_urls(self.pithos_path, user)
389
        if until is not None:
390
            parts = list(urlsplit(url))
391
            parts[3] = urlencode({
392
                'until': until
393
            })
394
            url = urlunsplit(parts)
395
        r = self.head(url, user=user)
396
        if verify_status:
397
            self.assertEqual(r.status_code, 204)
398
        return r
399

    
400
    def get_account_meta(self, until=None, user=None):
401
        prefix = 'X-Account-Meta-'
402
        r = self.get_account_info(until=until, user=user)
403
        headers = dict(r._headers.values())
404
        return filter_headers(headers, prefix)
405

    
406
    def get_account_groups(self, until=None, user=None):
407
        prefix = 'X-Account-Group-'
408
        r = self.get_account_info(until=until, user=user)
409
        headers = dict(r._headers.values())
410
        return filter_headers(headers, prefix)
411

    
412
    def get_container_info(self, container, until=None, user=None,
413
                           verify_status=True):
414
        user = user or self.user
415
        url = join_urls(self.pithos_path, user, container)
416
        if until is not None:
417
            parts = list(urlsplit(url))
418
            parts[3] = urlencode({
419
                'until': until
420
            })
421
            url = urlunsplit(parts)
422
        r = self.head(url, user=user)
423
        if verify_status:
424
            self.assertEqual(r.status_code, 204)
425
        return r
426

    
427
    def get_container_meta(self, container, until=None, user=None):
428
        prefix = 'X-Container-Meta-'
429
        r = self.get_container_info(container, until=until, user=user)
430
        headers = dict(r._headers.values())
431
        return filter_headers(headers, prefix)
432

    
433
    def update_container_meta(self, container, meta=None, user=None,
434
                              verify_status=True):
435
        user = user or self.user
436
        meta = meta or {get_random_name(): get_random_name()}
437
        kwargs = dict(
438
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
439
        url = join_urls(self.pithos_path, user, container)
440
        r = self.post('%s?update=' % url, user=user, **kwargs)
441
        if verify_status:
442
            self.assertEqual(r.status_code, 202)
443
        container_meta = self.get_container_meta(container, user=user)
444
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
445
            k in meta.keys())
446
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
447
            k, v in meta.items())
448
        return r
449

    
450
    def list_containers(self, format='json', headers={}, user=None, **params):
451
        user = user or self.user
452
        _url = join_urls(self.pithos_path, user)
453
        parts = list(urlsplit(_url))
454
        params['format'] = format
455
        parts[3] = urlencode(params)
456
        url = urlunsplit(parts)
457
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
458
                        for k, v in headers.items())
459
        r = self.get(url, user=user, **_headers)
460

    
461
        if format is None:
462
            containers = r.content.split('\n')
463
            if '' in containers:
464
                containers.remove('')
465
            return containers
466
        elif format == 'json':
467
            try:
468
                containers = json.loads(r.content)
469
            except:
470
                self.fail('json format expected')
471
            return containers
472
        elif format == 'xml':
473
            return minidom.parseString(r.content)
474

    
475
    def delete_container_content(self, cname, user=None, verify_status=True):
476
        user = user or self.user
477
        url = join_urls(self.pithos_path, user, cname)
478
        r = self.delete('%s?delimiter=/' % url, user=user)
479
        if verify_status:
480
            self.assertEqual(r.status_code, 204)
481
        return r
482

    
483
    def delete_container(self, cname, user=None, verify_status=True):
484
        user = user or self.user
485
        url = join_urls(self.pithos_path, user, cname)
486
        r = self.delete(url, user=user)
487
        if verify_status:
488
            self.assertEqual(r.status_code, 204)
489
        return r
490

    
491
    def delete_object(self, cname, oname, user=None, verify_status=True):
492
        user = user or self.user
493
        url = join_urls(self.pithos_path, user, cname, oname)
494
        r = self.delete(url, user=user)
495
        if verify_status:
496
            self.assertEqual(r.status_code, 204)
497
        return r
498

    
499
    def create_container(self, cname=None, user=None, verify_status=True):
500
        cname = cname or get_random_name()
501
        user = user or self.user
502
        url = join_urls(self.pithos_path, user, cname)
503
        r = self.put(url, user=user, data='')
504
        if verify_status:
505
            self.assertTrue(r.status_code in (202, 201))
506
        return cname, r
507

    
508
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
509
                      user=None, **meta):
510
        oname = oname or get_random_name()
511
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
512
        user = user or self.user
513
        data = get_random_data(length=length)
514
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
515
                       for k, v in meta.iteritems())
516
        url = join_urls(self.pithos_path, user, cname, oname)
517
        r = self.put(url, user=user, data=data, **headers)
518
        if verify_status:
519
            self.assertEqual(r.status_code, 201)
520
        return oname, data, r
521

    
522
    def update_object_data(self, cname, oname=None, length=None,
523
                           content_type=None, content_range=None,
524
                           verify_status=True, user=None, **meta):
525
        oname = oname or get_random_name()
526
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
527
        content_type = content_type or 'application/octet-stream'
528
        user = user or self.user
529
        data = get_random_data(length=length)
530
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
531
                       for k, v in meta.iteritems())
532
        if content_range:
533
            headers['HTTP_CONTENT_RANGE'] = content_range
534
        url = join_urls(self.pithos_path, user, cname, oname)
535
        r = self.post(url, user=user, data=data, content_type=content_type,
536
                      **headers)
537
        if verify_status:
538
            self.assertEqual(r.status_code, 204)
539
        return oname, data, r
540

    
541
    def append_object_data(self, cname, oname=None, length=None,
542
                           content_type=None, user=None):
543
        return self.update_object_data(cname, oname=oname,
544
                                       length=length,
545
                                       content_type=content_type,
546
                                       content_range='bytes */*',
547
                                       user=user)
548

    
549
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
550
                      **headers):
551
        user = user or self.user
552
        oname = oname or get_random_name()
553
        url = join_urls(self.pithos_path, user, cname, oname)
554
        r = self.put(url, user=user, data='',
555
                     content_type='application/directory', **headers)
556
        if verify_status:
557
            self.assertEqual(r.status_code, 201)
558
        return oname, r
559

    
560
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
561
        user = user or self.user
562
        url = join_urls(self.pithos_path, user, cname)
563
        path = '%s?format=json' % url
564
        if prefix is not None:
565
            path = '%s&prefix=%s' % (path, prefix)
566
        r = self.get(path, user=user)
567
        if verify_status:
568
            self.assertTrue(r.status_code in (200, 204))
569
        try:
570
            objects = json.loads(r.content)
571
        except:
572
            self.fail('json format expected')
573
        return objects
574

    
575
    def get_object_info(self, container, object, version=None, until=None,
576
                        user=None, verify_status=True):
577
        user = user or self.user
578
        url = join_urls(self.pithos_path, user, container, object)
579
        if until is not None:
580
            parts = list(urlsplit(url))
581
            parts[3] = urlencode({
582
                'until': until
583
            })
584
            url = urlunsplit(parts)
585
        if version:
586
            url = '%s?version=%s' % (url, version)
587
        r = self.head(url, user=user)
588
        if verify_status:
589
            self.assertEqual(r.status_code, 200)
590
        return r
591

    
592
    def get_object_meta(self, container, object, version=None, until=None,
593
                        user=None):
594
        prefix = 'X-Object-Meta-'
595
        user = user or self.user
596
        r = self.get_object_info(container, object, version, until=until,
597
                                 user=user)
598
        headers = dict(r._headers.values())
599
        return filter_headers(headers, prefix)
600

    
601
    def update_object_meta(self, container, object, meta=None, user=None,
602
                           verify_status=True):
603
        user = user or self.user
604
        meta = meta or {get_random_name(): get_random_name()}
605
        kwargs = dict(
606
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
607
        url = join_urls(self.pithos_path, user, container, object)
608
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
609
        if verify_status:
610
            self.assertEqual(r.status_code, 202)
611
        object_meta = self.get_object_meta(container, object, user=user)
612
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
613
            k in meta.keys())
614
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
615
            k, v in meta.items())
616
        return r
617

    
618
    def assert_extended(self, data, format, type, size=10000):
619
        if format == 'xml':
620
            self._assert_xml(data, type, size)
621
        elif format == 'json':
622
            self._assert_json(data, type, size)
623

    
624
    def _assert_json(self, data, type, size):
625
        convert = lambda s: s.lower()
626
        info = [convert(elem) for elem in details[type]]
627
        self.assertTrue(len(data) <= size)
628
        for item in info:
629
            for i in data:
630
                if 'subdir' in i.keys():
631
                    continue
632
                self.assertTrue(item in i.keys())
633

    
634
    def _assert_xml(self, data, type, size):
635
        convert = lambda s: s.lower()
636
        info = [convert(elem) for elem in details[type]]
637
        try:
638
            info.remove('content_encoding')
639
        except ValueError:
640
            pass
641
        xml = data
642
        entities = xml.getElementsByTagName(type)
643
        self.assertTrue(len(entities) <= size)
644
        for e in entities:
645
            for item in info:
646
                self.assertTrue(e.getElementsByTagName(item))
647

    
648

    
649
class AssertMappingInvariant(object):
650
    def __init__(self, callable, *args, **kwargs):
651
        self.callable = callable
652
        self.args = args
653
        self.kwargs = kwargs
654

    
655
    def __enter__(self):
656
        self.map = self.callable(*self.args, **self.kwargs)
657
        return self.map
658

    
659
    def __exit__(self, type, value, tb):
660
        map = self.callable(*self.args, **self.kwargs)
661
        for k, v in self.map.items():
662
            if is_date(v):
663
                continue
664

    
665
            assert(k in map), '%s not in map' % k
666
            assert v == map[k]
667

    
668

    
669
class AssertUUidInvariant(object):
670
    def __init__(self, callable, *args, **kwargs):
671
        self.callable = callable
672
        self.args = args
673
        self.kwargs = kwargs
674

    
675
    def __enter__(self):
676
        self.map = self.callable(*self.args, **self.kwargs)
677
        assert('x-object-uuid' in self.map)
678
        self.uuid = self.map['x-object-uuid']
679
        return self.map
680

    
681
    def __exit__(self, type, value, tb):
682
        map = self.callable(*self.args, **self.kwargs)
683
        assert('x-object-uuid' in self.map)
684
        uuid = map['x-object-uuid']
685
        assert(uuid == self.uuid)