Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 381a548c

History | View | Annotate | Download (24.8 kB)

1 f3787696 Sofia Papagiannaki
#!/usr/bin/env python
2 f3787696 Sofia Papagiannaki
#coding=utf8
3 f3787696 Sofia Papagiannaki
4 f3787696 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 f3787696 Sofia Papagiannaki
#
6 f3787696 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 f3787696 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 f3787696 Sofia Papagiannaki
# conditions are met:
9 f3787696 Sofia Papagiannaki
#
10 f3787696 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 f3787696 Sofia Papagiannaki
#      disclaimer.
13 f3787696 Sofia Papagiannaki
#
14 f3787696 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 f3787696 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 f3787696 Sofia Papagiannaki
#      provided with the distribution.
18 f3787696 Sofia Papagiannaki
#
19 f3787696 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 f3787696 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 f3787696 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 f3787696 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 f3787696 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 f3787696 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 f3787696 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 f3787696 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 f3787696 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 f3787696 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 f3787696 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 f3787696 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 f3787696 Sofia Papagiannaki
#
32 f3787696 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 f3787696 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 f3787696 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 f3787696 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 f3787696 Sofia Papagiannaki
37 f53483d8 Sofia Papagiannaki
from urlparse import urlunsplit, urlsplit, urlparse
38 f3787696 Sofia Papagiannaki
from xml.dom import minidom
39 6ee6677e Sofia Papagiannaki
from urllib import quote, unquote
40 f3787696 Sofia Papagiannaki
41 f3787696 Sofia Papagiannaki
from snf_django.utils.testing import with_settings, astakos_user
42 f3787696 Sofia Papagiannaki
43 f3787696 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
44 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import is_date, get_random_data, get_random_name
45 bc4f25c0 Sofia Papagiannaki
from pithos.backends.migrate import initialize_db
46 f3787696 Sofia Papagiannaki
47 d6a92fa0 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
48 d6a92fa0 Sofia Papagiannaki
from synnefo.lib import join_urls
49 d6a92fa0 Sofia Papagiannaki
50 f3787696 Sofia Papagiannaki
from django.test import TestCase
51 f53483d8 Sofia Papagiannaki
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
52 bc4f25c0 Sofia Papagiannaki
from django.test.simple import DjangoTestSuiteRunner
53 7c36f3fb Sofia Papagiannaki
from django.conf import settings
54 f3787696 Sofia Papagiannaki
from django.utils.http import urlencode
55 bc4f25c0 Sofia Papagiannaki
from django.db.backends.creation import TEST_DATABASE_PREFIX
56 f3787696 Sofia Papagiannaki
57 f3787696 Sofia Papagiannaki
import django.utils.simplejson as json
58 f3787696 Sofia Papagiannaki
59 f3787696 Sofia Papagiannaki
import random
60 f3787696 Sofia Papagiannaki
import functools
61 f3787696 Sofia Papagiannaki
62 3a19e99b Sofia Papagiannaki
63 f3787696 Sofia Papagiannaki
pithos_test_settings = functools.partial(with_settings, pithos_settings)
64 f3787696 Sofia Papagiannaki
65 f3787696 Sofia Papagiannaki
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
66 f3787696 Sofia Papagiannaki
                "%A, %d-%b-%y %H:%M:%S GMT",
67 f3787696 Sofia Papagiannaki
                "%a, %d %b %Y %H:%M:%S GMT"]
68 f3787696 Sofia Papagiannaki
69 f3787696 Sofia Papagiannaki
o_names = ['kate.jpg',
70 f3787696 Sofia Papagiannaki
           'kate_beckinsale.jpg',
71 f3787696 Sofia Papagiannaki
           'How To Win Friends And Influence People.pdf',
72 f3787696 Sofia Papagiannaki
           'moms_birthday.jpg',
73 f3787696 Sofia Papagiannaki
           'poodle_strut.mov',
74 f3787696 Sofia Papagiannaki
           'Disturbed - Down With The Sickness.mp3',
75 f3787696 Sofia Papagiannaki
           'army_of_darkness.avi',
76 f3787696 Sofia Papagiannaki
           'the_mad.avi',
77 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/poodle.jpg',
78 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/terrier.jpg',
79 f3787696 Sofia Papagiannaki
           'photos/animals/cats/persian.jpg',
80 f3787696 Sofia Papagiannaki
           'photos/animals/cats/siamese.jpg',
81 f3787696 Sofia Papagiannaki
           'photos/plants/fern.jpg',
82 f3787696 Sofia Papagiannaki
           'photos/plants/rose.jpg',
83 f3787696 Sofia Papagiannaki
           'photos/me.jpg']
