Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 0c6ab9df

History | View | Annotate | Download (26.3 kB)

1 f3787696 Sofia Papagiannaki
#!/usr/bin/env python
2 f3787696 Sofia Papagiannaki
#coding=utf8
3 f3787696 Sofia Papagiannaki
4 f3787696 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 f3787696 Sofia Papagiannaki
#
6 f3787696 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 f3787696 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 f3787696 Sofia Papagiannaki
# conditions are met:
9 f3787696 Sofia Papagiannaki
#
10 f3787696 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 f3787696 Sofia Papagiannaki
#      disclaimer.
13 f3787696 Sofia Papagiannaki
#
14 f3787696 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 f3787696 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 f3787696 Sofia Papagiannaki
#      provided with the distribution.
18 f3787696 Sofia Papagiannaki
#
19 f3787696 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 f3787696 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 f3787696 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 f3787696 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 f3787696 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 f3787696 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 f3787696 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 f3787696 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 f3787696 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 f3787696 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 f3787696 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 f3787696 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 f3787696 Sofia Papagiannaki
#
32 f3787696 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 f3787696 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 f3787696 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 f3787696 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 f3787696 Sofia Papagiannaki
37 f53483d8 Sofia Papagiannaki
from urlparse import urlunsplit, urlsplit, urlparse
38 f3787696 Sofia Papagiannaki
from xml.dom import minidom
39 6ee6677e Sofia Papagiannaki
from urllib import quote, unquote
40 13bf6cd8 Sofia Papagiannaki
from mock import patch, PropertyMock
41 f3787696 Sofia Papagiannaki
42 f3787696 Sofia Papagiannaki
from snf_django.utils.testing import with_settings, astakos_user
43 f3787696 Sofia Papagiannaki
44 f3787696 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
45 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import is_date, get_random_data, get_random_name
46 bc4f25c0 Sofia Papagiannaki
from pithos.backends.migrate import initialize_db
47 f3787696 Sofia Papagiannaki
48 d6a92fa0 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
49 d6a92fa0 Sofia Papagiannaki
from synnefo.lib import join_urls
50 13bf6cd8 Sofia Papagiannaki
from synnefo.util import text
51 d6a92fa0 Sofia Papagiannaki
52 f3787696 Sofia Papagiannaki
from django.test import TestCase
53 f53483d8 Sofia Papagiannaki
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
54 bc4f25c0 Sofia Papagiannaki
from django.test.simple import DjangoTestSuiteRunner
55 7c36f3fb Sofia Papagiannaki
from django.conf import settings
56 f3787696 Sofia Papagiannaki
from django.utils.http import urlencode
57 bc4f25c0 Sofia Papagiannaki
from django.db.backends.creation import TEST_DATABASE_PREFIX
58 f3787696 Sofia Papagiannaki
59 f3787696 Sofia Papagiannaki
import django.utils.simplejson as json
60 f3787696 Sofia Papagiannaki
61 13bf6cd8 Sofia Papagiannaki
62 1a720e84 Christos Stavrakakis
import sys
63 f3787696 Sofia Papagiannaki
import random
64 f3787696 Sofia Papagiannaki
import functools
65 f3787696 Sofia Papagiannaki
66 3a19e99b Sofia Papagiannaki
67 f3787696 Sofia Papagiannaki
pithos_test_settings = functools.partial(with_settings, pithos_settings)
68 f3787696 Sofia Papagiannaki
69 f3787696 Sofia Papagiannaki
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
70 f3787696 Sofia Papagiannaki
                "%A, %d-%b-%y %H:%M:%S GMT",
71 f3787696 Sofia Papagiannaki
                "%a, %d %b %Y %H:%M:%S GMT"]
72 f3787696 Sofia Papagiannaki
73 f3787696 Sofia Papagiannaki
o_names = ['kate.jpg',
74 f3787696 Sofia Papagiannaki
           'kate_beckinsale.jpg',
75 f3787696 Sofia Papagiannaki
           'How To Win Friends And Influence People.pdf',
76 f3787696 Sofia Papagiannaki
           'moms_birthday.jpg',
77 f3787696 Sofia Papagiannaki
           'poodle_strut.mov',
78 f3787696 Sofia Papagiannaki
           'Disturbed - Down With The Sickness.mp3',
79 f3787696 Sofia Papagiannaki
           'army_of_darkness.avi',
80 f3787696 Sofia Papagiannaki
           'the_mad.avi',
81 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/poodle.jpg',
82 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/terrier.jpg',
83 f3787696 Sofia Papagiannaki
           'photos/animals/cats/persian.jpg',
84 f3787696 Sofia Papagiannaki
           'photos/animals/cats/siamese.jpg',
85 f3787696 Sofia Papagiannaki
           'photos/plants/fern.jpg',
86 f3787696 Sofia Papagiannaki
           'photos/plants/rose.jpg',
87 f3787696 Sofia Papagiannaki
           'photos/me.jpg']
