Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ c977b0b6

History | View | Annotate | Download (24.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from urlparse import urlunsplit, urlsplit, urlparse
38
from xml.dom import minidom
39
from urllib import quote, unquote
40

    
41
from snf_django.utils.testing import with_settings, astakos_user
42

    
43
from pithos.api import settings as pithos_settings
44
from pithos.api.test.util import is_date, get_random_data, get_random_name
45
from pithos.backends.migrate import initialize_db
46

    
47
from synnefo.lib.services import get_service_path
48
from synnefo.lib import join_urls
49

    
50
from django.test import TestCase
51
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
52
from django.test.simple import DjangoTestSuiteRunner
53
from django.conf import settings
54
from django.utils.http import urlencode
55
from django.db.backends.creation import TEST_DATABASE_PREFIX
56

    
57
import django.utils.simplejson as json
58

    
59
import random
60
import functools
61

    
62

    
63
pithos_test_settings = functools.partial(with_settings, pithos_settings)
64

    
65
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
66
                "%A, %d-%b-%y %H:%M:%S GMT",
67
                "%a, %d %b %Y %H:%M:%S GMT"]
68

    
69
o_names = ['kate.jpg',
70
           'kate_beckinsale.jpg',
71
           'How To Win Friends And Influence People.pdf',
72
           'moms_birthday.jpg',
73
           'poodle_strut.mov',
74
           'Disturbed - Down With The Sickness.mp3',
75
           'army_of_darkness.avi',
76
           'the_mad.avi',
77
           'photos/animals/dogs/poodle.jpg',
78
           'photos/animals/dogs/terrier.jpg',
79
           'photos/animals/cats/persian.jpg',
80
           'photos/animals/cats/siamese.jpg',
81
           'photos/plants/fern.jpg',
82
           'photos/plants/rose.jpg',
83
           'photos/me.jpg']
84

    
85
details = {'container': ('name', 'count', 'bytes', 'last_modified',
86
                         'x_container_policy'),
87
           'object': ('name', 'hash', 'bytes', 'content_type',
88
                      'content_encoding', 'last_modified',)}
89

    
90
TEST_BLOCK_SIZE = 1024
91
TEST_HASH_ALGORITHM = 'sha256'
92

    
93
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
94
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
95
print 'update md5:', pithos_settings.UPDATE_MD5
96

    
97

    
98
django_sqlalchemy_engines = {
99
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
100
    'django.db.backends.postgresql': 'postgresql',
101
    'django.db.backends.mysql': '',
102
    'django.db.backends.sqlite3': 'mssql',
103
    'django.db.backends.oracle': 'oracle'}
104

    
105

    
106
def prepare_db_connection():
107
    """Build pithos backend connection string from django default database"""
108

    
109
    db = settings.DATABASES['default']
110
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
111

    
112
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
113
        if db['ENGINE'] == 'django.db.backends.sqlite3':
114
            db_connection = 'sqlite:///%s' % name
115
        else:
116
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
117
                     user=db['USER'],
118
                     pwd=db['PASSWORD'],
119
                     host=db['HOST'].lower(),
120
                     port=int(db['PORT']) if db['PORT'] != '' else '',
121
                     name=name)
122
            db_connection = (
123
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
124

    
125
            # initialize pithos database
126
            initialize_db(db_connection)
127
    else:
128
        db_connection = name
129
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
130

    
131

    
132
def filter_headers(headers, prefix):
133
    meta = {}
134
    for k, v in headers.iteritems():
135
        if not k.startswith(prefix):
136
            continue
137
        meta[unquote(k[len(prefix):])] = unquote(v)
138
    return meta
139

    
140

    
141
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
142
    def setup_databases(self, **kwargs):
143
        old_names, mirrors = super(PithosTestSuiteRunner,
144
                                   self).setup_databases(**kwargs)
145
        prepare_db_connection()
146
        return old_names, mirrors
147

    
148
    def teardown_databases(self, old_config, **kwargs):
149
        from pithos.api.util import _pithos_backend_pool
150
        _pithos_backend_pool.shutdown()
151
        super(PithosTestSuiteRunner, self).teardown_databases(old_config,
152
                                                              **kwargs)
153

    
154

    
155
class PithosTestClient(Client):
156
    def _get_path(self, parsed):
157
        # If there are parameters, add them
158
        if parsed[3]:
159
            return unquote(parsed[2] + ";" + parsed[3])
160
        else:
161
            return unquote(parsed[2])
162

    
163
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
164
             follow=False, **extra):