84 f3787696 Sofia Papagiannaki
85 f3787696 Sofia Papagiannaki
details = {'container': ('name', 'count', 'bytes', 'last_modified',
86 f3787696 Sofia Papagiannaki
                         'x_container_policy'),
87 f3787696 Sofia Papagiannaki
           'object': ('name', 'hash', 'bytes', 'content_type',
88 f3787696 Sofia Papagiannaki
                      'content_encoding', 'last_modified',)}
89 f3787696 Sofia Papagiannaki
90 5fe43b8c Sofia Papagiannaki
TEST_BLOCK_SIZE = 1024
91 5fe43b8c Sofia Papagiannaki
TEST_HASH_ALGORITHM = 'sha256'
92 5fe43b8c Sofia Papagiannaki
93 bc4f25c0 Sofia Papagiannaki
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
94 bc4f25c0 Sofia Papagiannaki
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
95 bc4f25c0 Sofia Papagiannaki
print 'update md5:', pithos_settings.UPDATE_MD5
96 65f480f5 Sofia Papagiannaki
97 65f480f5 Sofia Papagiannaki
98 bc4f25c0 Sofia Papagiannaki
django_sqlalchemy_engines = {
99 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
100 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql': 'postgresql',
101 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.mysql': '',
102 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.sqlite3': 'mssql',
103 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.oracle': 'oracle'}
104 65f480f5 Sofia Papagiannaki
105 65f480f5 Sofia Papagiannaki
106 f53483d8 Sofia Papagiannaki
def prepare_db_connection():
107 bc4f25c0 Sofia Papagiannaki
    """Build pithos backend connection string from django default database"""
108 bc4f25c0 Sofia Papagiannaki
109 65f480f5 Sofia Papagiannaki
    db = settings.DATABASES['default']
110 bc4f25c0 Sofia Papagiannaki
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
111 bc4f25c0 Sofia Papagiannaki
112 bc4f25c0 Sofia Papagiannaki
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
113 bc4f25c0 Sofia Papagiannaki
        if db['ENGINE'] == 'django.db.backends.sqlite3':
114 bc4f25c0 Sofia Papagiannaki
            db_connection = 'sqlite:///%s' % name
115 bc4f25c0 Sofia Papagiannaki
        else:
116 bc4f25c0 Sofia Papagiannaki
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
117 bc4f25c0 Sofia Papagiannaki
                     user=db['USER'],
118 bc4f25c0 Sofia Papagiannaki
                     pwd=db['PASSWORD'],
119 bc4f25c0 Sofia Papagiannaki
                     host=db['HOST'].lower(),
120 bc4f25c0 Sofia Papagiannaki
                     port=int(db['PORT']) if db['PORT'] != '' else '',
121 bc4f25c0 Sofia Papagiannaki
                     name=name)
122 bc4f25c0 Sofia Papagiannaki
            db_connection = (
123 bc4f25c0 Sofia Papagiannaki
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
124 bc4f25c0 Sofia Papagiannaki
125 bc4f25c0 Sofia Papagiannaki
            # initialize pithos database
126 bc4f25c0 Sofia Papagiannaki
            initialize_db(db_connection)
127 65f480f5 Sofia Papagiannaki
    else:
128 bc4f25c0 Sofia Papagiannaki
        db_connection = name
129 bc4f25c0 Sofia Papagiannaki
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
130 bc4f25c0 Sofia Papagiannaki
131 bc4f25c0 Sofia Papagiannaki
132 6ee6677e Sofia Papagiannaki
def filter_headers(headers, prefix):
133 6ee6677e Sofia Papagiannaki
    meta = {}
134 6ee6677e Sofia Papagiannaki
    for k, v in headers.iteritems():
135 6ee6677e Sofia Papagiannaki
        if not k.startswith(prefix):
136 6ee6677e Sofia Papagiannaki
            continue
137 6ee6677e Sofia Papagiannaki
        meta[unquote(k[len(prefix):])] = unquote(v)
138 6ee6677e Sofia Papagiannaki
    return meta
139 6ee6677e Sofia Papagiannaki
140 6ee6677e Sofia Papagiannaki
141 bc4f25c0 Sofia Papagiannaki
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
142 bc4f25c0 Sofia Papagiannaki
    def setup_databases(self, **kwargs):
143 bc4f25c0 Sofia Papagiannaki
        old_names, mirrors = super(PithosTestSuiteRunner,
144 bc4f25c0 Sofia Papagiannaki
                                   self).setup_databases(**kwargs)
145 f53483d8 Sofia Papagiannaki
        prepare_db_connection()
146 bc4f25c0 Sofia Papagiannaki
        return old_names, mirrors
147 65f480f5 Sofia Papagiannaki
148 de8fb87d Sofia Papagiannaki
    def teardown_databases(self, old_config, **kwargs):
149 de8fb87d Sofia Papagiannaki
        from pithos.api.util import _pithos_backend_pool
150 de8fb87d Sofia Papagiannaki
        _pithos_backend_pool.shutdown()
151 de8fb87d Sofia Papagiannaki
        super(PithosTestSuiteRunner, self).teardown_databases(old_config,
152 de8fb87d Sofia Papagiannaki
                                                              **kwargs)
153 de8fb87d Sofia Papagiannaki
154 f3787696 Sofia Papagiannaki
155 f53483d8 Sofia Papagiannaki
class PithosTestClient(Client):
156 e7e3df52 Sofia Papagiannaki
    def _get_path(self, parsed):
157 e7e3df52 Sofia Papagiannaki
        # If there are parameters, add them
158 e7e3df52 Sofia Papagiannaki
        if parsed[3]:
159 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2] + ";" + parsed[3])
160 e7e3df52 Sofia Papagiannaki
        else:
161 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2])
162 e7e3df52 Sofia Papagiannaki
163 f53483d8 Sofia Papagiannaki
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
164 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
165 f53483d8 Sofia Papagiannaki
        """
166 f53483d8 Sofia Papagiannaki
        Send a resource to the server using COPY.
167 f53483d8 Sofia Papagiannaki
        """
168 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
169 f53483d8 Sofia Papagiannaki
        r = {
170 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
171 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
172 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
173 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'COPY',
174 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
175 f53483d8 Sofia Papagiannaki
        }
176 f53483d8 Sofia Papagiannaki
        r.update(extra)
177 f53483d8 Sofia Papagiannaki
178 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
179 f53483d8 Sofia Papagiannaki
        if follow:
180 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
181 f53483d8 Sofia Papagiannaki
        return response
182 f53483d8 Sofia Papagiannaki
183 f53483d8 Sofia Papagiannaki
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
184 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
185 f53483d8 Sofia Papagiannaki
        """
186 f53483d8 Sofia Papagiannaki
        Send a resource to the server using MOVE.
187 f53483d8 Sofia Papagiannaki
        """
188 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
189 f53483d8 Sofia Papagiannaki
        r = {
190 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
191 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
192 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
193 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'MOVE',
194 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
195 f53483d8 Sofia Papagiannaki
        }
196 f53483d8 Sofia Papagiannaki
        r.update(extra)
197 f53483d8 Sofia Papagiannaki
198 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
199 f53483d8 Sofia Papagiannaki
        if follow:
200 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
201 f53483d8 Sofia Papagiannaki
        return response
202 f53483d8 Sofia Papagiannaki
203 f53483d8 Sofia Papagiannaki
204 f3787696 Sofia Papagiannaki
class PithosAPITest(TestCase):
205 f3787696 Sofia Papagiannaki
    def setUp(self):
206 f53483d8 Sofia Papagiannaki
        self.client = PithosTestClient()
207 f53483d8 Sofia Papagiannaki
208 369a7b41 Sofia Papagiannaki
        # Override default block size to spead up tests
209 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
210 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
211 369a7b41 Sofia Papagiannaki
212 f3787696 Sofia Papagiannaki
        self.user = 'user'
213 d6a92fa0 Sofia Papagiannaki
        self.pithos_path = join_urls(get_service_path(
214 d6a92fa0 Sofia Papagiannaki
            pithos_settings.pithos_services, 'object-store'))
215 f3787696 Sofia Papagiannaki
216 f3787696 Sofia Papagiannaki
    def tearDown(self):
217 f3787696 Sofia Papagiannaki
        #delete additionally created metadata
218 f3787696 Sofia Papagiannaki
        meta = self.get_account_meta()
219 f3787696 Sofia Papagiannaki
        self.delete_account_meta(meta)
220 f3787696 Sofia Papagiannaki
221 f3787696 Sofia Papagiannaki
        #delete additionally created groups
222 f3787696 Sofia Papagiannaki
        groups = self.get_account_groups()
223 f3787696 Sofia Papagiannaki
        self.delete_account_groups(groups)
224 f3787696 Sofia Papagiannaki
225 f3787696 Sofia Papagiannaki
        self._clean_account()
226 f3787696 Sofia Papagiannaki
227 65f480f5 Sofia Papagiannaki
    def _clean_account(self):
228 65f480f5 Sofia Papagiannaki
        for c in self.list_containers():
229 65f480f5 Sofia Papagiannaki
            self.delete_container_content(c['name'])
230 65f480f5 Sofia Papagiannaki
            self.delete_container(c['name'])
231 65f480f5 Sofia Papagiannaki
232 f53483d8 Sofia Papagiannaki
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
233 f53483d8 Sofia Papagiannaki
             **extra):