88 f3787696 Sofia Papagiannaki
89 f3787696 Sofia Papagiannaki
details = {'container': ('name', 'count', 'bytes', 'last_modified',
90 f3787696 Sofia Papagiannaki
                         'x_container_policy'),
91 f3787696 Sofia Papagiannaki
           'object': ('name', 'hash', 'bytes', 'content_type',
92 f3787696 Sofia Papagiannaki
                      'content_encoding', 'last_modified',)}
93 f3787696 Sofia Papagiannaki
94 5fe43b8c Sofia Papagiannaki
TEST_BLOCK_SIZE = 1024
95 5fe43b8c Sofia Papagiannaki
TEST_HASH_ALGORITHM = 'sha256'
96 5fe43b8c Sofia Papagiannaki
97 bc4f25c0 Sofia Papagiannaki
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
98 bc4f25c0 Sofia Papagiannaki
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
99 bc4f25c0 Sofia Papagiannaki
print 'update md5:', pithos_settings.UPDATE_MD5
100 65f480f5 Sofia Papagiannaki
101 65f480f5 Sofia Papagiannaki
102 bc4f25c0 Sofia Papagiannaki
django_sqlalchemy_engines = {
103 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
104 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql': 'postgresql',
105 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.mysql': '',
106 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.sqlite3': 'mssql',
107 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.oracle': 'oracle'}
108 65f480f5 Sofia Papagiannaki
109 65f480f5 Sofia Papagiannaki
110 f53483d8 Sofia Papagiannaki
def prepare_db_connection():
111 bc4f25c0 Sofia Papagiannaki
    """Build pithos backend connection string from django default database"""
112 bc4f25c0 Sofia Papagiannaki
113 65f480f5 Sofia Papagiannaki
    db = settings.DATABASES['default']
114 bc4f25c0 Sofia Papagiannaki
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
115 bc4f25c0 Sofia Papagiannaki
116 bc4f25c0 Sofia Papagiannaki
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
117 bc4f25c0 Sofia Papagiannaki
        if db['ENGINE'] == 'django.db.backends.sqlite3':
118 bc4f25c0 Sofia Papagiannaki
            db_connection = 'sqlite:///%s' % name
119 bc4f25c0 Sofia Papagiannaki
        else:
120 bc4f25c0 Sofia Papagiannaki
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
121 bc4f25c0 Sofia Papagiannaki
                     user=db['USER'],
122 bc4f25c0 Sofia Papagiannaki
                     pwd=db['PASSWORD'],
123 bc4f25c0 Sofia Papagiannaki
                     host=db['HOST'].lower(),
124 bc4f25c0 Sofia Papagiannaki
                     port=int(db['PORT']) if db['PORT'] != '' else '',
125 bc4f25c0 Sofia Papagiannaki
                     name=name)
126 bc4f25c0 Sofia Papagiannaki
            db_connection = (
127 bc4f25c0 Sofia Papagiannaki
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
128 bc4f25c0 Sofia Papagiannaki
129 bc4f25c0 Sofia Papagiannaki
            # initialize pithos database
130 bc4f25c0 Sofia Papagiannaki
            initialize_db(db_connection)
131 65f480f5 Sofia Papagiannaki
    else:
132 bc4f25c0 Sofia Papagiannaki
        db_connection = name
133 bc4f25c0 Sofia Papagiannaki
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
134 bc4f25c0 Sofia Papagiannaki
135 bc4f25c0 Sofia Papagiannaki
136 6ee6677e Sofia Papagiannaki
def filter_headers(headers, prefix):
137 6ee6677e Sofia Papagiannaki
    meta = {}
138 6ee6677e Sofia Papagiannaki
    for k, v in headers.iteritems():
139 6ee6677e Sofia Papagiannaki
        if not k.startswith(prefix):
140 6ee6677e Sofia Papagiannaki
            continue
141 6ee6677e Sofia Papagiannaki
        meta[unquote(k[len(prefix):])] = unquote(v)
142 6ee6677e Sofia Papagiannaki
    return meta
143 6ee6677e Sofia Papagiannaki
144 6ee6677e Sofia Papagiannaki
145 bc4f25c0 Sofia Papagiannaki
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
146 bc4f25c0 Sofia Papagiannaki
    def setup_databases(self, **kwargs):
147 bc4f25c0 Sofia Papagiannaki
        old_names, mirrors = super(PithosTestSuiteRunner,
148 bc4f25c0 Sofia Papagiannaki
                                   self).setup_databases(**kwargs)
149 f53483d8 Sofia Papagiannaki
        prepare_db_connection()
150 bc4f25c0 Sofia Papagiannaki
        return old_names, mirrors
151 65f480f5 Sofia Papagiannaki
152 de8fb87d Sofia Papagiannaki
    def teardown_databases(self, old_config, **kwargs):
153 de8fb87d Sofia Papagiannaki
        from pithos.api.util import _pithos_backend_pool
154 de8fb87d Sofia Papagiannaki
        _pithos_backend_pool.shutdown()
155 1a720e84 Christos Stavrakakis
        try:
156 1a720e84 Christos Stavrakakis
            super(PithosTestSuiteRunner, self).teardown_databases(old_config,
157 1a720e84 Christos Stavrakakis
                                                                  **kwargs)
158 1a720e84 Christos Stavrakakis
        except Exception as e:
159 1a720e84 Christos Stavrakakis
            sys.stderr.write("FAILED to teardown databases: %s\n" % str(e))
160 de8fb87d Sofia Papagiannaki
161 f3787696 Sofia Papagiannaki
162 f53483d8 Sofia Papagiannaki
class PithosTestClient(Client):
163 e7e3df52 Sofia Papagiannaki
    def _get_path(self, parsed):
164 e7e3df52 Sofia Papagiannaki
        # If there are parameters, add them
165 e7e3df52 Sofia Papagiannaki
        if parsed[3]:
166 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2] + ";" + parsed[3])
167 e7e3df52 Sofia Papagiannaki
        else:
168 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2])
169 e7e3df52 Sofia Papagiannaki
170 f53483d8 Sofia Papagiannaki
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
171 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
172 f53483d8 Sofia Papagiannaki
        """
173 f53483d8 Sofia Papagiannaki
        Send a resource to the server using COPY.
174 f53483d8 Sofia Papagiannaki
        """
175 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
176 f53483d8 Sofia Papagiannaki
        r = {
177 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
178 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
179 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
180 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'COPY',
181 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
182 f53483d8 Sofia Papagiannaki
        }
183 f53483d8 Sofia Papagiannaki
        r.update(extra)
184 f53483d8 Sofia Papagiannaki
185 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
186 f53483d8 Sofia Papagiannaki
        if follow:
187 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
188 f53483d8 Sofia Papagiannaki
        return response
189 f53483d8 Sofia Papagiannaki
190 f53483d8 Sofia Papagiannaki
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
191 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
192 f53483d8 Sofia Papagiannaki
        """
193 f53483d8 Sofia Papagiannaki
        Send a resource to the server using MOVE.
194 f53483d8 Sofia Papagiannaki
        """
195 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
196 f53483d8 Sofia Papagiannaki
        r = {
197 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
198 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
199 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
200 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'MOVE',
201 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
202 f53483d8 Sofia Papagiannaki
        }
203 f53483d8 Sofia Papagiannaki
        r.update(extra)
204 f53483d8 Sofia Papagiannaki
205 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
206 f53483d8 Sofia Papagiannaki
        if follow:
207 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
208 f53483d8 Sofia Papagiannaki
        return response
209 f53483d8 Sofia Papagiannaki
210 f53483d8 Sofia Papagiannaki
211 f3787696 Sofia Papagiannaki
class PithosAPITest(TestCase):
212 13bf6cd8 Sofia Papagiannaki
    def create_patch(self, name, new_callable=None):
213 13bf6cd8 Sofia Papagiannaki
        patcher = patch(name, new_callable=new_callable)
214 13bf6cd8 Sofia Papagiannaki
        thing = patcher.start()
215 13bf6cd8 Sofia Papagiannaki
        self.addCleanup(patcher.stop)
216 13bf6cd8 Sofia Papagiannaki
        return thing
217 13bf6cd8 Sofia Papagiannaki
218 f3787696 Sofia Papagiannaki
    def setUp(self):
219 f53483d8 Sofia Papagiannaki
        self.client = PithosTestClient()
220 f53483d8 Sofia Papagiannaki
221 369a7b41 Sofia Papagiannaki
        # Override default block size to spead up tests
222 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
223 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
224 369a7b41 Sofia Papagiannaki
225 f3787696 Sofia Papagiannaki
        self.user = 'user'
226 d6a92fa0 Sofia Papagiannaki
        self.pithos_path = join_urls(get_service_path(
227 d6a92fa0 Sofia Papagiannaki
            pithos_settings.pithos_services, 'object-store'))
228 f3787696 Sofia Papagiannaki
229 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.validate_token
230 13bf6cd8 Sofia Papagiannaki
        mock_validate_token = self.create_patch(
231 13bf6cd8 Sofia Papagiannaki
            'astakosclient.AstakosClient.validate_token')