165
        """
166
        Send a resource to the server using COPY.
167
        """
168
        parsed = urlparse(path)
169
        r = {
170
            'CONTENT_TYPE':    'text/html; charset=utf-8',
171
            'PATH_INFO':       self._get_path(parsed),
172
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
173
            'REQUEST_METHOD': 'COPY',
174
            'wsgi.input':      FakePayload('')
175
        }
176
        r.update(extra)
177

    
178
        response = self.request(**r)
179
        if follow:
180
            response = self._handle_redirects(response, **extra)
181
        return response
182

    
183
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
184
             follow=False, **extra):
185
        """
186
        Send a resource to the server using MOVE.
187
        """
188
        parsed = urlparse(path)
189
        r = {
190
            'CONTENT_TYPE':    'text/html; charset=utf-8',
191
            'PATH_INFO':       self._get_path(parsed),
192
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
193
            'REQUEST_METHOD': 'MOVE',
194
            'wsgi.input':      FakePayload('')
195
        }
196
        r.update(extra)
197

    
198
        response = self.request(**r)
199
        if follow:
200
            response = self._handle_redirects(response, **extra)
201
        return response
202

    
203

    
204
class PithosAPITest(TestCase):
205
    def setUp(self):
206
        self.client = PithosTestClient()
207

    
208
        # Override default block size to spead up tests
209
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
210
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
211

    
212
        self.user = 'user'
213
        self.pithos_path = join_urls(get_service_path(
214
            pithos_settings.pithos_services, 'object-store'))
215

    
216
    def tearDown(self):
217
        #delete additionally created metadata
218
        meta = self.get_account_meta()
219
        self.delete_account_meta(meta)
220

    
221
        #delete additionally created groups
222
        groups = self.get_account_groups()
223
        self.delete_account_groups(groups)
224

    
225
        self._clean_account()
226

    
227
    def _clean_account(self):
228
        for c in self.list_containers():
229
            self.delete_container_content(c['name'])
230
            self.delete_container(c['name'])
231

    
232
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
233
             **extra):
234
        with astakos_user(user):
235
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
236
            if token:
237
                extra['HTTP_X_AUTH_TOKEN'] = token
238
            response = self.client.head(url, data, follow, **extra)
239
        return response
240

    
241
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
242
            **extra):
243
        with astakos_user(user):
244
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
245
            if token:
246
                extra['HTTP_X_AUTH_TOKEN'] = token
247
            response = self.client.get(url, data, follow, **extra)
248
        return response
249

    
250
    def delete(self, url, user='user', token='DummyToken', data={},
251
               follow=False, **extra):
252
        with astakos_user(user):
253
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
254
            if token:
255
                extra['HTTP_X_AUTH_TOKEN'] = token
256
            response = self.client.delete(url, data, follow, **extra)
257
        return response
258

    
259
    def post(self, url, user='user', token='DummyToken', data={},
260
             content_type='application/octet-stream', follow=False, **extra):
261
        with astakos_user(user):
262
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
263
            if token:
264
                extra['HTTP_X_AUTH_TOKEN'] = token
265
            response = self.client.post(url, data, content_type, follow,
266
                                        **extra)
267
        return response
268

    
269
    def put(self, url, user='user', token='DummyToken', data={},
270
            content_type='application/octet-stream', follow=False, **extra):
271
        with astakos_user(user):
272
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
273
            if token:
274
                extra['HTTP_X_AUTH_TOKEN'] = token
275
            response = self.client.put(url, data, content_type, follow,
276
                                       **extra)
277
        return response
278

    
279
    def copy(self, url, user='user', token='DummyToken', data={},
280
             content_type='application/octet-stream', follow=False, **extra):
281
        with astakos_user(user):
282
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
283
            if token:
284
                extra['HTTP_X_AUTH_TOKEN'] = token
285
            response = self.client.copy(url, data, content_type, follow,
286
                                        **extra)
287
        return response
288

    
289
    def move(self, url, user='user', token='DummyToken', data={},
290
             content_type='application/octet-stream', follow=False, **extra):
