Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 7e402b46

History | View | Annotate | Download (26.4 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from urlparse import urlunsplit, urlsplit, urlparse
38
from xml.dom import minidom
39
from urllib import quote, unquote
40
from mock import patch, PropertyMock
41

    
42
from snf_django.utils.testing import with_settings, astakos_user
43

    
44
from pithos.api import settings as pithos_settings
45
from pithos.api.test.util import is_date, get_random_data, get_random_name
46
from pithos.backends.migrate import initialize_db
47

    
48
from synnefo.lib.services import get_service_path
49
from synnefo.lib import join_urls
50
from synnefo.util import text
51

    
52
from django.test import TestCase
53
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
54
from django.test.simple import DjangoTestSuiteRunner
55
from django.conf import settings
56
from django.utils.http import urlencode
57
from django.db.backends.creation import TEST_DATABASE_PREFIX
58

    
59
import django.utils.simplejson as json
60

    
61

    
62
import sys
63
import random
64
import functools
65

    
66

    
67
pithos_test_settings = functools.partial(with_settings, pithos_settings)
68

    
69
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
70
                "%A, %d-%b-%y %H:%M:%S GMT",
71
                "%a, %d %b %Y %H:%M:%S GMT"]
72

    
73
o_names = ['kate.jpg',
74
           'kate_beckinsale.jpg',
75
           'How To Win Friends And Influence People.pdf',
76
           'moms_birthday.jpg',
77
           'poodle_strut.mov',
78
           'Disturbed - Down With The Sickness.mp3',
79
           'army_of_darkness.avi',
80
           'the_mad.avi',
81
           'photos/animals/dogs/poodle.jpg',
82
           'photos/animals/dogs/terrier.jpg',
83
           'photos/animals/cats/persian.jpg',
84
           'photos/animals/cats/siamese.jpg',
85
           'photos/plants/fern.jpg',
86
           'photos/plants/rose.jpg',
87
           'photos/me.jpg']
88

    
89
details = {'container': ('name', 'count', 'bytes', 'last_modified',
90
                         'x_container_policy'),
91
           'object': ('name', 'hash', 'bytes', 'content_type',
92
                      'content_encoding', 'last_modified',)}
93

    
94
TEST_BLOCK_SIZE = 1024
95
TEST_HASH_ALGORITHM = 'sha256'
96

    
97
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
98
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
99
print 'update md5:', pithos_settings.UPDATE_MD5
100

    
101

    
102
django_sqlalchemy_engines = {
103
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
104
    'django.db.backends.postgresql': 'postgresql',
105
    'django.db.backends.mysql': '',
106
    'django.db.backends.sqlite3': 'mssql',
107
    'django.db.backends.oracle': 'oracle'}
108

    
109

    
110
def prepare_db_connection():
111
    """Build pithos backend connection string from django default database"""
112

    
113
    db = settings.DATABASES['default']
114
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
115

    
116
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
117
        if db['ENGINE'] == 'django.db.backends.sqlite3':
118
            db_connection = 'sqlite:///%s' % name
119
        else:
120
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
121
                     user=db['USER'],
122
                     pwd=db['PASSWORD'],
123
                     host=db['HOST'].lower(),
124
                     port=int(db['PORT']) if db['PORT'] != '' else '',
125
                     name=name)
126
            db_connection = (
127
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
128

    
129
            # initialize pithos database
130
            initialize_db(db_connection)
131
    else:
132
        db_connection = name
133
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
134

    
135

    
136
def filter_headers(headers, prefix):
137
    meta = {}
138
    for k, v in headers.iteritems():
139
        if not k.startswith(prefix):
140
            continue
141
        meta[unquote(k[len(prefix):])] = unquote(v)
142
    return meta
143

    
144

    
145
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
146
    def setup_databases(self, **kwargs):
147
        old_names, mirrors = super(PithosTestSuiteRunner,
148
                                   self).setup_databases(**kwargs)
149
        prepare_db_connection()
150
        return old_names, mirrors
151

    
152
    def teardown_databases(self, old_config, **kwargs):
153
        from pithos.api.util import _pithos_backend_pool
154
        _pithos_backend_pool.shutdown()
155
        try:
156
            super(PithosTestSuiteRunner, self).teardown_databases(old_config,
157
                                                                  **kwargs)
158
        except Exception as e:
159
            sys.stderr.write("FAILED to teardown databases: %s\n" % str(e))
160

    
161

    
162
class PithosTestClient(Client):
163
    def _get_path(self, parsed):
164
        # If there are parameters, add them
165
        if parsed[3]:
166
            return unquote(parsed[2] + ";" + parsed[3])
167
        else:
168
            return unquote(parsed[2])
169

    
170
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
171
             follow=False, **extra):
