Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 35030d55

History | View | Annotate | Download (25.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from urlparse import urlunsplit, urlsplit, urlparse
38
from xml.dom import minidom
39
from urllib import quote, unquote
40
from mock import patch, PropertyMock
41

    
42
from snf_django.utils.testing import with_settings, astakos_user
43

    
44
from pithos.api import settings as pithos_settings
45
from pithos.api.test.util import is_date, get_random_data, get_random_name
46
from pithos.backends.migrate import initialize_db
47

    
48
from synnefo.lib.services import get_service_path
49
from synnefo.lib import join_urls
50
from synnefo.util import text
51

    
52
from django.test import TestCase
53
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
54
from django.test.simple import DjangoTestSuiteRunner
55
from django.conf import settings
56
from django.utils.http import urlencode
57
from django.db.backends.creation import TEST_DATABASE_PREFIX
58

    
59
import django.utils.simplejson as json
60

    
61

    
62
import random
63
import functools
64

    
65

    
66
pithos_test_settings = functools.partial(with_settings, pithos_settings)
67

    
68
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
69
                "%A, %d-%b-%y %H:%M:%S GMT",
70
                "%a, %d %b %Y %H:%M:%S GMT"]
71

    
72
o_names = ['kate.jpg',
73
           'kate_beckinsale.jpg',
74
           'How To Win Friends And Influence People.pdf',
75
           'moms_birthday.jpg',
76
           'poodle_strut.mov',
77
           'Disturbed - Down With The Sickness.mp3',
78
           'army_of_darkness.avi',
79
           'the_mad.avi',
80
           'photos/animals/dogs/poodle.jpg',
81
           'photos/animals/dogs/terrier.jpg',
82
           'photos/animals/cats/persian.jpg',
83
           'photos/animals/cats/siamese.jpg',
84
           'photos/plants/fern.jpg',
85
           'photos/plants/rose.jpg',
86
           'photos/me.jpg']
87

    
88
details = {'container': ('name', 'count', 'bytes', 'last_modified',
89
                         'x_container_policy'),
90
           'object': ('name', 'hash', 'bytes', 'content_type',
91
                      'content_encoding', 'last_modified',)}
92

    
93
TEST_BLOCK_SIZE = 1024
94
TEST_HASH_ALGORITHM = 'sha256'
95

    
96
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
97
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
98
print 'update md5:', pithos_settings.UPDATE_MD5
99

    
100

    
101
django_sqlalchemy_engines = {
102
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
103
    'django.db.backends.postgresql': 'postgresql',
104
    'django.db.backends.mysql': '',
105
    'django.db.backends.sqlite3': 'mssql',
106
    'django.db.backends.oracle': 'oracle'}
107

    
108

    
109
def prepare_db_connection():
110
    """Build pithos backend connection string from django default database"""
111

    
112
    db = settings.DATABASES['default']
113
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
114

    
115
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
116
        if db['ENGINE'] == 'django.db.backends.sqlite3':
117
            db_connection = 'sqlite:///%s' % name
118
        else:
119
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
120
                     user=db['USER'],
121
                     pwd=db['PASSWORD'],
122
                     host=db['HOST'].lower(),
123
                     port=int(db['PORT']) if db['PORT'] != '' else '',
124
                     name=name)
125
            db_connection = (
126
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
127

    
128
            # initialize pithos database
129
            initialize_db(db_connection)
130
    else:
131
        db_connection = name
132
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
133

    
134

    
135
def filter_headers(headers, prefix):
136
    meta = {}
137
    for k, v in headers.iteritems():
138
        if not k.startswith(prefix):
139
            continue
140
        meta[unquote(k[len(prefix):])] = unquote(v)
141
    return meta
142

    
143

    
144
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
145
    def setup_databases(self, **kwargs):
146
        old_names, mirrors = super(PithosTestSuiteRunner,
147
                                   self).setup_databases(**kwargs)
148
        prepare_db_connection()
149
        return old_names, mirrors
150

    
151
    def teardown_databases(self, old_config, **kwargs):
152
        from pithos.api.util import _pithos_backend_pool
153
        _pithos_backend_pool.shutdown()
154
        super(PithosTestSuiteRunner, self).teardown_databases(old_config,
155
                                                              **kwargs)
156

    
157

    
158
class PithosTestClient(Client):
159
    def _get_path(self, parsed):
160
        # If there are parameters, add them
161
        if parsed[3]:
162
            return unquote(parsed[2] + ";" + parsed[3])
163
        else:
164
            return unquote(parsed[2])
165

    
166
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
167
             follow=False, **extra):