234 f3787696 Sofia Papagiannaki
        with astakos_user(user):
235 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
236 f53483d8 Sofia Papagiannaki
            if token:
237 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
238 6ee6677e Sofia Papagiannaki
            response = self.client.head(url, data, follow, **extra)
239 f3787696 Sofia Papagiannaki
        return response
240 f3787696 Sofia Papagiannaki
241 f53483d8 Sofia Papagiannaki
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
242 f53483d8 Sofia Papagiannaki
            **extra):
243 f3787696 Sofia Papagiannaki
        with astakos_user(user):
244 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
245 f53483d8 Sofia Papagiannaki
            if token:
246 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
247 6ee6677e Sofia Papagiannaki
            response = self.client.get(url, data, follow, **extra)
248 f3787696 Sofia Papagiannaki
        return response
249 f3787696 Sofia Papagiannaki
250 f53483d8 Sofia Papagiannaki
    def delete(self, url, user='user', token='DummyToken', data={},
251 f53483d8 Sofia Papagiannaki
               follow=False, **extra):
252 f3787696 Sofia Papagiannaki
        with astakos_user(user):
253 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
254 f53483d8 Sofia Papagiannaki
            if token:
255 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
256 6ee6677e Sofia Papagiannaki
            response = self.client.delete(url, data, follow, **extra)
257 f3787696 Sofia Papagiannaki
        return response
258 f3787696 Sofia Papagiannaki
259 f53483d8 Sofia Papagiannaki
    def post(self, url, user='user', token='DummyToken', data={},
260 6ee6677e Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
261 f3787696 Sofia Papagiannaki
        with astakos_user(user):
262 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
263 f53483d8 Sofia Papagiannaki
            if token:
264 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
265 6ee6677e Sofia Papagiannaki
            response = self.client.post(url, data, content_type, follow,
266 6ee6677e Sofia Papagiannaki
                                        **extra)
267 f3787696 Sofia Papagiannaki
        return response
268 f3787696 Sofia Papagiannaki
269 f53483d8 Sofia Papagiannaki
    def put(self, url, user='user', token='DummyToken', data={},
270 6ee6677e Sofia Papagiannaki
            content_type='application/octet-stream', follow=False, **extra):
271 f3787696 Sofia Papagiannaki
        with astakos_user(user):
272 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
273 f53483d8 Sofia Papagiannaki
            if token:
274 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
275 6ee6677e Sofia Papagiannaki
            response = self.client.put(url, data, content_type, follow,
276 6ee6677e Sofia Papagiannaki
                                       **extra)
277 f3787696 Sofia Papagiannaki
        return response
278 f3787696 Sofia Papagiannaki
279 f53483d8 Sofia Papagiannaki
    def copy(self, url, user='user', token='DummyToken', data={},
280 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
281 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
282 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
283 f53483d8 Sofia Papagiannaki
            if token:
284 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
285 f53483d8 Sofia Papagiannaki
            response = self.client.copy(url, data, content_type, follow,
286 f53483d8 Sofia Papagiannaki
                                        **extra)
287 f53483d8 Sofia Papagiannaki
        return response
288 f53483d8 Sofia Papagiannaki
289 f53483d8 Sofia Papagiannaki
    def move(self, url, user='user', token='DummyToken', data={},
290 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
291 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
292 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
293 f53483d8 Sofia Papagiannaki
            if token:
294 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
295 f53483d8 Sofia Papagiannaki
            response = self.client.move(url, data, content_type, follow,
296 f53483d8 Sofia Papagiannaki
                                        **extra)
297 f53483d8 Sofia Papagiannaki
        return response
298 f53483d8 Sofia Papagiannaki
299 c977b0b6 Sofia Papagiannaki
    def update_account_meta(self, meta, user=None, verify_status=True):
300 6ee6677e Sofia Papagiannaki
        user = user or self.user
301 f3787696 Sofia Papagiannaki
        kwargs = dict(
302 f3787696 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
303 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
304 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
305 c977b0b6 Sofia Papagiannaki
        if verify_status:
306 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
307 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
308 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
309 f53483d8 Sofia Papagiannaki
            k in meta.keys())
310 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
311 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
312 f53483d8 Sofia Papagiannaki
313 c977b0b6 Sofia Papagiannaki
    def reset_account_meta(self, meta, user=None, verify_status=True):
314 f53483d8 Sofia Papagiannaki
        user = user or self.user
315 f53483d8 Sofia Papagiannaki
        kwargs = dict(
316 f53483d8 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
317 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
318 f53483d8 Sofia Papagiannaki
        r = self.post(url, user=user, **kwargs)
319 c977b0b6 Sofia Papagiannaki
        if verify_status:
320 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
321 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
322 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
323 f53483d8 Sofia Papagiannaki
            k in meta.keys())
324 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
325 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
326 f3787696 Sofia Papagiannaki
327 c977b0b6 Sofia Papagiannaki
    def delete_account_meta(self, meta, user=None, verify_status=True):
328 6ee6677e Sofia Papagiannaki
        user = user or self.user
329 f53483d8 Sofia Papagiannaki
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
330 f3787696 Sofia Papagiannaki
        kwargs = dict((transform(k), '') for k, v in meta.items())
331 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
332 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
333 c977b0b6 Sofia Papagiannaki
        if verify_status:
334 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
335 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
336 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
337 f53483d8 Sofia Papagiannaki
            k in meta.keys())
338 f3787696 Sofia Papagiannaki
        return r
339 f3787696 Sofia Papagiannaki
340 c977b0b6 Sofia Papagiannaki
    def delete_account_groups(self, groups, user=None, verify_status=True):
341 6ee6677e Sofia Papagiannaki
        user = user or self.user
342 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
343 f53483d8 Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **groups)
344 c977b0b6 Sofia Papagiannaki
        if verify_status:
345 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
346 6ee6677e Sofia Papagiannaki
        account_groups = self.get_account_groups()
347 6ee6677e Sofia Papagiannaki
        (self.assertTrue(k not in account_groups) for k in groups.keys())
348 f3787696 Sofia Papagiannaki
        return r
349 f3787696 Sofia Papagiannaki
350 c977b0b6 Sofia Papagiannaki
    def get_account_info(self, until=None, user=None, verify_status=True):
351 6ee6677e Sofia Papagiannaki
        user = user or self.user
352 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
353 f3787696 Sofia Papagiannaki
        if until is not None:
354 f3787696 Sofia Papagiannaki
            parts = list(urlsplit(url))
355 f3787696 Sofia Papagiannaki
            parts[3] = urlencode({
356 f3787696 Sofia Papagiannaki
                'until': until
357 f3787696 Sofia Papagiannaki
            })
358 f3787696 Sofia Papagiannaki
            url = urlunsplit(parts)
359 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
360 c977b0b6 Sofia Papagiannaki
        if verify_status:
361 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
362 f3787696 Sofia Papagiannaki
        return r
363 f3787696 Sofia Papagiannaki
364 6ee6677e Sofia Papagiannaki
    def get_account_meta(self, until=None, user=None):
365 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Meta-'
366 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
367 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
368 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
369 f3787696 Sofia Papagiannaki
370 6ee6677e Sofia Papagiannaki
    def get_account_groups(self, until=None, user=None):
371 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Group-'
372 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
373 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
374 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
375 f3787696 Sofia Papagiannaki
376 c977b0b6 Sofia Papagiannaki
    def get_container_info(self, container, until=None, user=None,
377 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
378 6ee6677e Sofia Papagiannaki
        user = user or self.user
379 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
380 3a19e99b Sofia Papagiannaki
        if until is not None:
381 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
382 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
383 3a19e99b Sofia Papagiannaki
                'until': until
384 3a19e99b Sofia Papagiannaki
            })