172
        """
173
        Send a resource to the server using COPY.
174
        """
175
        parsed = urlparse(path)
176
        r = {
177
            'CONTENT_TYPE':    'text/html; charset=utf-8',
178
            'PATH_INFO':       self._get_path(parsed),
179
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
180
            'REQUEST_METHOD': 'COPY',
181
            'wsgi.input':      FakePayload('')
182
        }
183
        r.update(extra)
184

    
185
        response = self.request(**r)
186
        if follow:
187
            response = self._handle_redirects(response, **extra)
188
        return response
189

    
190
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
191
             follow=False, **extra):
192
        """
193
        Send a resource to the server using MOVE.
194
        """
195
        parsed = urlparse(path)
196
        r = {
197
            'CONTENT_TYPE':    'text/html; charset=utf-8',
198
            'PATH_INFO':       self._get_path(parsed),
199
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
200
            'REQUEST_METHOD': 'MOVE',
201
            'wsgi.input':      FakePayload('')
202
        }
203
        r.update(extra)
204

    
205
        response = self.request(**r)
206
        if follow:
207
            response = self._handle_redirects(response, **extra)
208
        return response
209

    
210

    
211
class PithosAPITest(TestCase):
212
    def create_patch(self, name, new_callable=None):
213
        patcher = patch(name, new_callable=new_callable)
214
        thing = patcher.start()
215
        self.addCleanup(patcher.stop)
216
        return thing
217

    
218
    def setUp(self):
219
        self.client = PithosTestClient()
220

    
221
        # Override default block size to spead up tests
222
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
223
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
224

    
225
        self.user = 'user'
226
        self.pithos_path = join_urls(get_service_path(
227
            pithos_settings.pithos_services, 'object-store'))
228

    
229
        # patch astakosclient.AstakosClient.validate_token
230
        mock_validate_token = self.create_patch(
231
            'astakosclient.AstakosClient.validate_token')
232
        mock_validate_token.return_value = {
233
            'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
234

    
235
        # patch astakosclient.AstakosClient.get_token
236
        mock_get_token = self.create_patch(
237
            'astakosclient.AstakosClient.get_token')
238
        mock_get_token.return_value = {'access_token': 'valid_token'}
239

    
240
        # patch astakosclient.AstakosClient.api_oa2_auth
241
        mock_api_oauth2_auth = self.create_patch(
242
            'astakosclient.AstakosClient.oauth2_url',
243
            new_callable=PropertyMock)
244
        mock_api_oauth2_auth.return_value = '/astakos/oauth2/'
245

    
246
        mock_service_get_quotas = self.create_patch(
247
            'astakosclient.AstakosClient.service_get_quotas')
248
        mock_service_get_quotas.return_value = {
249
            self.user: {
250
                "system": {
251
                    "pithos.diskspace": {
252
                        "usage": 0,
253
                        "limit": 1073741824,  # 1GB
254
                        "pending": 0}}}}
255

    
256
    def tearDown(self):
257
        #delete additionally created metadata
258
        meta = self.get_account_meta()
259
        self.delete_account_meta(meta)
260

    
261
        #delete additionally created groups
262
        groups = self.get_account_groups()
263
        self.delete_account_groups(groups)
264

    
265
        self._clean_account()
266

    
267
    def _clean_account(self):
268
        for c in self.list_containers():
269
            self.delete_container_content(c['name'])
270
            self.delete_container(c['name'])
271

    
272
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
273
             **extra):
274
        with astakos_user(user):
275
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
276
            if token:
277
                extra['HTTP_X_AUTH_TOKEN'] = token
278
            response = self.client.head(url, data, follow, **extra)
279
        return response
280

    
281
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
282
            **extra):
283
        with astakos_user(user):
284
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
285
            if token:
286
                extra['HTTP_X_AUTH_TOKEN'] = token
287
            response = self.client.get(url, data, follow, **extra)
288
        return response
289

    
290
    def delete(self, url, user='user', token='DummyToken', data={},
291
               follow=False, **extra):
292
        with astakos_user(user):
293
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
294
            if token:
295
                extra['HTTP_X_AUTH_TOKEN'] = token
296
            response = self.client.delete(url, data, follow, **extra)
297
        return response
298

    
299
    def post(self, url, user='user', token='DummyToken', data={},
300
             content_type='application/octet-stream', follow=False, **extra):
301
        with astakos_user(user):
302
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
303
            if token:
304
                extra['HTTP_X_AUTH_TOKEN'] = token
305
            response = self.client.post(url, data, content_type, follow,
306
                                        **extra)