168
        """
169
        Send a resource to the server using COPY.
170
        """
171
        parsed = urlparse(path)
172
        r = {
173
            'CONTENT_TYPE':    'text/html; charset=utf-8',
174
            'PATH_INFO':       self._get_path(parsed),
175
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
176
            'REQUEST_METHOD': 'COPY',
177
            'wsgi.input':      FakePayload('')
178
        }
179
        r.update(extra)
180

    
181
        response = self.request(**r)
182
        if follow:
183
            response = self._handle_redirects(response, **extra)
184
        return response
185

    
186
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
187
             follow=False, **extra):
188
        """
189
        Send a resource to the server using MOVE.
190
        """
191
        parsed = urlparse(path)
192
        r = {
193
            'CONTENT_TYPE':    'text/html; charset=utf-8',
194
            'PATH_INFO':       self._get_path(parsed),
195
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
196
            'REQUEST_METHOD': 'MOVE',
197
            'wsgi.input':      FakePayload('')
198
        }
199
        r.update(extra)
200

    
201
        response = self.request(**r)
202
        if follow:
203
            response = self._handle_redirects(response, **extra)
204
        return response
205

    
206

    
207
class PithosAPITest(TestCase):
208
    def create_patch(self, name, new_callable=None):
209
        patcher = patch(name, new_callable=new_callable)
210
        thing = patcher.start()
211
        self.addCleanup(patcher.stop)
212
        return thing
213

    
214
    def setUp(self):
215
        self.client = PithosTestClient()
216

    
217
        # Override default block size to spead up tests
218
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
219
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
220

    
221
        self.user = 'user'
222
        self.pithos_path = join_urls(get_service_path(
223
            pithos_settings.pithos_services, 'object-store'))
224

    
225
        # patch astakosclient.AstakosClient.validate_token
226
        mock_validate_token = self.create_patch(
227
            'astakosclient.AstakosClient.validate_token')
228
        mock_validate_token.return_value = {
229
            'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
230

    
231
        # patch astakosclient.AstakosClient.get_token
232
        mock_get_token = self.create_patch(
233
            'astakosclient.AstakosClient.get_token')
234
        mock_get_token.return_value = {'access_token': 'valid_token'}
235

    
236
        # patch astakosclient.AstakosClient.api_oa2_auth
237
        mock_api_oa2_auth = self.create_patch(
238
            'astakosclient.AstakosClient.oa2_url',
239
            new_callable=PropertyMock)
240
        mock_api_oa2_auth.return_value = '/astakos/oa2/'
241

    
242
    def tearDown(self):
243
        #delete additionally created metadata
244
        meta = self.get_account_meta()
245
        self.delete_account_meta(meta)
246

    
247
        #delete additionally created groups
248
        groups = self.get_account_groups()
249
        self.delete_account_groups(groups)
250

    
251
        self._clean_account()
252

    
253
    def _clean_account(self):
254
        for c in self.list_containers():
255
            self.delete_container_content(c['name'])
256
            self.delete_container(c['name'])
257

    
258
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
259
             **extra):
260
        with astakos_user(user):
261
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
262
            if token:
263
                extra['HTTP_X_AUTH_TOKEN'] = token
264
            response = self.client.head(url, data, follow, **extra)
265
        return response
266

    
267
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
268
            **extra):
269
        with astakos_user(user):
270
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
271
            if token:
272
                extra['HTTP_X_AUTH_TOKEN'] = token
273
            response = self.client.get(url, data, follow, **extra)
274
        return response
275

    
276
    def delete(self, url, user='user', token='DummyToken', data={},
277
               follow=False, **extra):
278
        with astakos_user(user):
279
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
280
            if token:
281
                extra['HTTP_X_AUTH_TOKEN'] = token
282
            response = self.client.delete(url, data, follow, **extra)
283
        return response
284

    
285
    def post(self, url, user='user', token='DummyToken', data={},
286
             content_type='application/octet-stream', follow=False, **extra):
287
        with astakos_user(user):
288
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
289
            if token:
290
                extra['HTTP_X_AUTH_TOKEN'] = token
291
            response = self.client.post(url, data, content_type, follow,
292
                                        **extra)
293
        return response
294

    
295
    def put(self, url, user='user', token='DummyToken', data={},
296
            content_type='application/octet-stream', follow=False, **extra):
297
        with astakos_user(user):
298
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
299
            if token:
300
                extra['HTTP_X_AUTH_TOKEN'] = token