291
        with astakos_user(user):
292
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
293
            if token:
294
                extra['HTTP_X_AUTH_TOKEN'] = token
295
            response = self.client.move(url, data, content_type, follow,
296
                                        **extra)
297
        return response
298

    
299
    def update_account_meta(self, meta, user=None, verify_status=True):
300
        user = user or self.user
301
        kwargs = dict(
302
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
303
        url = join_urls(self.pithos_path, user)
304
        r = self.post('%s?update=' % url, user=user, **kwargs)
305
        if verify_status:
306
            self.assertEqual(r.status_code, 202)
307
        account_meta = self.get_account_meta(user=user)
308
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
309
            k in meta.keys())
310
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
311
            k, v in meta.items())
312

    
313
    def reset_account_meta(self, meta, user=None, verify_status=True):
314
        user = user or self.user
315
        kwargs = dict(
316
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
317
        url = join_urls(self.pithos_path, user)
318
        r = self.post(url, user=user, **kwargs)
319
        if verify_status:
320
            self.assertEqual(r.status_code, 202)
321
        account_meta = self.get_account_meta(user=user)
322
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
323
            k in meta.keys())
324
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
325
            k, v in meta.items())
326

    
327
    def delete_account_meta(self, meta, user=None, verify_status=True):
328
        user = user or self.user
329
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
330
        kwargs = dict((transform(k), '') for k, v in meta.items())
331
        url = join_urls(self.pithos_path, user)
332
        r = self.post('%s?update=' % url, user=user, **kwargs)
333
        if verify_status:
334
            self.assertEqual(r.status_code, 202)
335
        account_meta = self.get_account_meta(user=user)
336
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
337
            k in meta.keys())
338
        return r
339

    
340
    def delete_account_groups(self, groups, user=None, verify_status=True):
341
        user = user or self.user
342
        url = join_urls(self.pithos_path, user)
343
        r = self.post('%s?update=' % url, user=user, **groups)
344
        if verify_status:
345
            self.assertEqual(r.status_code, 202)
346
        account_groups = self.get_account_groups()
347
        (self.assertTrue(k not in account_groups) for k in groups.keys())
348
        return r
349

    
350
    def get_account_info(self, until=None, user=None, verify_status=True):
351
        user = user or self.user
352
        url = join_urls(self.pithos_path, user)
353
        if until is not None:
354
            parts = list(urlsplit(url))
355
            parts[3] = urlencode({
356
                'until': until
357
            })
358
            url = urlunsplit(parts)
359
        r = self.head(url, user=user)
360
        if verify_status:
361
            self.assertEqual(r.status_code, 204)
362
        return r
363

    
364
    def get_account_meta(self, until=None, user=None):
365
        prefix = 'X-Account-Meta-'
366
        r = self.get_account_info(until=until, user=user)
367
        headers = dict(r._headers.values())
368
        return filter_headers(headers, prefix)
369

    
370
    def get_account_groups(self, until=None, user=None):
371
        prefix = 'X-Account-Group-'
372
        r = self.get_account_info(until=until, user=user)
373
        headers = dict(r._headers.values())
374
        return filter_headers(headers, prefix)
375

    
376
    def get_container_info(self, container, until=None, user=None,
377
                           verify_status=True):
378
        user = user or self.user
379
        url = join_urls(self.pithos_path, user, container)
380
        if until is not None:
381
            parts = list(urlsplit(url))
382
            parts[3] = urlencode({
383
                'until': until
384
            })
385
            url = urlunsplit(parts)
386
        r = self.head(url, user=user)
387
        if verify_status:
388
            self.assertEqual(r.status_code, 204)
389
        return r
390

    
391
    def get_container_meta(self, container, until=None, user=None):
392
        prefix = 'X-Container-Meta-'
393
        r = self.get_container_info(container, until=until, user=user)
394
        headers = dict(r._headers.values())
395
        return filter_headers(headers, prefix)
396

    
397
    def update_container_meta(self, container, meta=None, user=None,
398
                              verify_status=True):
399
        user = user or self.user
400
        meta = meta or {get_random_name(): get_random_name()}
401
        kwargs = dict(
402
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
403
        url = join_urls(self.pithos_path, user, container)
404
        r = self.post('%s?update=' % url, user=user, **kwargs)
405
        if verify_status:
406
            self.assertEqual(r.status_code, 202)
407
        container_meta = self.get_container_meta(container, user=user)
408
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
409
            k in meta.keys())