232 13bf6cd8 Sofia Papagiannaki
        mock_validate_token.return_value = {
233 13bf6cd8 Sofia Papagiannaki
            'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
234 13bf6cd8 Sofia Papagiannaki
235 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.get_token
236 13bf6cd8 Sofia Papagiannaki
        mock_get_token = self.create_patch(
237 13bf6cd8 Sofia Papagiannaki
            'astakosclient.AstakosClient.get_token')
238 13bf6cd8 Sofia Papagiannaki
        mock_get_token.return_value = {'access_token': 'valid_token'}
239 13bf6cd8 Sofia Papagiannaki
240 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.api_oa2_auth
241 f4f948c0 Sofia Papagiannaki
        mock_api_oauth2_auth = self.create_patch(
242 f4f948c0 Sofia Papagiannaki
            'astakosclient.AstakosClient.oauth2_url',
243 13bf6cd8 Sofia Papagiannaki
            new_callable=PropertyMock)
244 f4f948c0 Sofia Papagiannaki
        mock_api_oauth2_auth.return_value = '/astakos/oauth2/'
245 13bf6cd8 Sofia Papagiannaki
246 ec6f741b Sofia Papagiannaki
        mock_service_get_quotas = self.create_patch(
247 ec6f741b Sofia Papagiannaki
            'astakosclient.AstakosClient.service_get_quotas')
248 ec6f741b Sofia Papagiannaki
        mock_service_get_quotas.return_value = {
249 ec6f741b Sofia Papagiannaki
            self.user: {
250 ec6f741b Sofia Papagiannaki
                "system": {
251 ec6f741b Sofia Papagiannaki
                    "pithos.diskspace": {
252 ec6f741b Sofia Papagiannaki
                        "usage": 0,
253 ec6f741b Sofia Papagiannaki
                        "limit": 1073741824,  # 1GB
254 ec6f741b Sofia Papagiannaki
                        "pending": 0}}}}
255 ec6f741b Sofia Papagiannaki
256 f3787696 Sofia Papagiannaki
    def tearDown(self):
257 f3787696 Sofia Papagiannaki
        #delete additionally created metadata
258 f3787696 Sofia Papagiannaki
        meta = self.get_account_meta()
259 f3787696 Sofia Papagiannaki
        self.delete_account_meta(meta)
260 f3787696 Sofia Papagiannaki
261 f3787696 Sofia Papagiannaki
        #delete additionally created groups
262 f3787696 Sofia Papagiannaki
        groups = self.get_account_groups()
263 f3787696 Sofia Papagiannaki
        self.delete_account_groups(groups)
264 f3787696 Sofia Papagiannaki
265 f3787696 Sofia Papagiannaki
        self._clean_account()
266 f3787696 Sofia Papagiannaki
267 65f480f5 Sofia Papagiannaki
    def _clean_account(self):
268 65f480f5 Sofia Papagiannaki
        for c in self.list_containers():
269 65f480f5 Sofia Papagiannaki
            self.delete_container_content(c['name'])
270 65f480f5 Sofia Papagiannaki
            self.delete_container(c['name'])
271 65f480f5 Sofia Papagiannaki
272 f53483d8 Sofia Papagiannaki
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
273 f53483d8 Sofia Papagiannaki
             **extra):
274 f3787696 Sofia Papagiannaki
        with astakos_user(user):
275 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
276 f53483d8 Sofia Papagiannaki
            if token:
277 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
278 6ee6677e Sofia Papagiannaki
            response = self.client.head(url, data, follow, **extra)
279 f3787696 Sofia Papagiannaki
        return response
280 f3787696 Sofia Papagiannaki
281 f53483d8 Sofia Papagiannaki
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
282 f53483d8 Sofia Papagiannaki
            **extra):
283 f3787696 Sofia Papagiannaki
        with astakos_user(user):
284 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
285 f53483d8 Sofia Papagiannaki
            if token:
286 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
287 6ee6677e Sofia Papagiannaki
            response = self.client.get(url, data, follow, **extra)
288 f3787696 Sofia Papagiannaki
        return response
289 f3787696 Sofia Papagiannaki
290 f53483d8 Sofia Papagiannaki
    def delete(self, url, user='user', token='DummyToken', data={},
291 f53483d8 Sofia Papagiannaki
               follow=False, **extra):
292 f3787696 Sofia Papagiannaki
        with astakos_user(user):
293 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
294 f53483d8 Sofia Papagiannaki
            if token:
295 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
296 6ee6677e Sofia Papagiannaki
            response = self.client.delete(url, data, follow, **extra)
297 f3787696 Sofia Papagiannaki
        return response