301
            response = self.client.put(url, data, content_type, follow,
302
                                       **extra)
303
        return response
304

    
305
    def copy(self, url, user='user', token='DummyToken', data={},
306
             content_type='application/octet-stream', follow=False, **extra):
307
        with astakos_user(user):
308
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
309
            if token:
310
                extra['HTTP_X_AUTH_TOKEN'] = token
311
            response = self.client.copy(url, data, content_type, follow,
312
                                        **extra)
313
        return response
314

    
315
    def move(self, url, user='user', token='DummyToken', data={},
316
             content_type='application/octet-stream', follow=False, **extra):
317
        with astakos_user(user):
318
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
319
            if token:
320
                extra['HTTP_X_AUTH_TOKEN'] = token
321
            response = self.client.move(url, data, content_type, follow,
322
                                        **extra)
323
        return response
324

    
325
    def update_account_meta(self, meta, user=None, verify_status=True):
326
        user = user or self.user
327
        kwargs = dict(
328
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
329
        url = join_urls(self.pithos_path, user)
330
        r = self.post('%s?update=' % url, user=user, **kwargs)
331
        if verify_status:
332
            self.assertEqual(r.status_code, 202)
333
        account_meta = self.get_account_meta(user=user)
334
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
335
            k in meta.keys())
336
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
337
            k, v in meta.items())
338

    
339
    def reset_account_meta(self, meta, user=None, verify_status=True):
340
        user = user or self.user
341
        kwargs = dict(
342
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
343
        url = join_urls(self.pithos_path, user)
344
        r = self.post(url, user=user, **kwargs)
345
        if verify_status:
346
            self.assertEqual(r.status_code, 202)
347
        account_meta = self.get_account_meta(user=user)
348
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
349
            k in meta.keys())
350
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
351
            k, v in meta.items())
352

    
353
    def delete_account_meta(self, meta, user=None, verify_status=True):
354
        user = user or self.user
355
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
356
        kwargs = dict((transform(k), '') for k, v in meta.items())
357
        url = join_urls(self.pithos_path, user)
358
        r = self.post('%s?update=' % url, user=user, **kwargs)
359
        if verify_status:
360
            self.assertEqual(r.status_code, 202)
361
        account_meta = self.get_account_meta(user=user)
362
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
363
            k in meta.keys())
364
        return r
365

    
366
    def delete_account_groups(self, groups, user=None, verify_status=True):
367
        user = user or self.user
368
        url = join_urls(self.pithos_path, user)
369
        r = self.post('%s?update=' % url, user=user, **groups)
370
        if verify_status:
371
            self.assertEqual(r.status_code, 202)
372
        account_groups = self.get_account_groups()
373
        (self.assertTrue(k not in account_groups) for k in groups.keys())
374
        return r
375

    
376
    def get_account_info(self, until=None, user=None, verify_status=True):
377
        user = user or self.user
378
        url = join_urls(self.pithos_path, user)
379
        if until is not None:
380
            parts = list(urlsplit(url))
381
            parts[3] = urlencode({
382
                'until': until
383
            })
384
            url = urlunsplit(parts)
385
        r = self.head(url, user=user)
386
        if verify_status:
387
            self.assertEqual(r.status_code, 204)
388
        return r
389

    
390
    def get_account_meta(self, until=None, user=None):
391
        prefix = 'X-Account-Meta-'
392
        r = self.get_account_info(until=until, user=user)
393
        headers = dict(r._headers.values())
394
        return filter_headers(headers, prefix)
395

    
396
    def get_account_groups(self, until=None, user=None):
397
        prefix = 'X-Account-Group-'
398
        r = self.get_account_info(until=until, user=user)
399
        headers = dict(r._headers.values())
400
        return filter_headers(headers, prefix)
401

    
402
    def get_container_info(self, container, until=None, user=None,
403
                           verify_status=True):
404
        user = user or self.user
405
        url = join_urls(self.pithos_path, user, container)
406
        if until is not None:
407
            parts = list(urlsplit(url))
408
            parts[3] = urlencode({
409
                'until': until
410
            })
411
            url = urlunsplit(parts)
412
        r = self.head(url, user=user)
413
        if verify_status:
414
            self.assertEqual(r.status_code, 204)
415
        return r
416

    
417
    def get_container_meta(self, container, until=None, user=None):
418
        prefix = 'X-Container-Meta-'
419
        r = self.get_container_info(container, until=until, user=user)
420
        headers = dict(r._headers.values())