410
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
411
            k, v in meta.items())
412
        return r
413

    
414
    def list_containers(self, format='json', headers={}, user=None, **params):
415
        user = user or self.user
416
        _url = join_urls(self.pithos_path, user)
417
        parts = list(urlsplit(_url))
418
        params['format'] = format
419
        parts[3] = urlencode(params)
420
        url = urlunsplit(parts)
421
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
422
                        for k, v in headers.items())
423
        r = self.get(url, user=user, **_headers)
424

    
425
        if format is None:
426
            containers = r.content.split('\n')
427
            if '' in containers:
428
                containers.remove('')
429
            return containers
430
        elif format == 'json':
431
            try:
432
                containers = json.loads(r.content)
433
            except:
434
                self.fail('json format expected')
435
            return containers
436
        elif format == 'xml':
437
            return minidom.parseString(r.content)
438

    
439
    def delete_container_content(self, cname, user=None, verify_status=True):
440
        user = user or self.user
441
        url = join_urls(self.pithos_path, user, cname)
442
        r = self.delete('%s?delimiter=/' % url, user=user)
443
        if verify_status:
444
            self.assertEqual(r.status_code, 204)
445
        return r
446

    
447
    def delete_container(self, cname, user=None, verify_status=True):
448
        user = user or self.user
449
        url = join_urls(self.pithos_path, user, cname)
450
        r = self.delete(url, user=user)
451
        if verify_status:
452
            self.assertEqual(r.status_code, 204)
453
        return r
454

    
455
    def delete_object(self, cname, oname, user=None, verify_status=True):
456
        user = user or self.user
457
        url = join_urls(self.pithos_path, user, cname, oname)
458
        r = self.delete(url, user=user)
459
        if verify_status:
460
            self.assertEqual(r.status_code, 204)
461
        return r
462

    
463
    def create_container(self, cname=None, user=None, verify_status=True):
464
        cname = cname or get_random_name()
465
        user = user or self.user
466
        url = join_urls(self.pithos_path, user, cname)
467
        r = self.put(url, user=user, data='')
468
        if verify_status:
469
            self.assertTrue(r.status_code in (202, 201))
470
        return cname, r
471

    
472
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
473
                      user=None, **meta):
474
        oname = oname or get_random_name()
475
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
476
        user = user or self.user
477
        data = get_random_data(length=length)
478
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
479
                       for k, v in meta.iteritems())
480
        url = join_urls(self.pithos_path, user, cname, oname)
481
        r = self.put(url, user=user, data=data, **headers)
482
        if verify_status:
483
            self.assertEqual(r.status_code, 201)
484
        return oname, data, r
485

    
486
    def update_object_data(self, cname, oname=None, length=None,
487
                           content_type=None, content_range=None,
488
                           verify_status=True, user=None, **meta):
489
        oname = oname or get_random_name()
490
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
491
        content_type = content_type or 'application/octet-stream'
492
        user = user or self.user
493
        data = get_random_data(length=length)
494
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
495
                       for k, v in meta.iteritems())
496
        if content_range:
497
            headers['HTTP_CONTENT_RANGE'] = content_range
498
        url = join_urls(self.pithos_path, user, cname, oname)
499
        r = self.post(url, user=user, data=data, content_type=content_type,
500
                      **headers)
501
        if verify_status:
502
            self.assertEqual(r.status_code, 204)
503
        return oname, data, r
504

    
505
    def append_object_data(self, cname, oname=None, length=None,
506
                           content_type=None, user=None):
507
        return self.update_object_data(cname, oname=oname,
508
                                       length=length,
509
                                       content_type=content_type,
510
                                       content_range='bytes */*',
511
                                       user=user)
512

    
513
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
514
                      **headers):
515
        user = user or self.user
516
        oname = oname or get_random_name()
517
        url = join_urls(self.pithos_path, user, cname, oname)
518
        r = self.put(url, user=user, data='',
519
                     content_type='application/directory', **headers)
520
        if verify_status:
521
            self.assertEqual(r.status_code, 201)
522
        return oname, r
523

    
524
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
525
        user = user or self.user
526
        url = join_urls(self.pithos_path, user, cname)