385 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
386 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
387 c977b0b6 Sofia Papagiannaki
        if verify_status:
388 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
389 3a19e99b Sofia Papagiannaki
        return r
390 3a19e99b Sofia Papagiannaki
391 6ee6677e Sofia Papagiannaki
    def get_container_meta(self, container, until=None, user=None):
392 6ee6677e Sofia Papagiannaki
        prefix = 'X-Container-Meta-'
393 6ee6677e Sofia Papagiannaki
        r = self.get_container_info(container, until=until, user=user)
394 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
395 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
396 3a19e99b Sofia Papagiannaki
397 c977b0b6 Sofia Papagiannaki
    def update_container_meta(self, container, meta=None, user=None,
398 c977b0b6 Sofia Papagiannaki
                              verify_status=True):
399 6ee6677e Sofia Papagiannaki
        user = user or self.user
400 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
401 3a19e99b Sofia Papagiannaki
        kwargs = dict(
402 3a19e99b Sofia Papagiannaki
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
403 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
404 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
405 c977b0b6 Sofia Papagiannaki
        if verify_status:
406 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
407 6e0f3e65 Sofia Papagiannaki
        container_meta = self.get_container_meta(container, user=user)
408 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
409 3a19e99b Sofia Papagiannaki
            k in meta.keys())
410 3a19e99b Sofia Papagiannaki
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
411 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
412 c977b0b6 Sofia Papagiannaki
        return r
413 3a19e99b Sofia Papagiannaki
414 6ee6677e Sofia Papagiannaki
    def list_containers(self, format='json', headers={}, user=None, **params):