421
        return filter_headers(headers, prefix)
422

    
423
    def update_container_meta(self, container, meta=None, user=None,
424
                              verify_status=True):
425
        user = user or self.user
426
        meta = meta or {get_random_name(): get_random_name()}
427
        kwargs = dict(
428
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
429
        url = join_urls(self.pithos_path, user, container)
430
        r = self.post('%s?update=' % url, user=user, **kwargs)
431
        if verify_status:
432
            self.assertEqual(r.status_code, 202)
433
        container_meta = self.get_container_meta(container, user=user)
434
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
435
            k in meta.keys())
436
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
437
            k, v in meta.items())
438
        return r
439

    
440
    def list_containers(self, format='json', headers={}, user=None, **params):
441
        user = user or self.user
442
        _url = join_urls(self.pithos_path, user)
443
        parts = list(urlsplit(_url))
444
        params['format'] = format
445
        parts[3] = urlencode(params)
446
        url = urlunsplit(parts)
447
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
448
                        for k, v in headers.items())
449
        r = self.get(url, user=user, **_headers)
450

    
451
        if format is None:
452
            containers = r.content.split('\n')
453
            if '' in containers:
454
                containers.remove('')
455
            return containers
456
        elif format == 'json':
457
            try:
458
                containers = json.loads(r.content)
459
            except:
460
                self.fail('json format expected')
461
            return containers
462
        elif format == 'xml':
463
            return minidom.parseString(r.content)
464

    
465
    def delete_container_content(self, cname, user=None, verify_status=True):
466
        user = user or self.user
467
        url = join_urls(self.pithos_path, user, cname)
468
        r = self.delete('%s?delimiter=/' % url, user=user)
469
        if verify_status:
470
            self.assertEqual(r.status_code, 204)
471
        return r
472

    
473
    def delete_container(self, cname, user=None, verify_status=True):
474
        user = user or self.user
475
        url = join_urls(self.pithos_path, user, cname)
476
        r = self.delete(url, user=user)
477
        if verify_status:
478
            self.assertEqual(r.status_code, 204)
479
        return r
480

    
481
    def delete_object(self, cname, oname, user=None, verify_status=True):
482
        user = user or self.user
483
        url = join_urls(self.pithos_path, user, cname, oname)
484
        r = self.delete(url, user=user)
485
        if verify_status:
486
            self.assertEqual(r.status_code, 204)
487
        return r
488

    
489
    def create_container(self, cname=None, user=None, verify_status=True):
490
        cname = cname or get_random_name()
491
        user = user or self.user
492
        url = join_urls(self.pithos_path, user, cname)
493
        r = self.put(url, user=user, data='')
494
        if verify_status:
495
            self.assertTrue(r.status_code in (202, 201))
496
        return cname, r
497

    
498
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
499
                      user=None, **meta):
500
        oname = oname or get_random_name()
501
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
502
        user = user or self.user
503
        data = get_random_data(length=length)
504
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
505
                       for k, v in meta.iteritems())
506
        url = join_urls(self.pithos_path, user, cname, oname)
507
        r = self.put(url, user=user, data=data, **headers)
508
        if verify_status:
509
            self.assertEqual(r.status_code, 201)
510
        return oname, data, r
511

    
512
    def update_object_data(self, cname, oname=None, length=None,
513
                           content_type=None, content_range=None,
514
                           verify_status=True, user=None, **meta):
515
        oname = oname or get_random_name()
516
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
517
        content_type = content_type or 'application/octet-stream'
518
        user = user or self.user
519
        data = get_random_data(length=length)
520
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
521
                       for k, v in meta.iteritems())
522
        if content_range:
523
            headers['HTTP_CONTENT_RANGE'] = content_range
524
        url = join_urls(self.pithos_path, user, cname, oname)
525
        r = self.post(url, user=user, data=data, content_type=content_type,
526
                      **headers)
527
        if verify_status:
528
            self.assertEqual(r.status_code, 204)
529
        return oname, data, r
530

    
531
    def append_object_data(self, cname, oname=None, length=None,
532
                           content_type=None, user=None):
533
        return self.update_object_data(cname, oname=oname,
534
                                       length=length,
535
                                       content_type=content_type,
536
                                       content_range='bytes */*',
537
                                       user=user)
538

    
539
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
540
                      **headers):
541
        user = user or self.user
542
        oname = oname or get_random_name()
543
        url = join_urls(self.pithos_path, user, cname, oname)
544
        r = self.put(url, user=user, data='',
545
                     content_type='application/directory', **headers)