527
        path = '%s?format=json' % url
528
        if prefix is not None:
529
            path = '%s&prefix=%s' % (path, prefix)
530
        r = self.get(path, user=user)
531
        if verify_status:
532
            self.assertTrue(r.status_code in (200, 204))
533
        try:
534
            objects = json.loads(r.content)
535
        except:
536
            self.fail('json format expected')
537
        return objects
538

    
539
    def get_object_info(self, container, object, version=None, until=None,
540
                        user=None, verify_status=True):
541
        user = user or self.user
542
        url = join_urls(self.pithos_path, user, container, object)
543
        if until is not None:
544
            parts = list(urlsplit(url))
545
            parts[3] = urlencode({
546
                'until': until
547
            })
548
            url = urlunsplit(parts)
549
        if version:
550
            url = '%s?version=%s' % (url, version)
551
        r = self.head(url, user=user)
552
        if verify_status:
553
            self.assertEqual(r.status_code, 200)
554
        return r
555

    
556
    def get_object_meta(self, container, object, version=None, until=None,
557
                        user=None):
558
        prefix = 'X-Object-Meta-'
559
        user = user or self.user
560
        r = self.get_object_info(container, object, version, until=until,
561
                                 user=user)
562
        headers = dict(r._headers.values())
563
        return filter_headers(headers, prefix)
564

    
565
    def update_object_meta(self, container, object, meta=None, user=None,
566
                           verify_status=True):
567
        user = user or self.user
568
        meta = meta or {get_random_name(): get_random_name()}
569
        kwargs = dict(
570
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
571
        url = join_urls(self.pithos_path, user, container, object)
572
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
573
        if verify_status:
574
            self.assertEqual(r.status_code, 202)
575
        object_meta = self.get_object_meta(container, object, user=user)
576
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
577
            k in meta.keys())
578
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
579
            k, v in meta.items())
580
        return r
581

    
582
    def assert_extended(self, data, format, type, size=10000):
583
        if format == 'xml':
584
            self._assert_xml(data, type, size)
585
        elif format == 'json':
586
            self._assert_json(data, type, size)
587

    
588
    def _assert_json(self, data, type, size):
589
        convert = lambda s: s.lower()
590
        info = [convert(elem) for elem in details[type]]
591
        self.assertTrue(len(data) <= size)
592
        for item in info:
593
            for i in data:
594
                if 'subdir' in i.keys():
595
                    continue
596
                self.assertTrue(item in i.keys())
597

    
598
    def _assert_xml(self, data, type, size):
599
        convert = lambda s: s.lower()
600
        info = [convert(elem) for elem in details[type]]
601
        try:
602
            info.remove('content_encoding')
603
        except ValueError:
604
            pass
605
        xml = data
606
        entities = xml.getElementsByTagName(type)
607
        self.assertTrue(len(entities) <= size)
608
        for e in entities:
609
            for item in info:
610
                self.assertTrue(e.getElementsByTagName(item))
611

    
612

    
613
class AssertMappingInvariant(object):
614
    def __init__(self, callable, *args, **kwargs):
615
        self.callable = callable
616
        self.args = args
617
        self.kwargs = kwargs
618

    
619
    def __enter__(self):
620
        self.map = self.callable(*self.args, **self.kwargs)
621
        return self.map
622

    
623
    def __exit__(self, type, value, tb):
624
        map = self.callable(*self.args, **self.kwargs)
625
        for k, v in self.map.items():
626
            if is_date(v):
627
                continue
628

    
629
            assert(k in map), '%s not in map' % k
630
            assert v == map[k]
631

    
632

    
633
class AssertUUidInvariant(object):
634
    def __init__(self, callable, *args, **kwargs):
635
        self.callable = callable
636
        self.args = args
637
        self.kwargs = kwargs
638

    
639
    def __enter__(self):
640
        self.map = self.callable(*self.args, **self.kwargs)
641
        assert('x-object-uuid' in self.map)
642
        self.uuid = self.map['x-object-uuid']
643
        return self.map
644

    
645
    def __exit__(self, type, value, tb):
646
        map = self.callable(*self.args, **self.kwargs)
647
        assert('x-object-uuid' in self.map)
648
        uuid = map['x-object-uuid']
649
        assert(uuid == self.uuid)