298 f3787696 Sofia Papagiannaki
299 f53483d8 Sofia Papagiannaki
    def post(self, url, user='user', token='DummyToken', data={},
300 6ee6677e Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
301 f3787696 Sofia Papagiannaki
        with astakos_user(user):
302 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
303 f53483d8 Sofia Papagiannaki
            if token:
304 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
305 6ee6677e Sofia Papagiannaki
            response = self.client.post(url, data, content_type, follow,
306 6ee6677e Sofia Papagiannaki
                                        **extra)
307 f3787696 Sofia Papagiannaki
        return response
308 f3787696 Sofia Papagiannaki
309 f53483d8 Sofia Papagiannaki
    def put(self, url, user='user', token='DummyToken', data={},
310 6ee6677e Sofia Papagiannaki
            content_type='application/octet-stream', follow=False, **extra):
311 f3787696 Sofia Papagiannaki
        with astakos_user(user):
312 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
313 f53483d8 Sofia Papagiannaki
            if token:
314 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
315 6ee6677e Sofia Papagiannaki
            response = self.client.put(url, data, content_type, follow,
316 6ee6677e Sofia Papagiannaki
                                       **extra)
317 f3787696 Sofia Papagiannaki
        return response
318 f3787696 Sofia Papagiannaki
319 f53483d8 Sofia Papagiannaki
    def copy(self, url, user='user', token='DummyToken', data={},
320 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
321 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
322 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
323 f53483d8 Sofia Papagiannaki
            if token:
324 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
325 f53483d8 Sofia Papagiannaki
            response = self.client.copy(url, data, content_type, follow,
326 f53483d8 Sofia Papagiannaki
                                        **extra)
327 f53483d8 Sofia Papagiannaki
        return response
328 f53483d8 Sofia Papagiannaki
329 f53483d8 Sofia Papagiannaki
    def move(self, url, user='user', token='DummyToken', data={},
330 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
331 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
332 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
333 f53483d8 Sofia Papagiannaki
            if token:
334 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
335 f53483d8 Sofia Papagiannaki
            response = self.client.move(url, data, content_type, follow,
336 f53483d8 Sofia Papagiannaki
                                        **extra)
337 f53483d8 Sofia Papagiannaki
        return response
338 f53483d8 Sofia Papagiannaki
339 c977b0b6 Sofia Papagiannaki
    def update_account_meta(self, meta, user=None, verify_status=True):
340 6ee6677e Sofia Papagiannaki
        user = user or self.user
341 f3787696 Sofia Papagiannaki
        kwargs = dict(
342 f3787696 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
343 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
344 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
345 c977b0b6 Sofia Papagiannaki
        if verify_status:
346 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
347 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
348 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
349 f53483d8 Sofia Papagiannaki
            k in meta.keys())
350 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
351 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
352 f53483d8 Sofia Papagiannaki
353 c977b0b6 Sofia Papagiannaki
    def reset_account_meta(self, meta, user=None, verify_status=True):
354 f53483d8 Sofia Papagiannaki
        user = user or self.user
355 f53483d8 Sofia Papagiannaki
        kwargs = dict(
356 f53483d8 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
357 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
358 f53483d8 Sofia Papagiannaki
        r = self.post(url, user=user, **kwargs)
359 c977b0b6 Sofia Papagiannaki
        if verify_status:
360 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
361 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
362 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
363 f53483d8 Sofia Papagiannaki
            k in meta.keys())
364 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
365 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
366 f3787696 Sofia Papagiannaki
367 c977b0b6 Sofia Papagiannaki
    def delete_account_meta(self, meta, user=None, verify_status=True):
368 6ee6677e Sofia Papagiannaki
        user = user or self.user
369 f53483d8 Sofia Papagiannaki
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
370 f3787696 Sofia Papagiannaki
        kwargs = dict((transform(k), '') for k, v in meta.items())
371 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
372 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
373 c977b0b6 Sofia Papagiannaki
        if verify_status:
374 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
375 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
376 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
377 f53483d8 Sofia Papagiannaki
            k in meta.keys())
378 f3787696 Sofia Papagiannaki
        return r
379 f3787696 Sofia Papagiannaki
380 c977b0b6 Sofia Papagiannaki
    def delete_account_groups(self, groups, user=None, verify_status=True):
381 6ee6677e Sofia Papagiannaki
        user = user or self.user
382 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
383 f53483d8 Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **groups)
384 c977b0b6 Sofia Papagiannaki
        if verify_status:
385 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
386 6ee6677e Sofia Papagiannaki
        account_groups = self.get_account_groups()
387 6ee6677e Sofia Papagiannaki
        (self.assertTrue(k not in account_groups) for k in groups.keys())
388 f3787696 Sofia Papagiannaki
        return r
389 f3787696 Sofia Papagiannaki
390 c977b0b6 Sofia Papagiannaki
    def get_account_info(self, until=None, user=None, verify_status=True):
391 6ee6677e Sofia Papagiannaki
        user = user or self.user
392 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
393 f3787696 Sofia Papagiannaki
        if until is not None:
394 f3787696 Sofia Papagiannaki
            parts = list(urlsplit(url))
395 f3787696 Sofia Papagiannaki
            parts[3] = urlencode({
396 f3787696 Sofia Papagiannaki
                'until': until
397 f3787696 Sofia Papagiannaki
            })
398 f3787696 Sofia Papagiannaki
            url = urlunsplit(parts)
399 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
400 c977b0b6 Sofia Papagiannaki
        if verify_status:
401 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
402 f3787696 Sofia Papagiannaki
        return r
403 f3787696 Sofia Papagiannaki
404 6ee6677e Sofia Papagiannaki
    def get_account_meta(self, until=None, user=None):
405 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Meta-'
406 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
407 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
408 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
409 f3787696 Sofia Papagiannaki
410 6ee6677e Sofia Papagiannaki
    def get_account_groups(self, until=None, user=None):
411 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Group-'
412 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
413 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
414 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
415 f3787696 Sofia Papagiannaki
416 c977b0b6 Sofia Papagiannaki
    def get_container_info(self, container, until=None, user=None,
417 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
418 6ee6677e Sofia Papagiannaki
        user = user or self.user
419 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
420 3a19e99b Sofia Papagiannaki
        if until is not None:
421 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
422 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
423 3a19e99b Sofia Papagiannaki
                'until': until
424 3a19e99b Sofia Papagiannaki
            })
425 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
426 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
427 c977b0b6 Sofia Papagiannaki
        if verify_status:
428 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
429 3a19e99b Sofia Papagiannaki
        return r
430 3a19e99b Sofia Papagiannaki
431 6ee6677e Sofia Papagiannaki
    def get_container_meta(self, container, until=None, user=None):