546
        if verify_status:
547
            self.assertEqual(r.status_code, 201)
548
        return oname, r
549

    
550
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
551
        user = user or self.user
552
        url = join_urls(self.pithos_path, user, cname)
553
        path = '%s?format=json' % url
554
        if prefix is not None:
555
            path = '%s&prefix=%s' % (path, prefix)
556
        r = self.get(path, user=user)
557
        if verify_status:
558
            self.assertTrue(r.status_code in (200, 204))
559
        try:
560
            objects = json.loads(r.content)
561
        except:
562
            self.fail('json format expected')
563
        return objects
564

    
565
    def get_object_info(self, container, object, version=None, until=None,
566
                        user=None, verify_status=True):
567
        user = user or self.user
568
        url = join_urls(self.pithos_path, user, container, object)
569
        if until is not None:
570
            parts = list(urlsplit(url))
571
            parts[3] = urlencode({
572
                'until': until
573
            })
574
            url = urlunsplit(parts)
575
        if version:
576
            url = '%s?version=%s' % (url, version)
577
        r = self.head(url, user=user)
578
        if verify_status:
579
            self.assertEqual(r.status_code, 200)
580
        return r
581

    
582
    def get_object_meta(self, container, object, version=None, until=None,
583
                        user=None):
584
        prefix = 'X-Object-Meta-'
585
        user = user or self.user
586
        r = self.get_object_info(container, object, version, until=until,
587
                                 user=user)
588
        headers = dict(r._headers.values())
589
        return filter_headers(headers, prefix)
590

    
591
    def update_object_meta(self, container, object, meta=None, user=None,
592
                           verify_status=True):
593
        user = user or self.user
594
        meta = meta or {get_random_name(): get_random_name()}
595
        kwargs = dict(
596
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
597
        url = join_urls(self.pithos_path, user, container, object)
598
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
599
        if verify_status:
600
            self.assertEqual(r.status_code, 202)
601
        object_meta = self.get_object_meta(container, object, user=user)
602
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
603
            k in meta.keys())
604
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
605
            k, v in meta.items())
606
        return r
607

    
608
    def assert_extended(self, data, format, type, size=10000):
609
        if format == 'xml':
610
            self._assert_xml(data, type, size)
611
        elif format == 'json':
612
            self._assert_json(data, type, size)
613

    
614
    def _assert_json(self, data, type, size):
615
        convert = lambda s: s.lower()
616
        info = [convert(elem) for elem in details[type]]
617
        self.assertTrue(len(data) <= size)
618
        for item in info:
619
            for i in data:
620
                if 'subdir' in i.keys():
621
                    continue
622
                self.assertTrue(item in i.keys())
623

    
624
    def _assert_xml(self, data, type, size):
625
        convert = lambda s: s.lower()
626
        info = [convert(elem) for elem in details[type]]
627
        try:
628
            info.remove('content_encoding')
629
        except ValueError:
630
            pass
631
        xml = data
632
        entities = xml.getElementsByTagName(type)
633
        self.assertTrue(len(entities) <= size)
634
        for e in entities:
635
            for item in info:
636
                self.assertTrue(e.getElementsByTagName(item))
637

    
638

    
639
class AssertMappingInvariant(object):
640
    def __init__(self, callable, *args, **kwargs):
641
        self.callable = callable
642
        self.args = args
643
        self.kwargs = kwargs
644

    
645
    def __enter__(self):
646
        self.map = self.callable(*self.args, **self.kwargs)
647
        return self.map
648

    
649
    def __exit__(self, type, value, tb):
650
        map = self.callable(*self.args, **self.kwargs)
651
        for k, v in self.map.items():
652
            if is_date(v):
653
                continue
654

    
655
            assert(k in map), '%s not in map' % k
656
            assert v == map[k]
657

    
658

    
659
class AssertUUidInvariant(object):
660
    def __init__(self, callable, *args, **kwargs):
661
        self.callable = callable
662
        self.args = args
663
        self.kwargs = kwargs
664

    
665
    def __enter__(self):
666
        self.map = self.callable(*self.args, **self.kwargs)
667
        assert('x-object-uuid' in self.map)
668
        self.uuid = self.map['x-object-uuid']
669
        return self.map
670

    
671
    def __exit__(self, type, value, tb):
672
        map = self.callable(*self.args, **self.kwargs)
673
        assert('x-object-uuid' in self.map)
674
        uuid = map['x-object-uuid']
675
        assert(uuid == self.uuid)