307
        return response
308

    
309
    def put(self, url, user='user', token='DummyToken', data={},
310
            content_type='application/octet-stream', follow=False,
311
            quote_extra=True, **extra):
312
        with astakos_user(user):
313
            if quote_extra:
314
                extra = dict((quote(k), quote(v)) for k, v in extra.items())
315
            if token:
316
                extra['HTTP_X_AUTH_TOKEN'] = token
317
            response = self.client.put(url, data, content_type, follow,
318
                                       **extra)
319
        return response
320

    
321
    def copy(self, url, user='user', token='DummyToken', data={},
322
             content_type='application/octet-stream', follow=False, **extra):
323
        with astakos_user(user):
324
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
325
            if token:
326
                extra['HTTP_X_AUTH_TOKEN'] = token
327
            response = self.client.copy(url, data, content_type, follow,
328
                                        **extra)
329
        return response
330

    
331
    def move(self, url, user='user', token='DummyToken', data={},
332
             content_type='application/octet-stream', follow=False, **extra):
333
        with astakos_user(user):
334
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
335
            if token:
336
                extra['HTTP_X_AUTH_TOKEN'] = token
337
            response = self.client.move(url, data, content_type, follow,
338
                                        **extra)
339
        return response
340

    
341
    def update_account_meta(self, meta, user=None, verify_status=True):
342
        user = user or self.user
343
        kwargs = dict(
344
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
345
        url = join_urls(self.pithos_path, user)
346
        r = self.post('%s?update=' % url, user=user, **kwargs)
347
        if verify_status:
348
            self.assertEqual(r.status_code, 202)
349
        account_meta = self.get_account_meta(user=user)
350
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
351
            k in meta.keys())
352
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
353
            k, v in meta.items())
354

    
355
    def reset_account_meta(self, meta, user=None, verify_status=True):
356
        user = user or self.user
357
        kwargs = dict(
358
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
359
        url = join_urls(self.pithos_path, user)
360
        r = self.post(url, user=user, **kwargs)
361
        if verify_status:
362
            self.assertEqual(r.status_code, 202)
363
        account_meta = self.get_account_meta(user=user)
364
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
365
            k in meta.keys())
366
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
367
            k, v in meta.items())
368

    
369
    def delete_account_meta(self, meta, user=None, verify_status=True):
370
        user = user or self.user
371
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
372
        kwargs = dict((transform(k), '') for k, v in meta.items())
373
        url = join_urls(self.pithos_path, user)
374
        r = self.post('%s?update=' % url, user=user, **kwargs)
375
        if verify_status:
376
            self.assertEqual(r.status_code, 202)
377
        account_meta = self.get_account_meta(user=user)
378
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
379
            k in meta.keys())
380
        return r
381

    
382
    def delete_account_groups(self, groups, user=None, verify_status=True):
383
        user = user or self.user
384
        url = join_urls(self.pithos_path, user)
385
        r = self.post('%s?update=' % url, user=user, **groups)
386
        if verify_status:
387
            self.assertEqual(r.status_code, 202)
388
        account_groups = self.get_account_groups()
389
        (self.assertTrue(k not in account_groups) for k in groups.keys())
390
        return r
391

    
392
    def get_account_info(self, until=None, user=None, verify_status=True):
393
        user = user or self.user
394
        url = join_urls(self.pithos_path, user)
395
        if until is not None:
396
            parts = list(urlsplit(url))
397
            parts[3] = urlencode({
398
                'until': until
399
            })
400
            url = urlunsplit(parts)
401
        r = self.head(url, user=user)
402
        if verify_status:
403
            self.assertEqual(r.status_code, 204)
404
        return r
405

    
406
    def get_account_meta(self, until=None, user=None):
407
        prefix = 'X-Account-Meta-'
408
        r = self.get_account_info(until=until, user=user)
409
        headers = dict(r._headers.values())
410
        return filter_headers(headers, prefix)
411

    
412
    def get_account_groups(self, until=None, user=None):
413
        prefix = 'X-Account-Group-'
414
        r = self.get_account_info(until=until, user=user)
415
        headers = dict(r._headers.values())
416
        return filter_headers(headers, prefix)
417

    
418
    def get_container_info(self, container, until=None, user=None,
419
                           verify_status=True):
420
        user = user or self.user
421
        url = join_urls(self.pithos_path, user, container)
422
        if until is not None:
423
            parts = list(urlsplit(url))
424
            parts[3] = urlencode({
425
                'until': until
426
            })
427
            url = urlunsplit(parts)
428
        r = self.head(url, user=user)
429
        if verify_status:
430
            self.assertEqual(r.status_code, 204)
431
        return r
432

    
433
    def get_container_meta(self, container, until=None, user=None):
434
        prefix = 'X-Container-Meta-'
435
        r = self.get_container_info(container, until=until, user=user)
436
        headers = dict(r._headers.values())
437
        return filter_headers(headers, prefix)
438

    
439
    def update_container_meta(self, container, meta=None, user=None,
440
                              verify_status=True):
441
        user = user or self.user
442
        meta = meta or {get_random_name(): get_random_name()}
443
        kwargs = dict(
444
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
445
        url = join_urls(self.pithos_path, user, container)
446
        r = self.post('%s?update=' % url, user=user, **kwargs)
447
        if verify_status:
448
            self.assertEqual(r.status_code, 202)
449
        container_meta = self.get_container_meta(container, user=user)
450
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
451
            k in meta.keys())
452
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
453
            k, v in meta.items())
454
        return r
455

    
456
    def list_containers(self, format='json', headers={}, user=None, **params):
457
        user = user or self.user
458
        _url = join_urls(self.pithos_path, user)
459
        parts = list(urlsplit(_url))
460
        params['format'] = format
461
        parts[3] = urlencode(params)
462
        url = urlunsplit(parts)
463
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
464
                        for k, v in headers.items())
465
        r = self.get(url, user=user, **_headers)
466

    
467
        if format is None:
468
            containers = r.content.split('\n')
469
            if '' in containers:
470
                containers.remove('')
471
            return containers
472
        elif format == 'json':
473
            try:
474
                containers = json.loads(r.content)
475
            except:
476
                self.fail('json format expected')
477
            return containers
478
        elif format == 'xml':
479
            return minidom.parseString(r.content)
480

    
481
    def delete_container_content(self, cname, user=None, verify_status=True):
482
        user = user or self.user
483
        url = join_urls(self.pithos_path, user, cname)
484
        r = self.delete('%s?delimiter=/' % url, user=user)
485
        if verify_status:
486
            self.assertEqual(r.status_code, 204)
487
        return r
488

    
489
    def delete_container(self, cname, user=None, verify_status=True):
490
        user = user or self.user
491
        url = join_urls(self.pithos_path, user, cname)
492
        r = self.delete(url, user=user)
493
        if verify_status:
494
            self.assertEqual(r.status_code, 204)
495
        return r
496

    
497
    def delete_object(self, cname, oname, user=None, verify_status=True):
498
        user = user or self.user
499
        url = join_urls(self.pithos_path, user, cname, oname)
500
        r = self.delete(url, user=user)
501
        if verify_status:
502
            self.assertEqual(r.status_code, 204)
503
        return r
504

    
505
    def create_container(self, cname=None, user=None, verify_status=True):
506
        cname = cname or get_random_name()
507
        user = user or self.user
508
        url = join_urls(self.pithos_path, user, cname)
509
        r = self.put(url, user=user, data='')
510
        if verify_status:
511
            self.assertTrue(r.status_code in (202, 201))
512
        return cname, r
513

    
514
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
515
                      user=None, **meta):
516
        oname = oname or get_random_name()
517
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
518
        user = user or self.user
519
        data = get_random_data(length=length)
520
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
521
                       for k, v in meta.iteritems())
522
        url = join_urls(self.pithos_path, user, cname, oname)
523
        r = self.put(url, user=user, data=data, **headers)
524
        if verify_status:
525
            self.assertEqual(r.status_code, 201)
526
        return oname, data, r
527

    
528
    def update_object_data(self, cname, oname=None, length=None,
529
                           content_type=None, content_range=None,
530
                           verify_status=True, user=None, **meta):
531
        oname = oname or get_random_name()
532
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
533
        content_type = content_type or 'application/octet-stream'
534
        user = user or self.user
535
        data = get_random_data(length=length)
536
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
537
                       for k, v in meta.iteritems())
538
        if content_range:
539
            headers['HTTP_CONTENT_RANGE'] = content_range
540
        url = join_urls(self.pithos_path, user, cname, oname)
541
        r = self.post(url, user=user, data=data, content_type=content_type,
542
                      **headers)
543
        if verify_status:
544
            self.assertEqual(r.status_code, 204)
545
        return oname, data, r
546

    
547
    def append_object_data(self, cname, oname=None, length=None,
548
                           content_type=None, user=None):
549
        return self.update_object_data(cname, oname=oname,
550
                                       length=length,
551
                                       content_type=content_type,
552
                                       content_range='bytes */*',
553
                                       user=user)
554

    
555
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
556
                      **headers):
557
        user = user or self.user
558
        oname = oname or get_random_name()