432 6ee6677e Sofia Papagiannaki
        prefix = 'X-Container-Meta-'
433 6ee6677e Sofia Papagiannaki
        r = self.get_container_info(container, until=until, user=user)
434 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
435 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
436 3a19e99b Sofia Papagiannaki
437 c977b0b6 Sofia Papagiannaki
    def update_container_meta(self, container, meta=None, user=None,
438 c977b0b6 Sofia Papagiannaki
                              verify_status=True):
439 6ee6677e Sofia Papagiannaki
        user = user or self.user
440 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
441 3a19e99b Sofia Papagiannaki
        kwargs = dict(
442 3a19e99b Sofia Papagiannaki
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
443 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
444 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
445 c977b0b6 Sofia Papagiannaki
        if verify_status:
446 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
447 6e0f3e65 Sofia Papagiannaki
        container_meta = self.get_container_meta(container, user=user)
448 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
449 3a19e99b Sofia Papagiannaki
            k in meta.keys())
450 3a19e99b Sofia Papagiannaki
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
451 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
452 c977b0b6 Sofia Papagiannaki
        return r
453 3a19e99b Sofia Papagiannaki
454 6ee6677e Sofia Papagiannaki
    def list_containers(self, format='json', headers={}, user=None, **params):
455 6ee6677e Sofia Papagiannaki
        user = user or self.user
456 6ee6677e Sofia Papagiannaki
        _url = join_urls(self.pithos_path, user)
457 d6a92fa0 Sofia Papagiannaki
        parts = list(urlsplit(_url))
458 f3787696 Sofia Papagiannaki
        params['format'] = format
459 f3787696 Sofia Papagiannaki
        parts[3] = urlencode(params)
460 f3787696 Sofia Papagiannaki
        url = urlunsplit(parts)
461 f3787696 Sofia Papagiannaki
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
462 f3787696 Sofia Papagiannaki
                        for k, v in headers.items())
463 6ee6677e Sofia Papagiannaki
        r = self.get(url, user=user, **_headers)
464 f3787696 Sofia Papagiannaki
465 f3787696 Sofia Papagiannaki
        if format is None:
466 f3787696 Sofia Papagiannaki
            containers = r.content.split('\n')
467 f3787696 Sofia Papagiannaki
            if '' in containers:
468 f3787696 Sofia Papagiannaki
                containers.remove('')
469 f3787696 Sofia Papagiannaki
            return containers
470 f3787696 Sofia Papagiannaki
        elif format == 'json':
471 f3787696 Sofia Papagiannaki
            try:
472 f3787696 Sofia Papagiannaki
                containers = json.loads(r.content)
473 f3787696 Sofia Papagiannaki
            except:
474 f3787696 Sofia Papagiannaki
                self.fail('json format expected')
475 f3787696 Sofia Papagiannaki
            return containers
476 f3787696 Sofia Papagiannaki
        elif format == 'xml':
477 f3787696 Sofia Papagiannaki
            return minidom.parseString(r.content)
478 f3787696 Sofia Papagiannaki
479 c977b0b6 Sofia Papagiannaki
    def delete_container_content(self, cname, user=None, verify_status=True):
480 6ee6677e Sofia Papagiannaki
        user = user or self.user
481 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
482 6ee6677e Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url, user=user)
483 c977b0b6 Sofia Papagiannaki
        if verify_status:
484 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
485 f3787696 Sofia Papagiannaki
        return r
486 f3787696 Sofia Papagiannaki
487 c977b0b6 Sofia Papagiannaki
    def delete_container(self, cname, user=None, verify_status=True):
488 6ee6677e Sofia Papagiannaki
        user = user or self.user
489 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
490 6ee6677e Sofia Papagiannaki
        r = self.delete(url, user=user)
491 c977b0b6 Sofia Papagiannaki
        if verify_status:
492 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
493 c977b0b6 Sofia Papagiannaki
        return r
494 c977b0b6 Sofia Papagiannaki
495 c977b0b6 Sofia Papagiannaki
    def delete_object(self, cname, oname, user=None, verify_status=True):
496 c977b0b6 Sofia Papagiannaki
        user = user or self.user
497 c977b0b6 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
498 c977b0b6 Sofia Papagiannaki
        r = self.delete(url, user=user)
499 c977b0b6 Sofia Papagiannaki
        if verify_status:
500 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
501 f3787696 Sofia Papagiannaki
        return r
502 f3787696 Sofia Papagiannaki
503 c977b0b6 Sofia Papagiannaki
    def create_container(self, cname=None, user=None, verify_status=True):
504 95b36144 Sofia Papagiannaki
        cname = cname or get_random_name()
505 6ee6677e Sofia Papagiannaki
        user = user or self.user
506 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
507 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='')
508 c977b0b6 Sofia Papagiannaki
        if verify_status:
509 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (202, 201))
510 95b36144 Sofia Papagiannaki
        return cname, r
511 f3787696 Sofia Papagiannaki
512 c977b0b6 Sofia Papagiannaki
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
513 6ee6677e Sofia Papagiannaki
                      user=None, **meta):
514 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
515 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
516 6ee6677e Sofia Papagiannaki
        user = user or self.user