415 6ee6677e Sofia Papagiannaki
        user = user or self.user
416 6ee6677e Sofia Papagiannaki
        _url = join_urls(self.pithos_path, user)
417 d6a92fa0 Sofia Papagiannaki
        parts = list(urlsplit(_url))
418 f3787696 Sofia Papagiannaki
        params['format'] = format
419 f3787696 Sofia Papagiannaki
        parts[3] = urlencode(params)
420 f3787696 Sofia Papagiannaki
        url = urlunsplit(parts)
421 f3787696 Sofia Papagiannaki
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
422 f3787696 Sofia Papagiannaki
                        for k, v in headers.items())
423 6ee6677e Sofia Papagiannaki
        r = self.get(url, user=user, **_headers)
424 f3787696 Sofia Papagiannaki
425 f3787696 Sofia Papagiannaki
        if format is None:
426 f3787696 Sofia Papagiannaki
            containers = r.content.split('\n')
427 f3787696 Sofia Papagiannaki
            if '' in containers:
428 f3787696 Sofia Papagiannaki
                containers.remove('')
429 f3787696 Sofia Papagiannaki
            return containers
430 f3787696 Sofia Papagiannaki
        elif format == 'json':
431 f3787696 Sofia Papagiannaki
            try:
432 f3787696 Sofia Papagiannaki
                containers = json.loads(r.content)
433 f3787696 Sofia Papagiannaki
            except:
434 f3787696 Sofia Papagiannaki
                self.fail('json format expected')
435 f3787696 Sofia Papagiannaki
            return containers
436 f3787696 Sofia Papagiannaki
        elif format == 'xml':
437 f3787696 Sofia Papagiannaki
            return minidom.parseString(r.content)
438 f3787696 Sofia Papagiannaki
439 c977b0b6 Sofia Papagiannaki
    def delete_container_content(self, cname, user=None, verify_status=True):
440 6ee6677e Sofia Papagiannaki
        user = user or self.user
441 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
442 6ee6677e Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url, user=user)
443 c977b0b6 Sofia Papagiannaki
        if verify_status:
444 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
445 f3787696 Sofia Papagiannaki
        return r
446 f3787696 Sofia Papagiannaki
447 c977b0b6 Sofia Papagiannaki
    def delete_container(self, cname, user=None, verify_status=True):
448 6ee6677e Sofia Papagiannaki
        user = user or self.user
449 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
450 6ee6677e Sofia Papagiannaki
        r = self.delete(url, user=user)
451 c977b0b6 Sofia Papagiannaki
        if verify_status:
452 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
453 c977b0b6 Sofia Papagiannaki
        return r
454 c977b0b6 Sofia Papagiannaki
455 c977b0b6 Sofia Papagiannaki
    def delete_object(self, cname, oname, user=None, verify_status=True):
456 c977b0b6 Sofia Papagiannaki
        user = user or self.user
457 c977b0b6 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
458 c977b0b6 Sofia Papagiannaki
        r = self.delete(url, user=user)
459 c977b0b6 Sofia Papagiannaki
        if verify_status:
460 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
461 f3787696 Sofia Papagiannaki
        return r
462 f3787696 Sofia Papagiannaki
463 c977b0b6 Sofia Papagiannaki
    def create_container(self, cname=None, user=None, verify_status=True):
464 95b36144 Sofia Papagiannaki
        cname = cname or get_random_name()
465 6ee6677e Sofia Papagiannaki
        user = user or self.user
466 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
467 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='')
468 c977b0b6 Sofia Papagiannaki
        if verify_status:
469 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (202, 201))
470 95b36144 Sofia Papagiannaki
        return cname, r
471 f3787696 Sofia Papagiannaki
472 c977b0b6 Sofia Papagiannaki
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
473 6ee6677e Sofia Papagiannaki
                      user=None, **meta):
474 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
475 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
476 6ee6677e Sofia Papagiannaki
        user = user or self.user
477 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
478 f3787696 Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
479 f3787696 Sofia Papagiannaki
                       for k, v in meta.iteritems())
480 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
481 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data=data, **headers)
482 c977b0b6 Sofia Papagiannaki
        if verify_status:
483 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
484 3a19e99b Sofia Papagiannaki
        return oname, data, r
485 3a19e99b Sofia Papagiannaki
486 3a19e99b Sofia Papagiannaki
    def update_object_data(self, cname, oname=None, length=None,
487 3a19e99b Sofia Papagiannaki
                           content_type=None, content_range=None,
488 c977b0b6 Sofia Papagiannaki
                           verify_status=True, user=None, **meta):
489 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
490 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
491 3a19e99b Sofia Papagiannaki
        content_type = content_type or 'application/octet-stream'