559
        url = join_urls(self.pithos_path, user, cname, oname)
560
        r = self.put(url, user=user, data='',
561
                     content_type='application/directory', **headers)
562
        if verify_status:
563
            self.assertEqual(r.status_code, 201)
564
        return oname, r
565

    
566
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
567
        user = user or self.user
568
        url = join_urls(self.pithos_path, user, cname)
569
        path = '%s?format=json' % url
570
        if prefix is not None:
571
            path = '%s&prefix=%s' % (path, prefix)
572
        r = self.get(path, user=user)
573
        if verify_status:
574
            self.assertTrue(r.status_code in (200, 204))
575
        try:
576
            objects = json.loads(r.content)
577
        except:
578
            self.fail('json format expected')
579
        return objects
580

    
581
    def get_object_info(self, container, object, version=None, until=None,
582
                        user=None, verify_status=True):
583
        user = user or self.user
584
        url = join_urls(self.pithos_path, user, container, object)
585
        if until is not None:
586
            parts = list(urlsplit(url))
587
            parts[3] = urlencode({
588
                'until': until
589
            })
590
            url = urlunsplit(parts)
591
        if version:
592
            url = '%s?version=%s' % (url, version)
593
        r = self.head(url, user=user)
594
        if verify_status:
595
            self.assertEqual(r.status_code, 200)
596
        return r
597

    
598
    def get_object_meta(self, container, object, version=None, until=None,
599
                        user=None):
600
        prefix = 'X-Object-Meta-'
601
        user = user or self.user
602
        r = self.get_object_info(container, object, version, until=until,
603
                                 user=user)
604
        headers = dict(r._headers.values())
605
        return filter_headers(headers, prefix)
606

    
607
    def update_object_meta(self, container, object, meta=None, user=None,
608
                           verify_status=True):
609
        user = user or self.user
610
        meta = meta or {get_random_name(): get_random_name()}
611
        kwargs = dict(
612
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
613
        url = join_urls(self.pithos_path, user, container, object)
614
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
615
        if verify_status:
616
            self.assertEqual(r.status_code, 202)
617
        object_meta = self.get_object_meta(container, object, user=user)
618
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
619
            k in meta.keys())
620
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
621
            k, v in meta.items())
622
        return r
623

    
624
    def assert_extended(self, data, format, type, size=10000):
625
        if format == 'xml':
626
            self._assert_xml(data, type, size)
627
        elif format == 'json':
628
            self._assert_json(data, type, size)
629

    
630
    def _assert_json(self, data, type, size):
631
        convert = lambda s: s.lower()
632
        info = [convert(elem) for elem in details[type]]
633
        self.assertTrue(len(data) <= size)
634
        for item in info:
635
            for i in data:
636
                if 'subdir' in i.keys():
637
                    continue
638
                self.assertTrue(item in i.keys())
639

    
640
    def _assert_xml(self, data, type, size):
641
        convert = lambda s: s.lower()
642
        info = [convert(elem) for elem in details[type]]
643
        try:
644
            info.remove('content_encoding')
645
        except ValueError:
646
            pass
647
        xml = data
648
        entities = xml.getElementsByTagName(type)
649
        self.assertTrue(len(entities) <= size)
650
        for e in entities:
651
            for item in info:
652
                self.assertTrue(e.getElementsByTagName(item))
653

    
654

    
655
class AssertMappingInvariant(object):
656
    def __init__(self, callable, *args, **kwargs):
657
        self.callable = callable
658
        self.args = args
659
        self.kwargs = kwargs
660

    
661
    def __enter__(self):
662
        self.map = self.callable(*self.args, **self.kwargs)
663
        return self.map
664

    
665
    def __exit__(self, type, value, tb):
666
        map = self.callable(*self.args, **self.kwargs)
667
        for k, v in self.map.items():
668
            if is_date(v):
669
                continue
670

    
671
            assert(k in map), '%s not in map' % k
672
            assert v == map[k]
673

    
674

    
675
class AssertUUidInvariant(object):
676
    def __init__(self, callable, *args, **kwargs):
677
        self.callable = callable
678
        self.args = args
679
        self.kwargs = kwargs
680

    
681
    def __enter__(self):
682
        self.map = self.callable(*self.args, **self.kwargs)
683
        assert('x-object-uuid' in self.map)
684
        self.uuid = self.map['x-object-uuid']
685
        return self.map
686

    
687
    def __exit__(self, type, value, tb):
688
        map = self.callable(*self.args, **self.kwargs)
689
        assert('x-object-uuid' in self.map)
690
        uuid = map['x-object-uuid']
691
        assert(uuid == self.uuid)