517 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
518 f3787696 Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
519 f3787696 Sofia Papagiannaki
                       for k, v in meta.iteritems())
520 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
521 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data=data, **headers)
522 c977b0b6 Sofia Papagiannaki
        if verify_status:
523 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
524 3a19e99b Sofia Papagiannaki
        return oname, data, r
525 3a19e99b Sofia Papagiannaki
526 3a19e99b Sofia Papagiannaki
    def update_object_data(self, cname, oname=None, length=None,
527 3a19e99b Sofia Papagiannaki
                           content_type=None, content_range=None,
528 c977b0b6 Sofia Papagiannaki
                           verify_status=True, user=None, **meta):
529 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
530 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
531 3a19e99b Sofia Papagiannaki
        content_type = content_type or 'application/octet-stream'
532 6ee6677e Sofia Papagiannaki
        user = user or self.user
533 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
534 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
535 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
536 3a19e99b Sofia Papagiannaki
        if content_range:
537 3a19e99b Sofia Papagiannaki
            headers['HTTP_CONTENT_RANGE'] = content_range
538 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
539 6ee6677e Sofia Papagiannaki
        r = self.post(url, user=user, data=data, content_type=content_type,
540 6ee6677e Sofia Papagiannaki
                      **headers)
541 c977b0b6 Sofia Papagiannaki
        if verify_status:
542 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
543 f3787696 Sofia Papagiannaki
        return oname, data, r
544 f3787696 Sofia Papagiannaki
545 3a19e99b Sofia Papagiannaki
    def append_object_data(self, cname, oname=None, length=None,
546 6ee6677e Sofia Papagiannaki
                           content_type=None, user=None):
547 3a19e99b Sofia Papagiannaki
        return self.update_object_data(cname, oname=oname,
548 3a19e99b Sofia Papagiannaki
                                       length=length,
549 3a19e99b Sofia Papagiannaki
                                       content_type=content_type,
550 6ee6677e Sofia Papagiannaki
                                       content_range='bytes */*',
551 6ee6677e Sofia Papagiannaki
                                       user=user)
552 3a19e99b Sofia Papagiannaki
553 c977b0b6 Sofia Papagiannaki
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
554 c977b0b6 Sofia Papagiannaki
                      **headers):
555 6ee6677e Sofia Papagiannaki
        user = user or self.user
556 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
557 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
558 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='',
559 6ee6677e Sofia Papagiannaki
                     content_type='application/directory', **headers)
560 c977b0b6 Sofia Papagiannaki
        if verify_status:
561 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
562 f3787696 Sofia Papagiannaki
        return oname, r
563 f3787696 Sofia Papagiannaki
564 c977b0b6 Sofia Papagiannaki
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
565 6ee6677e Sofia Papagiannaki
        user = user or self.user
566 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
567 3a19e99b Sofia Papagiannaki
        path = '%s?format=json' % url
568 3a19e99b Sofia Papagiannaki
        if prefix is not None:
569 3a19e99b Sofia Papagiannaki
            path = '%s&prefix=%s' % (path, prefix)
570 6ee6677e Sofia Papagiannaki
        r = self.get(path, user=user)
571 c977b0b6 Sofia Papagiannaki
        if verify_status:
572 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (200, 204))
573 f3787696 Sofia Papagiannaki
        try:
574 f3787696 Sofia Papagiannaki
            objects = json.loads(r.content)
575 f3787696 Sofia Papagiannaki
        except:
576 f3787696 Sofia Papagiannaki
            self.fail('json format expected')
577 f3787696 Sofia Papagiannaki
        return objects
578 f3787696 Sofia Papagiannaki
579 6ee6677e Sofia Papagiannaki
    def get_object_info(self, container, object, version=None, until=None,
580 c977b0b6 Sofia Papagiannaki
                        user=None, verify_status=True):
581 6ee6677e Sofia Papagiannaki
        user = user or self.user
582 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
583 3a19e99b Sofia Papagiannaki
        if until is not None:
584 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
585 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
586 3a19e99b Sofia Papagiannaki
                'until': until
587 3a19e99b Sofia Papagiannaki
            })
588 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
589 3a19e99b Sofia Papagiannaki
        if version:
590 3a19e99b Sofia Papagiannaki
            url = '%s?version=%s' % (url, version)
591 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
592 c977b0b6 Sofia Papagiannaki
        if verify_status:
593 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
594 3a19e99b Sofia Papagiannaki
        return r
595 3a19e99b Sofia Papagiannaki
596 6ee6677e Sofia Papagiannaki
    def get_object_meta(self, container, object, version=None, until=None,
597 6ee6677e Sofia Papagiannaki
                        user=None):
598 6ee6677e Sofia Papagiannaki
        prefix = 'X-Object-Meta-'
599 f53483d8 Sofia Papagiannaki
        user = user or self.user
600 6ee6677e Sofia Papagiannaki
        r = self.get_object_info(container, object, version, until=until,
601 6ee6677e Sofia Papagiannaki
                                 user=user)