492 6ee6677e Sofia Papagiannaki
        user = user or self.user
493 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
494 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
495 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
496 3a19e99b Sofia Papagiannaki
        if content_range:
497 3a19e99b Sofia Papagiannaki
            headers['HTTP_CONTENT_RANGE'] = content_range
498 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
499 6ee6677e Sofia Papagiannaki
        r = self.post(url, user=user, data=data, content_type=content_type,
500 6ee6677e Sofia Papagiannaki
                      **headers)
501 c977b0b6 Sofia Papagiannaki
        if verify_status:
502 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
503 f3787696 Sofia Papagiannaki
        return oname, data, r
504 f3787696 Sofia Papagiannaki
505 3a19e99b Sofia Papagiannaki
    def append_object_data(self, cname, oname=None, length=None,
506 6ee6677e Sofia Papagiannaki
                           content_type=None, user=None):
507 3a19e99b Sofia Papagiannaki
        return self.update_object_data(cname, oname=oname,
508 3a19e99b Sofia Papagiannaki
                                       length=length,
509 3a19e99b Sofia Papagiannaki
                                       content_type=content_type,
510 6ee6677e Sofia Papagiannaki
                                       content_range='bytes */*',
511 6ee6677e Sofia Papagiannaki
                                       user=user)
512 3a19e99b Sofia Papagiannaki
513 c977b0b6 Sofia Papagiannaki
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
514 c977b0b6 Sofia Papagiannaki
                      **headers):
515 6ee6677e Sofia Papagiannaki
        user = user or self.user
516 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
517 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
518 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='',
519 6ee6677e Sofia Papagiannaki
                     content_type='application/directory', **headers)
520 c977b0b6 Sofia Papagiannaki
        if verify_status:
521 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
522 f3787696 Sofia Papagiannaki
        return oname, r
523 f3787696 Sofia Papagiannaki
524 c977b0b6 Sofia Papagiannaki
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
525 6ee6677e Sofia Papagiannaki
        user = user or self.user
526 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
527 3a19e99b Sofia Papagiannaki
        path = '%s?format=json' % url
528 3a19e99b Sofia Papagiannaki
        if prefix is not None:
529 3a19e99b Sofia Papagiannaki
            path = '%s&prefix=%s' % (path, prefix)
530 6ee6677e Sofia Papagiannaki
        r = self.get(path, user=user)
531 c977b0b6 Sofia Papagiannaki
        if verify_status:
532 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (200, 204))
533 f3787696 Sofia Papagiannaki
        try:
534 f3787696 Sofia Papagiannaki
            objects = json.loads(r.content)
535 f3787696 Sofia Papagiannaki
        except:
536 f3787696 Sofia Papagiannaki
            self.fail('json format expected')
537 f3787696 Sofia Papagiannaki
        return objects
538 f3787696 Sofia Papagiannaki
539 6ee6677e Sofia Papagiannaki
    def get_object_info(self, container, object, version=None, until=None,
540 c977b0b6 Sofia Papagiannaki
                        user=None, verify_status=True):
541 6ee6677e Sofia Papagiannaki
        user = user or self.user
542 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
543 3a19e99b Sofia Papagiannaki
        if until is not None:
544 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
545 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
546 3a19e99b Sofia Papagiannaki
                'until': until
547 3a19e99b Sofia Papagiannaki
            })
548 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
549 3a19e99b Sofia Papagiannaki
        if version:
550 3a19e99b Sofia Papagiannaki
            url = '%s?version=%s' % (url, version)
551 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
552 c977b0b6 Sofia Papagiannaki
        if verify_status:
553 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
554 3a19e99b Sofia Papagiannaki
        return r
555 3a19e99b Sofia Papagiannaki
556 6ee6677e Sofia Papagiannaki
    def get_object_meta(self, container, object, version=None, until=None,
557 6ee6677e Sofia Papagiannaki
                        user=None):
558 6ee6677e Sofia Papagiannaki
        prefix = 'X-Object-Meta-'
559 f53483d8 Sofia Papagiannaki
        user = user or self.user
560 6ee6677e Sofia Papagiannaki
        r = self.get_object_info(container, object, version, until=until,
561 6ee6677e Sofia Papagiannaki
                                 user=user)