602 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
603 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
604 3a19e99b Sofia Papagiannaki
605 c977b0b6 Sofia Papagiannaki
    def update_object_meta(self, container, object, meta=None, user=None,
606 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
607 6ee6677e Sofia Papagiannaki
        user = user or self.user
608 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
609 3a19e99b Sofia Papagiannaki
        kwargs = dict(
610 3a19e99b Sofia Papagiannaki
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
611 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
612 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
613 c977b0b6 Sofia Papagiannaki
        if verify_status:
614 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
615 6e0f3e65 Sofia Papagiannaki
        object_meta = self.get_object_meta(container, object, user=user)
616 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
617 3a19e99b Sofia Papagiannaki
            k in meta.keys())
618 3a19e99b Sofia Papagiannaki
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
619 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
620 c977b0b6 Sofia Papagiannaki
        return r
621 3a19e99b Sofia Papagiannaki
622 f3787696 Sofia Papagiannaki
    def assert_extended(self, data, format, type, size=10000):
623 f3787696 Sofia Papagiannaki
        if format == 'xml':
624 f3787696 Sofia Papagiannaki
            self._assert_xml(data, type, size)
625 f3787696 Sofia Papagiannaki
        elif format == 'json':
626 f3787696 Sofia Papagiannaki
            self._assert_json(data, type, size)
627 f3787696 Sofia Papagiannaki
628 f3787696 Sofia Papagiannaki
    def _assert_json(self, data, type, size):
629 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
630 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
631 f3787696 Sofia Papagiannaki
        self.assertTrue(len(data) <= size)
632 f3787696 Sofia Papagiannaki
        for item in info:
633 f3787696 Sofia Papagiannaki
            for i in data:
634 f3787696 Sofia Papagiannaki
                if 'subdir' in i.keys():
635 f3787696 Sofia Papagiannaki
                    continue
636 f3787696 Sofia Papagiannaki
                self.assertTrue(item in i.keys())
637 f3787696 Sofia Papagiannaki
638 f3787696 Sofia Papagiannaki
    def _assert_xml(self, data, type, size):
639 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
640 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
641 f3787696 Sofia Papagiannaki
        try:
642 f3787696 Sofia Papagiannaki
            info.remove('content_encoding')
643 f3787696 Sofia Papagiannaki
        except ValueError:
644 f3787696 Sofia Papagiannaki
            pass
645 f3787696 Sofia Papagiannaki
        xml = data
646 f3787696 Sofia Papagiannaki
        entities = xml.getElementsByTagName(type)
647 f3787696 Sofia Papagiannaki
        self.assertTrue(len(entities) <= size)
648 f3787696 Sofia Papagiannaki
        for e in entities:
649 f3787696 Sofia Papagiannaki
            for item in info:
650 f3787696 Sofia Papagiannaki
                self.assertTrue(e.getElementsByTagName(item))
651 f3787696 Sofia Papagiannaki
652 f3787696 Sofia Papagiannaki
653 f3787696 Sofia Papagiannaki
class AssertMappingInvariant(object):
654 f3787696 Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
655 f3787696 Sofia Papagiannaki
        self.callable = callable
656 f3787696 Sofia Papagiannaki
        self.args = args
657 f3787696 Sofia Papagiannaki
        self.kwargs = kwargs
658 f3787696 Sofia Papagiannaki
659 f3787696 Sofia Papagiannaki
    def __enter__(self):
660 f3787696 Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
661 f3787696 Sofia Papagiannaki
        return self.map
662 f3787696 Sofia Papagiannaki
663 f3787696 Sofia Papagiannaki
    def __exit__(self, type, value, tb):
664 f3787696 Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
665 f3787696 Sofia Papagiannaki
        for k, v in self.map.items():
666 f3787696 Sofia Papagiannaki
            if is_date(v):
667 f3787696 Sofia Papagiannaki
                continue
668 f3787696 Sofia Papagiannaki
669 f3787696 Sofia Papagiannaki
            assert(k in map), '%s not in map' % k
670 f3787696 Sofia Papagiannaki
            assert v == map[k]
671 f3787696 Sofia Papagiannaki
672 3a19e99b Sofia Papagiannaki
673 3a19e99b Sofia Papagiannaki
class AssertUUidInvariant(object):
674 3a19e99b Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
675 3a19e99b Sofia Papagiannaki
        self.callable = callable
676 3a19e99b Sofia Papagiannaki
        self.args = args
677 3a19e99b Sofia Papagiannaki
        self.kwargs = kwargs
678 3a19e99b Sofia Papagiannaki
679 3a19e99b Sofia Papagiannaki
    def __enter__(self):
680 3a19e99b Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
681 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
682 3a19e99b Sofia Papagiannaki
        self.uuid = self.map['x-object-uuid']
683 3a19e99b Sofia Papagiannaki
        return self.map
684 3a19e99b Sofia Papagiannaki
685 3a19e99b Sofia Papagiannaki
    def __exit__(self, type, value, tb):
686 3a19e99b Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
687 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
688 3a19e99b Sofia Papagiannaki
        uuid = map['x-object-uuid']
689 3a19e99b Sofia Papagiannaki
        assert(uuid == self.uuid)