562 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
563 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
564 3a19e99b Sofia Papagiannaki
565 c977b0b6 Sofia Papagiannaki
    def update_object_meta(self, container, object, meta=None, user=None,
566 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
567 6ee6677e Sofia Papagiannaki
        user = user or self.user
568 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
569 3a19e99b Sofia Papagiannaki
        kwargs = dict(
570 3a19e99b Sofia Papagiannaki
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
571 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
572 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
573 c977b0b6 Sofia Papagiannaki
        if verify_status:
574 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
575 6e0f3e65 Sofia Papagiannaki
        object_meta = self.get_object_meta(container, object, user=user)
576 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
577 3a19e99b Sofia Papagiannaki
            k in meta.keys())
578 3a19e99b Sofia Papagiannaki
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
579 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
580 c977b0b6 Sofia Papagiannaki
        return r
581 3a19e99b Sofia Papagiannaki
582 f3787696 Sofia Papagiannaki
    def assert_extended(self, data, format, type, size=10000):
583 f3787696 Sofia Papagiannaki
        if format == 'xml':
584 f3787696 Sofia Papagiannaki
            self._assert_xml(data, type, size)
585 f3787696 Sofia Papagiannaki
        elif format == 'json':
586 f3787696 Sofia Papagiannaki
            self._assert_json(data, type, size)
587 f3787696 Sofia Papagiannaki
588 f3787696 Sofia Papagiannaki
    def _assert_json(self, data, type, size):
589 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
590 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
591 f3787696 Sofia Papagiannaki
        self.assertTrue(len(data) <= size)
592 f3787696 Sofia Papagiannaki
        for item in info:
593 f3787696 Sofia Papagiannaki
            for i in data:
594 f3787696 Sofia Papagiannaki
                if 'subdir' in i.keys():
595 f3787696 Sofia Papagiannaki
                    continue
596 f3787696 Sofia Papagiannaki
                self.assertTrue(item in i.keys())
597 f3787696 Sofia Papagiannaki
598 f3787696 Sofia Papagiannaki
    def _assert_xml(self, data, type, size):
599 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
600 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
601 f3787696 Sofia Papagiannaki
        try:
602 f3787696 Sofia Papagiannaki
            info.remove('content_encoding')
603 f3787696 Sofia Papagiannaki
        except ValueError:
604 f3787696 Sofia Papagiannaki
            pass
605 f3787696 Sofia Papagiannaki
        xml = data
606 f3787696 Sofia Papagiannaki
        entities = xml.getElementsByTagName(type)
607 f3787696 Sofia Papagiannaki
        self.assertTrue(len(entities) <= size)
608 f3787696 Sofia Papagiannaki
        for e in entities:
609 f3787696 Sofia Papagiannaki
            for item in info:
610 f3787696 Sofia Papagiannaki
                self.assertTrue(e.getElementsByTagName(item))
611 f3787696 Sofia Papagiannaki
612 f3787696 Sofia Papagiannaki
613 f3787696 Sofia Papagiannaki
class AssertMappingInvariant(object):
614 f3787696 Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
615 f3787696 Sofia Papagiannaki
        self.callable = callable
616 f3787696 Sofia Papagiannaki
        self.args = args
617 f3787696 Sofia Papagiannaki
        self.kwargs = kwargs
618 f3787696 Sofia Papagiannaki
619 f3787696 Sofia Papagiannaki
    def __enter__(self):
620 f3787696 Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
621 f3787696 Sofia Papagiannaki
        return self.map
622 f3787696 Sofia Papagiannaki
623 f3787696 Sofia Papagiannaki
    def __exit__(self, type, value, tb):
624 f3787696 Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
625 f3787696 Sofia Papagiannaki
        for k, v in self.map.items():
626 f3787696 Sofia Papagiannaki
            if is_date(v):
627 f3787696 Sofia Papagiannaki
                continue
628 f3787696 Sofia Papagiannaki
629 f3787696 Sofia Papagiannaki
            assert(k in map), '%s not in map' % k
630 f3787696 Sofia Papagiannaki
            assert v == map[k]
631 f3787696 Sofia Papagiannaki
632 3a19e99b Sofia Papagiannaki
633 3a19e99b Sofia Papagiannaki
class AssertUUidInvariant(object):
634 3a19e99b Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
635 3a19e99b Sofia Papagiannaki
        self.callable = callable
636 3a19e99b Sofia Papagiannaki
        self.args = args
637 3a19e99b Sofia Papagiannaki
        self.kwargs = kwargs
638 3a19e99b Sofia Papagiannaki
639 3a19e99b Sofia Papagiannaki
    def __enter__(self):
640 3a19e99b Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
641 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
642 3a19e99b Sofia Papagiannaki
        self.uuid = self.map['x-object-uuid']
643 3a19e99b Sofia Papagiannaki
        return self.map
644 3a19e99b Sofia Papagiannaki
645 3a19e99b Sofia Papagiannaki
    def __exit__(self, type, value, tb):
646 3a19e99b Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
647 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
648 3a19e99b Sofia Papagiannaki
        uuid = map['x-object-uuid']
649 3a19e99b Sofia Papagiannaki
        assert(uuid == self.uuid)