Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 7e402b46

History | View | Annotate | Download (26.4 kB)

1 f3787696 Sofia Papagiannaki
#!/usr/bin/env python
2 f3787696 Sofia Papagiannaki
#coding=utf8
3 f3787696 Sofia Papagiannaki
4 f3787696 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 f3787696 Sofia Papagiannaki
#
6 f3787696 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 f3787696 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 f3787696 Sofia Papagiannaki
# conditions are met:
9 f3787696 Sofia Papagiannaki
#
10 f3787696 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 f3787696 Sofia Papagiannaki
#      disclaimer.
13 f3787696 Sofia Papagiannaki
#
14 f3787696 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 f3787696 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 f3787696 Sofia Papagiannaki
#      provided with the distribution.
18 f3787696 Sofia Papagiannaki
#
19 f3787696 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 f3787696 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 f3787696 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 f3787696 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 f3787696 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 f3787696 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 f3787696 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 f3787696 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 f3787696 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 f3787696 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 f3787696 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 f3787696 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 f3787696 Sofia Papagiannaki
#
32 f3787696 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 f3787696 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 f3787696 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 f3787696 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 f3787696 Sofia Papagiannaki
37 f53483d8 Sofia Papagiannaki
from urlparse import urlunsplit, urlsplit, urlparse
38 f3787696 Sofia Papagiannaki
from xml.dom import minidom
39 6ee6677e Sofia Papagiannaki
from urllib import quote, unquote
40 13bf6cd8 Sofia Papagiannaki
from mock import patch, PropertyMock
41 f3787696 Sofia Papagiannaki
42 f3787696 Sofia Papagiannaki
from snf_django.utils.testing import with_settings, astakos_user
43 f3787696 Sofia Papagiannaki
44 f3787696 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
45 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import is_date, get_random_data, get_random_name
46 bc4f25c0 Sofia Papagiannaki
from pithos.backends.migrate import initialize_db
47 f3787696 Sofia Papagiannaki
48 d6a92fa0 Sofia Papagiannaki
from synnefo.lib.services import get_service_path
49 d6a92fa0 Sofia Papagiannaki
from synnefo.lib import join_urls
50 13bf6cd8 Sofia Papagiannaki
from synnefo.util import text
51 d6a92fa0 Sofia Papagiannaki
52 f3787696 Sofia Papagiannaki
from django.test import TestCase
53 f53483d8 Sofia Papagiannaki
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
54 bc4f25c0 Sofia Papagiannaki
from django.test.simple import DjangoTestSuiteRunner
55 7c36f3fb Sofia Papagiannaki
from django.conf import settings
56 f3787696 Sofia Papagiannaki
from django.utils.http import urlencode
57 bc4f25c0 Sofia Papagiannaki
from django.db.backends.creation import TEST_DATABASE_PREFIX
58 f3787696 Sofia Papagiannaki
59 f3787696 Sofia Papagiannaki
import django.utils.simplejson as json
60 f3787696 Sofia Papagiannaki
61 13bf6cd8 Sofia Papagiannaki
62 1a720e84 Christos Stavrakakis
import sys
63 f3787696 Sofia Papagiannaki
import random
64 f3787696 Sofia Papagiannaki
import functools
65 f3787696 Sofia Papagiannaki
66 3a19e99b Sofia Papagiannaki
67 f3787696 Sofia Papagiannaki
pithos_test_settings = functools.partial(with_settings, pithos_settings)
68 f3787696 Sofia Papagiannaki
69 f3787696 Sofia Papagiannaki
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
70 f3787696 Sofia Papagiannaki
                "%A, %d-%b-%y %H:%M:%S GMT",
71 f3787696 Sofia Papagiannaki
                "%a, %d %b %Y %H:%M:%S GMT"]
72 f3787696 Sofia Papagiannaki
73 f3787696 Sofia Papagiannaki
o_names = ['kate.jpg',
74 f3787696 Sofia Papagiannaki
           'kate_beckinsale.jpg',
75 f3787696 Sofia Papagiannaki
           'How To Win Friends And Influence People.pdf',
76 f3787696 Sofia Papagiannaki
           'moms_birthday.jpg',
77 f3787696 Sofia Papagiannaki
           'poodle_strut.mov',
78 f3787696 Sofia Papagiannaki
           'Disturbed - Down With The Sickness.mp3',
79 f3787696 Sofia Papagiannaki
           'army_of_darkness.avi',
80 f3787696 Sofia Papagiannaki
           'the_mad.avi',
81 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/poodle.jpg',
82 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/terrier.jpg',
83 f3787696 Sofia Papagiannaki
           'photos/animals/cats/persian.jpg',
84 f3787696 Sofia Papagiannaki
           'photos/animals/cats/siamese.jpg',
85 f3787696 Sofia Papagiannaki
           'photos/plants/fern.jpg',
86 f3787696 Sofia Papagiannaki
           'photos/plants/rose.jpg',
87 f3787696 Sofia Papagiannaki
           'photos/me.jpg']
88 f3787696 Sofia Papagiannaki
89 f3787696 Sofia Papagiannaki
details = {'container': ('name', 'count', 'bytes', 'last_modified',
90 f3787696 Sofia Papagiannaki
                         'x_container_policy'),
91 f3787696 Sofia Papagiannaki
           'object': ('name', 'hash', 'bytes', 'content_type',
92 f3787696 Sofia Papagiannaki
                      'content_encoding', 'last_modified',)}
93 f3787696 Sofia Papagiannaki
94 5fe43b8c Sofia Papagiannaki
TEST_BLOCK_SIZE = 1024
95 5fe43b8c Sofia Papagiannaki
TEST_HASH_ALGORITHM = 'sha256'
96 5fe43b8c Sofia Papagiannaki
97 bc4f25c0 Sofia Papagiannaki
print 'backend module:', pithos_settings.BACKEND_DB_MODULE
98 bc4f25c0 Sofia Papagiannaki
print 'backend database engine:', settings.DATABASES['default']['ENGINE']
99 bc4f25c0 Sofia Papagiannaki
print 'update md5:', pithos_settings.UPDATE_MD5
100 65f480f5 Sofia Papagiannaki
101 65f480f5 Sofia Papagiannaki
102 bc4f25c0 Sofia Papagiannaki
django_sqlalchemy_engines = {
103 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
104 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.postgresql': 'postgresql',
105 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.mysql': '',
106 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.sqlite3': 'mssql',
107 bc4f25c0 Sofia Papagiannaki
    'django.db.backends.oracle': 'oracle'}
108 65f480f5 Sofia Papagiannaki
109 65f480f5 Sofia Papagiannaki
110 f53483d8 Sofia Papagiannaki
def prepare_db_connection():
111 bc4f25c0 Sofia Papagiannaki
    """Build pithos backend connection string from django default database"""
112 bc4f25c0 Sofia Papagiannaki
113 65f480f5 Sofia Papagiannaki
    db = settings.DATABASES['default']
114 bc4f25c0 Sofia Papagiannaki
    name = db.get('TEST_NAME', TEST_DATABASE_PREFIX + db['NAME'])
115 bc4f25c0 Sofia Papagiannaki
116 bc4f25c0 Sofia Papagiannaki
    if (pithos_settings.BACKEND_DB_MODULE == 'pithos.backends.lib.sqlalchemy'):
117 bc4f25c0 Sofia Papagiannaki
        if db['ENGINE'] == 'django.db.backends.sqlite3':
118 bc4f25c0 Sofia Papagiannaki
            db_connection = 'sqlite:///%s' % name
119 bc4f25c0 Sofia Papagiannaki
        else:
120 bc4f25c0 Sofia Papagiannaki
            d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
121 bc4f25c0 Sofia Papagiannaki
                     user=db['USER'],
122 bc4f25c0 Sofia Papagiannaki
                     pwd=db['PASSWORD'],
123 bc4f25c0 Sofia Papagiannaki
                     host=db['HOST'].lower(),
124 bc4f25c0 Sofia Papagiannaki
                     port=int(db['PORT']) if db['PORT'] != '' else '',
125 bc4f25c0 Sofia Papagiannaki
                     name=name)
126 bc4f25c0 Sofia Papagiannaki
            db_connection = (
127 bc4f25c0 Sofia Papagiannaki
                '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d)
128 bc4f25c0 Sofia Papagiannaki
129 bc4f25c0 Sofia Papagiannaki
            # initialize pithos database
130 bc4f25c0 Sofia Papagiannaki
            initialize_db(db_connection)
131 65f480f5 Sofia Papagiannaki
    else:
132 bc4f25c0 Sofia Papagiannaki
        db_connection = name
133 bc4f25c0 Sofia Papagiannaki
    pithos_settings.BACKEND_DB_CONNECTION = db_connection
134 bc4f25c0 Sofia Papagiannaki
135 bc4f25c0 Sofia Papagiannaki
136 6ee6677e Sofia Papagiannaki
def filter_headers(headers, prefix):
137 6ee6677e Sofia Papagiannaki
    meta = {}
138 6ee6677e Sofia Papagiannaki
    for k, v in headers.iteritems():
139 6ee6677e Sofia Papagiannaki
        if not k.startswith(prefix):
140 6ee6677e Sofia Papagiannaki
            continue
141 6ee6677e Sofia Papagiannaki
        meta[unquote(k[len(prefix):])] = unquote(v)
142 6ee6677e Sofia Papagiannaki
    return meta
143 6ee6677e Sofia Papagiannaki
144 6ee6677e Sofia Papagiannaki
145 bc4f25c0 Sofia Papagiannaki
class PithosTestSuiteRunner(DjangoTestSuiteRunner):
146 bc4f25c0 Sofia Papagiannaki
    def setup_databases(self, **kwargs):
147 bc4f25c0 Sofia Papagiannaki
        old_names, mirrors = super(PithosTestSuiteRunner,
148 bc4f25c0 Sofia Papagiannaki
                                   self).setup_databases(**kwargs)
149 f53483d8 Sofia Papagiannaki
        prepare_db_connection()
150 bc4f25c0 Sofia Papagiannaki
        return old_names, mirrors
151 65f480f5 Sofia Papagiannaki
152 de8fb87d Sofia Papagiannaki
    def teardown_databases(self, old_config, **kwargs):
153 de8fb87d Sofia Papagiannaki
        from pithos.api.util import _pithos_backend_pool
154 de8fb87d Sofia Papagiannaki
        _pithos_backend_pool.shutdown()
155 1a720e84 Christos Stavrakakis
        try:
156 1a720e84 Christos Stavrakakis
            super(PithosTestSuiteRunner, self).teardown_databases(old_config,
157 1a720e84 Christos Stavrakakis
                                                                  **kwargs)
158 1a720e84 Christos Stavrakakis
        except Exception as e:
159 1a720e84 Christos Stavrakakis
            sys.stderr.write("FAILED to teardown databases: %s\n" % str(e))
160 de8fb87d Sofia Papagiannaki
161 f3787696 Sofia Papagiannaki
162 f53483d8 Sofia Papagiannaki
class PithosTestClient(Client):
163 e7e3df52 Sofia Papagiannaki
    def _get_path(self, parsed):
164 e7e3df52 Sofia Papagiannaki
        # If there are parameters, add them
165 e7e3df52 Sofia Papagiannaki
        if parsed[3]:
166 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2] + ";" + parsed[3])
167 e7e3df52 Sofia Papagiannaki
        else:
168 c977b0b6 Sofia Papagiannaki
            return unquote(parsed[2])
169 e7e3df52 Sofia Papagiannaki
170 f53483d8 Sofia Papagiannaki
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
171 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
172 f53483d8 Sofia Papagiannaki
        """
173 f53483d8 Sofia Papagiannaki
        Send a resource to the server using COPY.
174 f53483d8 Sofia Papagiannaki
        """
175 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
176 f53483d8 Sofia Papagiannaki
        r = {
177 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
178 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
179 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
180 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'COPY',
181 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
182 f53483d8 Sofia Papagiannaki
        }
183 f53483d8 Sofia Papagiannaki
        r.update(extra)
184 f53483d8 Sofia Papagiannaki
185 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
186 f53483d8 Sofia Papagiannaki
        if follow:
187 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
188 f53483d8 Sofia Papagiannaki
        return response
189 f53483d8 Sofia Papagiannaki
190 f53483d8 Sofia Papagiannaki
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
191 f53483d8 Sofia Papagiannaki
             follow=False, **extra):
192 f53483d8 Sofia Papagiannaki
        """
193 f53483d8 Sofia Papagiannaki
        Send a resource to the server using MOVE.
194 f53483d8 Sofia Papagiannaki
        """
195 f53483d8 Sofia Papagiannaki
        parsed = urlparse(path)
196 f53483d8 Sofia Papagiannaki
        r = {
197 f53483d8 Sofia Papagiannaki
            'CONTENT_TYPE':    'text/html; charset=utf-8',
198 f53483d8 Sofia Papagiannaki
            'PATH_INFO':       self._get_path(parsed),
199 f53483d8 Sofia Papagiannaki
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
200 f53483d8 Sofia Papagiannaki
            'REQUEST_METHOD': 'MOVE',
201 f53483d8 Sofia Papagiannaki
            'wsgi.input':      FakePayload('')
202 f53483d8 Sofia Papagiannaki
        }
203 f53483d8 Sofia Papagiannaki
        r.update(extra)
204 f53483d8 Sofia Papagiannaki
205 f53483d8 Sofia Papagiannaki
        response = self.request(**r)
206 f53483d8 Sofia Papagiannaki
        if follow:
207 f53483d8 Sofia Papagiannaki
            response = self._handle_redirects(response, **extra)
208 f53483d8 Sofia Papagiannaki
        return response
209 f53483d8 Sofia Papagiannaki
210 f53483d8 Sofia Papagiannaki
211 f3787696 Sofia Papagiannaki
class PithosAPITest(TestCase):
212 13bf6cd8 Sofia Papagiannaki
    def create_patch(self, name, new_callable=None):
213 13bf6cd8 Sofia Papagiannaki
        patcher = patch(name, new_callable=new_callable)
214 13bf6cd8 Sofia Papagiannaki
        thing = patcher.start()
215 13bf6cd8 Sofia Papagiannaki
        self.addCleanup(patcher.stop)
216 13bf6cd8 Sofia Papagiannaki
        return thing
217 13bf6cd8 Sofia Papagiannaki
218 f3787696 Sofia Papagiannaki
    def setUp(self):
219 f53483d8 Sofia Papagiannaki
        self.client = PithosTestClient()
220 f53483d8 Sofia Papagiannaki
221 369a7b41 Sofia Papagiannaki
        # Override default block size to spead up tests
222 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
223 5fe43b8c Sofia Papagiannaki
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
224 369a7b41 Sofia Papagiannaki
225 f3787696 Sofia Papagiannaki
        self.user = 'user'
226 d6a92fa0 Sofia Papagiannaki
        self.pithos_path = join_urls(get_service_path(
227 d6a92fa0 Sofia Papagiannaki
            pithos_settings.pithos_services, 'object-store'))
228 f3787696 Sofia Papagiannaki
229 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.validate_token
230 13bf6cd8 Sofia Papagiannaki
        mock_validate_token = self.create_patch(
231 13bf6cd8 Sofia Papagiannaki
            'astakosclient.AstakosClient.validate_token')
232 13bf6cd8 Sofia Papagiannaki
        mock_validate_token.return_value = {
233 13bf6cd8 Sofia Papagiannaki
            'access': {'user': {'id': text.udec(self.user, 'utf8')}}}
234 13bf6cd8 Sofia Papagiannaki
235 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.get_token
236 13bf6cd8 Sofia Papagiannaki
        mock_get_token = self.create_patch(
237 13bf6cd8 Sofia Papagiannaki
            'astakosclient.AstakosClient.get_token')
238 13bf6cd8 Sofia Papagiannaki
        mock_get_token.return_value = {'access_token': 'valid_token'}
239 13bf6cd8 Sofia Papagiannaki
240 13bf6cd8 Sofia Papagiannaki
        # patch astakosclient.AstakosClient.api_oa2_auth
241 f4f948c0 Sofia Papagiannaki
        mock_api_oauth2_auth = self.create_patch(
242 f4f948c0 Sofia Papagiannaki
            'astakosclient.AstakosClient.oauth2_url',
243 13bf6cd8 Sofia Papagiannaki
            new_callable=PropertyMock)
244 f4f948c0 Sofia Papagiannaki
        mock_api_oauth2_auth.return_value = '/astakos/oauth2/'
245 13bf6cd8 Sofia Papagiannaki
246 ec6f741b Sofia Papagiannaki
        mock_service_get_quotas = self.create_patch(
247 ec6f741b Sofia Papagiannaki
            'astakosclient.AstakosClient.service_get_quotas')
248 ec6f741b Sofia Papagiannaki
        mock_service_get_quotas.return_value = {
249 ec6f741b Sofia Papagiannaki
            self.user: {
250 ec6f741b Sofia Papagiannaki
                "system": {
251 ec6f741b Sofia Papagiannaki
                    "pithos.diskspace": {
252 ec6f741b Sofia Papagiannaki
                        "usage": 0,
253 ec6f741b Sofia Papagiannaki
                        "limit": 1073741824,  # 1GB
254 ec6f741b Sofia Papagiannaki
                        "pending": 0}}}}
255 ec6f741b Sofia Papagiannaki
256 f3787696 Sofia Papagiannaki
    def tearDown(self):
257 f3787696 Sofia Papagiannaki
        #delete additionally created metadata
258 f3787696 Sofia Papagiannaki
        meta = self.get_account_meta()
259 f3787696 Sofia Papagiannaki
        self.delete_account_meta(meta)
260 f3787696 Sofia Papagiannaki
261 f3787696 Sofia Papagiannaki
        #delete additionally created groups
262 f3787696 Sofia Papagiannaki
        groups = self.get_account_groups()
263 f3787696 Sofia Papagiannaki
        self.delete_account_groups(groups)
264 f3787696 Sofia Papagiannaki
265 f3787696 Sofia Papagiannaki
        self._clean_account()
266 f3787696 Sofia Papagiannaki
267 65f480f5 Sofia Papagiannaki
    def _clean_account(self):
268 65f480f5 Sofia Papagiannaki
        for c in self.list_containers():
269 65f480f5 Sofia Papagiannaki
            self.delete_container_content(c['name'])
270 65f480f5 Sofia Papagiannaki
            self.delete_container(c['name'])
271 65f480f5 Sofia Papagiannaki
272 f53483d8 Sofia Papagiannaki
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
273 f53483d8 Sofia Papagiannaki
             **extra):
274 f3787696 Sofia Papagiannaki
        with astakos_user(user):
275 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
276 f53483d8 Sofia Papagiannaki
            if token:
277 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
278 6ee6677e Sofia Papagiannaki
            response = self.client.head(url, data, follow, **extra)
279 f3787696 Sofia Papagiannaki
        return response
280 f3787696 Sofia Papagiannaki
281 f53483d8 Sofia Papagiannaki
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
282 f53483d8 Sofia Papagiannaki
            **extra):
283 f3787696 Sofia Papagiannaki
        with astakos_user(user):
284 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
285 f53483d8 Sofia Papagiannaki
            if token:
286 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
287 6ee6677e Sofia Papagiannaki
            response = self.client.get(url, data, follow, **extra)
288 f3787696 Sofia Papagiannaki
        return response
289 f3787696 Sofia Papagiannaki
290 f53483d8 Sofia Papagiannaki
    def delete(self, url, user='user', token='DummyToken', data={},
291 f53483d8 Sofia Papagiannaki
               follow=False, **extra):
292 f3787696 Sofia Papagiannaki
        with astakos_user(user):
293 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
294 f53483d8 Sofia Papagiannaki
            if token:
295 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
296 6ee6677e Sofia Papagiannaki
            response = self.client.delete(url, data, follow, **extra)
297 f3787696 Sofia Papagiannaki
        return response
298 f3787696 Sofia Papagiannaki
299 f53483d8 Sofia Papagiannaki
    def post(self, url, user='user', token='DummyToken', data={},
300 6ee6677e Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
301 f3787696 Sofia Papagiannaki
        with astakos_user(user):
302 6ee6677e Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
303 f53483d8 Sofia Papagiannaki
            if token:
304 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
305 6ee6677e Sofia Papagiannaki
            response = self.client.post(url, data, content_type, follow,
306 6ee6677e Sofia Papagiannaki
                                        **extra)
307 f3787696 Sofia Papagiannaki
        return response
308 f3787696 Sofia Papagiannaki
309 f53483d8 Sofia Papagiannaki
    def put(self, url, user='user', token='DummyToken', data={},
310 7e402b46 Sofia Papagiannaki
            content_type='application/octet-stream', follow=False,
311 7e402b46 Sofia Papagiannaki
            quote_extra=True, **extra):
312 f3787696 Sofia Papagiannaki
        with astakos_user(user):
313 7e402b46 Sofia Papagiannaki
            if quote_extra:
314 7e402b46 Sofia Papagiannaki
                extra = dict((quote(k), quote(v)) for k, v in extra.items())
315 f53483d8 Sofia Papagiannaki
            if token:
316 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
317 6ee6677e Sofia Papagiannaki
            response = self.client.put(url, data, content_type, follow,
318 6ee6677e Sofia Papagiannaki
                                       **extra)
319 f3787696 Sofia Papagiannaki
        return response
320 f3787696 Sofia Papagiannaki
321 f53483d8 Sofia Papagiannaki
    def copy(self, url, user='user', token='DummyToken', data={},
322 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
323 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
324 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
325 f53483d8 Sofia Papagiannaki
            if token:
326 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
327 f53483d8 Sofia Papagiannaki
            response = self.client.copy(url, data, content_type, follow,
328 f53483d8 Sofia Papagiannaki
                                        **extra)
329 f53483d8 Sofia Papagiannaki
        return response
330 f53483d8 Sofia Papagiannaki
331 f53483d8 Sofia Papagiannaki
    def move(self, url, user='user', token='DummyToken', data={},
332 f53483d8 Sofia Papagiannaki
             content_type='application/octet-stream', follow=False, **extra):
333 f53483d8 Sofia Papagiannaki
        with astakos_user(user):
334 f53483d8 Sofia Papagiannaki
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
335 f53483d8 Sofia Papagiannaki
            if token:
336 f53483d8 Sofia Papagiannaki
                extra['HTTP_X_AUTH_TOKEN'] = token
337 f53483d8 Sofia Papagiannaki
            response = self.client.move(url, data, content_type, follow,
338 f53483d8 Sofia Papagiannaki
                                        **extra)
339 f53483d8 Sofia Papagiannaki
        return response
340 f53483d8 Sofia Papagiannaki
341 c977b0b6 Sofia Papagiannaki
    def update_account_meta(self, meta, user=None, verify_status=True):
342 6ee6677e Sofia Papagiannaki
        user = user or self.user
343 f3787696 Sofia Papagiannaki
        kwargs = dict(
344 f3787696 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
345 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
346 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
347 c977b0b6 Sofia Papagiannaki
        if verify_status:
348 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
349 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
350 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
351 f53483d8 Sofia Papagiannaki
            k in meta.keys())
352 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
353 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
354 f53483d8 Sofia Papagiannaki
355 c977b0b6 Sofia Papagiannaki
    def reset_account_meta(self, meta, user=None, verify_status=True):
356 f53483d8 Sofia Papagiannaki
        user = user or self.user
357 f53483d8 Sofia Papagiannaki
        kwargs = dict(
358 f53483d8 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
359 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
360 f53483d8 Sofia Papagiannaki
        r = self.post(url, user=user, **kwargs)
361 c977b0b6 Sofia Papagiannaki
        if verify_status:
362 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
363 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
364 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
365 f53483d8 Sofia Papagiannaki
            k in meta.keys())
366 f53483d8 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
367 f53483d8 Sofia Papagiannaki
            k, v in meta.items())
368 f3787696 Sofia Papagiannaki
369 c977b0b6 Sofia Papagiannaki
    def delete_account_meta(self, meta, user=None, verify_status=True):
370 6ee6677e Sofia Papagiannaki
        user = user or self.user
371 f53483d8 Sofia Papagiannaki
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
372 f3787696 Sofia Papagiannaki
        kwargs = dict((transform(k), '') for k, v in meta.items())
373 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
374 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
375 c977b0b6 Sofia Papagiannaki
        if verify_status:
376 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
377 f53483d8 Sofia Papagiannaki
        account_meta = self.get_account_meta(user=user)
378 f53483d8 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
379 f53483d8 Sofia Papagiannaki
            k in meta.keys())
380 f3787696 Sofia Papagiannaki
        return r
381 f3787696 Sofia Papagiannaki
382 c977b0b6 Sofia Papagiannaki
    def delete_account_groups(self, groups, user=None, verify_status=True):
383 6ee6677e Sofia Papagiannaki
        user = user or self.user
384 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
385 f53483d8 Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **groups)
386 c977b0b6 Sofia Papagiannaki
        if verify_status:
387 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
388 6ee6677e Sofia Papagiannaki
        account_groups = self.get_account_groups()
389 6ee6677e Sofia Papagiannaki
        (self.assertTrue(k not in account_groups) for k in groups.keys())
390 f3787696 Sofia Papagiannaki
        return r
391 f3787696 Sofia Papagiannaki
392 c977b0b6 Sofia Papagiannaki
    def get_account_info(self, until=None, user=None, verify_status=True):
393 6ee6677e Sofia Papagiannaki
        user = user or self.user
394 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user)
395 f3787696 Sofia Papagiannaki
        if until is not None:
396 f3787696 Sofia Papagiannaki
            parts = list(urlsplit(url))
397 f3787696 Sofia Papagiannaki
            parts[3] = urlencode({
398 f3787696 Sofia Papagiannaki
                'until': until
399 f3787696 Sofia Papagiannaki
            })
400 f3787696 Sofia Papagiannaki
            url = urlunsplit(parts)
401 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
402 c977b0b6 Sofia Papagiannaki
        if verify_status:
403 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
404 f3787696 Sofia Papagiannaki
        return r
405 f3787696 Sofia Papagiannaki
406 6ee6677e Sofia Papagiannaki
    def get_account_meta(self, until=None, user=None):
407 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Meta-'
408 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
409 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
410 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
411 f3787696 Sofia Papagiannaki
412 6ee6677e Sofia Papagiannaki
    def get_account_groups(self, until=None, user=None):
413 6ee6677e Sofia Papagiannaki
        prefix = 'X-Account-Group-'
414 6ee6677e Sofia Papagiannaki
        r = self.get_account_info(until=until, user=user)
415 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
416 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
417 f3787696 Sofia Papagiannaki
418 c977b0b6 Sofia Papagiannaki
    def get_container_info(self, container, until=None, user=None,
419 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
420 6ee6677e Sofia Papagiannaki
        user = user or self.user
421 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
422 3a19e99b Sofia Papagiannaki
        if until is not None:
423 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
424 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
425 3a19e99b Sofia Papagiannaki
                'until': until
426 3a19e99b Sofia Papagiannaki
            })
427 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
428 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
429 c977b0b6 Sofia Papagiannaki
        if verify_status:
430 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
431 3a19e99b Sofia Papagiannaki
        return r
432 3a19e99b Sofia Papagiannaki
433 6ee6677e Sofia Papagiannaki
    def get_container_meta(self, container, until=None, user=None):
434 6ee6677e Sofia Papagiannaki
        prefix = 'X-Container-Meta-'
435 6ee6677e Sofia Papagiannaki
        r = self.get_container_info(container, until=until, user=user)
436 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
437 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
438 3a19e99b Sofia Papagiannaki
439 c977b0b6 Sofia Papagiannaki
    def update_container_meta(self, container, meta=None, user=None,
440 c977b0b6 Sofia Papagiannaki
                              verify_status=True):
441 6ee6677e Sofia Papagiannaki
        user = user or self.user
442 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
443 3a19e99b Sofia Papagiannaki
        kwargs = dict(
444 3a19e99b Sofia Papagiannaki
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
445 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container)
446 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, **kwargs)
447 c977b0b6 Sofia Papagiannaki
        if verify_status:
448 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
449 6e0f3e65 Sofia Papagiannaki
        container_meta = self.get_container_meta(container, user=user)
450 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
451 3a19e99b Sofia Papagiannaki
            k in meta.keys())
452 3a19e99b Sofia Papagiannaki
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
453 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
454 c977b0b6 Sofia Papagiannaki
        return r
455 3a19e99b Sofia Papagiannaki
456 6ee6677e Sofia Papagiannaki
    def list_containers(self, format='json', headers={}, user=None, **params):
457 6ee6677e Sofia Papagiannaki
        user = user or self.user
458 6ee6677e Sofia Papagiannaki
        _url = join_urls(self.pithos_path, user)
459 d6a92fa0 Sofia Papagiannaki
        parts = list(urlsplit(_url))
460 f3787696 Sofia Papagiannaki
        params['format'] = format
461 f3787696 Sofia Papagiannaki
        parts[3] = urlencode(params)
462 f3787696 Sofia Papagiannaki
        url = urlunsplit(parts)
463 f3787696 Sofia Papagiannaki
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
464 f3787696 Sofia Papagiannaki
                        for k, v in headers.items())
465 6ee6677e Sofia Papagiannaki
        r = self.get(url, user=user, **_headers)
466 f3787696 Sofia Papagiannaki
467 f3787696 Sofia Papagiannaki
        if format is None:
468 f3787696 Sofia Papagiannaki
            containers = r.content.split('\n')
469 f3787696 Sofia Papagiannaki
            if '' in containers:
470 f3787696 Sofia Papagiannaki
                containers.remove('')
471 f3787696 Sofia Papagiannaki
            return containers
472 f3787696 Sofia Papagiannaki
        elif format == 'json':
473 f3787696 Sofia Papagiannaki
            try:
474 f3787696 Sofia Papagiannaki
                containers = json.loads(r.content)
475 f3787696 Sofia Papagiannaki
            except:
476 f3787696 Sofia Papagiannaki
                self.fail('json format expected')
477 f3787696 Sofia Papagiannaki
            return containers
478 f3787696 Sofia Papagiannaki
        elif format == 'xml':
479 f3787696 Sofia Papagiannaki
            return minidom.parseString(r.content)
480 f3787696 Sofia Papagiannaki
481 c977b0b6 Sofia Papagiannaki
    def delete_container_content(self, cname, user=None, verify_status=True):
482 6ee6677e Sofia Papagiannaki
        user = user or self.user
483 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
484 6ee6677e Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url, user=user)
485 c977b0b6 Sofia Papagiannaki
        if verify_status:
486 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
487 f3787696 Sofia Papagiannaki
        return r
488 f3787696 Sofia Papagiannaki
489 c977b0b6 Sofia Papagiannaki
    def delete_container(self, cname, user=None, verify_status=True):
490 6ee6677e Sofia Papagiannaki
        user = user or self.user
491 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
492 6ee6677e Sofia Papagiannaki
        r = self.delete(url, user=user)
493 c977b0b6 Sofia Papagiannaki
        if verify_status:
494 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
495 c977b0b6 Sofia Papagiannaki
        return r
496 c977b0b6 Sofia Papagiannaki
497 c977b0b6 Sofia Papagiannaki
    def delete_object(self, cname, oname, user=None, verify_status=True):
498 c977b0b6 Sofia Papagiannaki
        user = user or self.user
499 c977b0b6 Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
500 c977b0b6 Sofia Papagiannaki
        r = self.delete(url, user=user)
501 c977b0b6 Sofia Papagiannaki
        if verify_status:
502 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
503 f3787696 Sofia Papagiannaki
        return r
504 f3787696 Sofia Papagiannaki
505 c977b0b6 Sofia Papagiannaki
    def create_container(self, cname=None, user=None, verify_status=True):
506 95b36144 Sofia Papagiannaki
        cname = cname or get_random_name()
507 6ee6677e Sofia Papagiannaki
        user = user or self.user
508 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
509 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='')
510 c977b0b6 Sofia Papagiannaki
        if verify_status:
511 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (202, 201))
512 95b36144 Sofia Papagiannaki
        return cname, r
513 f3787696 Sofia Papagiannaki
514 c977b0b6 Sofia Papagiannaki
    def upload_object(self, cname, oname=None, length=None, verify_status=True,
515 6ee6677e Sofia Papagiannaki
                      user=None, **meta):
516 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
517 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
518 6ee6677e Sofia Papagiannaki
        user = user or self.user
519 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
520 f3787696 Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
521 f3787696 Sofia Papagiannaki
                       for k, v in meta.iteritems())
522 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
523 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data=data, **headers)
524 c977b0b6 Sofia Papagiannaki
        if verify_status:
525 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
526 3a19e99b Sofia Papagiannaki
        return oname, data, r
527 3a19e99b Sofia Papagiannaki
528 3a19e99b Sofia Papagiannaki
    def update_object_data(self, cname, oname=None, length=None,
529 3a19e99b Sofia Papagiannaki
                           content_type=None, content_range=None,
530 c977b0b6 Sofia Papagiannaki
                           verify_status=True, user=None, **meta):
531 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
532 5fe43b8c Sofia Papagiannaki
        length = length or random.randint(TEST_BLOCK_SIZE, 2 * TEST_BLOCK_SIZE)
533 3a19e99b Sofia Papagiannaki
        content_type = content_type or 'application/octet-stream'
534 6ee6677e Sofia Papagiannaki
        user = user or self.user
535 3a19e99b Sofia Papagiannaki
        data = get_random_data(length=length)
536 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
537 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
538 3a19e99b Sofia Papagiannaki
        if content_range:
539 3a19e99b Sofia Papagiannaki
            headers['HTTP_CONTENT_RANGE'] = content_range
540 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
541 6ee6677e Sofia Papagiannaki
        r = self.post(url, user=user, data=data, content_type=content_type,
542 6ee6677e Sofia Papagiannaki
                      **headers)
543 c977b0b6 Sofia Papagiannaki
        if verify_status:
544 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 204)
545 f3787696 Sofia Papagiannaki
        return oname, data, r
546 f3787696 Sofia Papagiannaki
547 3a19e99b Sofia Papagiannaki
    def append_object_data(self, cname, oname=None, length=None,
548 6ee6677e Sofia Papagiannaki
                           content_type=None, user=None):
549 3a19e99b Sofia Papagiannaki
        return self.update_object_data(cname, oname=oname,
550 3a19e99b Sofia Papagiannaki
                                       length=length,
551 3a19e99b Sofia Papagiannaki
                                       content_type=content_type,
552 6ee6677e Sofia Papagiannaki
                                       content_range='bytes */*',
553 6ee6677e Sofia Papagiannaki
                                       user=user)
554 3a19e99b Sofia Papagiannaki
555 c977b0b6 Sofia Papagiannaki
    def create_folder(self, cname, oname=None, user=None, verify_status=True,
556 c977b0b6 Sofia Papagiannaki
                      **headers):
557 6ee6677e Sofia Papagiannaki
        user = user or self.user
558 bc4f25c0 Sofia Papagiannaki
        oname = oname or get_random_name()
559 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname, oname)
560 6ee6677e Sofia Papagiannaki
        r = self.put(url, user=user, data='',
561 6ee6677e Sofia Papagiannaki
                     content_type='application/directory', **headers)
562 c977b0b6 Sofia Papagiannaki
        if verify_status:
563 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
564 f3787696 Sofia Papagiannaki
        return oname, r
565 f3787696 Sofia Papagiannaki
566 c977b0b6 Sofia Papagiannaki
    def list_objects(self, cname, prefix=None, user=None, verify_status=True):
567 6ee6677e Sofia Papagiannaki
        user = user or self.user
568 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, cname)
569 3a19e99b Sofia Papagiannaki
        path = '%s?format=json' % url
570 3a19e99b Sofia Papagiannaki
        if prefix is not None:
571 3a19e99b Sofia Papagiannaki
            path = '%s&prefix=%s' % (path, prefix)
572 6ee6677e Sofia Papagiannaki
        r = self.get(path, user=user)
573 c977b0b6 Sofia Papagiannaki
        if verify_status:
574 c977b0b6 Sofia Papagiannaki
            self.assertTrue(r.status_code in (200, 204))
575 f3787696 Sofia Papagiannaki
        try:
576 f3787696 Sofia Papagiannaki
            objects = json.loads(r.content)
577 f3787696 Sofia Papagiannaki
        except:
578 f3787696 Sofia Papagiannaki
            self.fail('json format expected')
579 f3787696 Sofia Papagiannaki
        return objects
580 f3787696 Sofia Papagiannaki
581 6ee6677e Sofia Papagiannaki
    def get_object_info(self, container, object, version=None, until=None,
582 c977b0b6 Sofia Papagiannaki
                        user=None, verify_status=True):
583 6ee6677e Sofia Papagiannaki
        user = user or self.user
584 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
585 3a19e99b Sofia Papagiannaki
        if until is not None:
586 3a19e99b Sofia Papagiannaki
            parts = list(urlsplit(url))
587 3a19e99b Sofia Papagiannaki
            parts[3] = urlencode({
588 3a19e99b Sofia Papagiannaki
                'until': until
589 3a19e99b Sofia Papagiannaki
            })
590 3a19e99b Sofia Papagiannaki
            url = urlunsplit(parts)
591 3a19e99b Sofia Papagiannaki
        if version:
592 3a19e99b Sofia Papagiannaki
            url = '%s?version=%s' % (url, version)
593 6ee6677e Sofia Papagiannaki
        r = self.head(url, user=user)
594 c977b0b6 Sofia Papagiannaki
        if verify_status:
595 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
596 3a19e99b Sofia Papagiannaki
        return r
597 3a19e99b Sofia Papagiannaki
598 6ee6677e Sofia Papagiannaki
    def get_object_meta(self, container, object, version=None, until=None,
599 6ee6677e Sofia Papagiannaki
                        user=None):
600 6ee6677e Sofia Papagiannaki
        prefix = 'X-Object-Meta-'
601 f53483d8 Sofia Papagiannaki
        user = user or self.user
602 6ee6677e Sofia Papagiannaki
        r = self.get_object_info(container, object, version, until=until,
603 6ee6677e Sofia Papagiannaki
                                 user=user)
604 3a19e99b Sofia Papagiannaki
        headers = dict(r._headers.values())
605 6ee6677e Sofia Papagiannaki
        return filter_headers(headers, prefix)
606 3a19e99b Sofia Papagiannaki
607 c977b0b6 Sofia Papagiannaki
    def update_object_meta(self, container, object, meta=None, user=None,
608 c977b0b6 Sofia Papagiannaki
                           verify_status=True):
609 6ee6677e Sofia Papagiannaki
        user = user or self.user
610 c977b0b6 Sofia Papagiannaki
        meta = meta or {get_random_name(): get_random_name()}
611 3a19e99b Sofia Papagiannaki
        kwargs = dict(
612 3a19e99b Sofia Papagiannaki
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
613 6ee6677e Sofia Papagiannaki
        url = join_urls(self.pithos_path, user, container, object)
614 6ee6677e Sofia Papagiannaki
        r = self.post('%s?update=' % url, user=user, content_type='', **kwargs)
615 c977b0b6 Sofia Papagiannaki
        if verify_status:
616 c977b0b6 Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
617 6e0f3e65 Sofia Papagiannaki
        object_meta = self.get_object_meta(container, object, user=user)
618 3a19e99b Sofia Papagiannaki
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
619 3a19e99b Sofia Papagiannaki
            k in meta.keys())
620 3a19e99b Sofia Papagiannaki
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
621 3a19e99b Sofia Papagiannaki
            k, v in meta.items())
622 c977b0b6 Sofia Papagiannaki
        return r
623 3a19e99b Sofia Papagiannaki
624 f3787696 Sofia Papagiannaki
    def assert_extended(self, data, format, type, size=10000):
625 f3787696 Sofia Papagiannaki
        if format == 'xml':
626 f3787696 Sofia Papagiannaki
            self._assert_xml(data, type, size)
627 f3787696 Sofia Papagiannaki
        elif format == 'json':
628 f3787696 Sofia Papagiannaki
            self._assert_json(data, type, size)
629 f3787696 Sofia Papagiannaki
630 f3787696 Sofia Papagiannaki
    def _assert_json(self, data, type, size):
631 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
632 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
633 f3787696 Sofia Papagiannaki
        self.assertTrue(len(data) <= size)
634 f3787696 Sofia Papagiannaki
        for item in info:
635 f3787696 Sofia Papagiannaki
            for i in data:
636 f3787696 Sofia Papagiannaki
                if 'subdir' in i.keys():
637 f3787696 Sofia Papagiannaki
                    continue
638 f3787696 Sofia Papagiannaki
                self.assertTrue(item in i.keys())
639 f3787696 Sofia Papagiannaki
640 f3787696 Sofia Papagiannaki
    def _assert_xml(self, data, type, size):
641 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
642 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
643 f3787696 Sofia Papagiannaki
        try:
644 f3787696 Sofia Papagiannaki
            info.remove('content_encoding')
645 f3787696 Sofia Papagiannaki
        except ValueError:
646 f3787696 Sofia Papagiannaki
            pass
647 f3787696 Sofia Papagiannaki
        xml = data
648 f3787696 Sofia Papagiannaki
        entities = xml.getElementsByTagName(type)
649 f3787696 Sofia Papagiannaki
        self.assertTrue(len(entities) <= size)
650 f3787696 Sofia Papagiannaki
        for e in entities:
651 f3787696 Sofia Papagiannaki
            for item in info:
652 f3787696 Sofia Papagiannaki
                self.assertTrue(e.getElementsByTagName(item))
653 f3787696 Sofia Papagiannaki
654 f3787696 Sofia Papagiannaki
655 f3787696 Sofia Papagiannaki
class AssertMappingInvariant(object):
656 f3787696 Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
657 f3787696 Sofia Papagiannaki
        self.callable = callable
658 f3787696 Sofia Papagiannaki
        self.args = args
659 f3787696 Sofia Papagiannaki
        self.kwargs = kwargs
660 f3787696 Sofia Papagiannaki
661 f3787696 Sofia Papagiannaki
    def __enter__(self):
662 f3787696 Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
663 f3787696 Sofia Papagiannaki
        return self.map
664 f3787696 Sofia Papagiannaki
665 f3787696 Sofia Papagiannaki
    def __exit__(self, type, value, tb):
666 f3787696 Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
667 f3787696 Sofia Papagiannaki
        for k, v in self.map.items():
668 f3787696 Sofia Papagiannaki
            if is_date(v):
669 f3787696 Sofia Papagiannaki
                continue
670 f3787696 Sofia Papagiannaki
671 f3787696 Sofia Papagiannaki
            assert(k in map), '%s not in map' % k
672 f3787696 Sofia Papagiannaki
            assert v == map[k]
673 f3787696 Sofia Papagiannaki
674 3a19e99b Sofia Papagiannaki
675 3a19e99b Sofia Papagiannaki
class AssertUUidInvariant(object):
676 3a19e99b Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
677 3a19e99b Sofia Papagiannaki
        self.callable = callable
678 3a19e99b Sofia Papagiannaki
        self.args = args
679 3a19e99b Sofia Papagiannaki
        self.kwargs = kwargs
680 3a19e99b Sofia Papagiannaki
681 3a19e99b Sofia Papagiannaki
    def __enter__(self):
682 3a19e99b Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
683 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
684 3a19e99b Sofia Papagiannaki
        self.uuid = self.map['x-object-uuid']
685 3a19e99b Sofia Papagiannaki
        return self.map
686 3a19e99b Sofia Papagiannaki
687 3a19e99b Sofia Papagiannaki
    def __exit__(self, type, value, tb):
688 3a19e99b Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
689 3a19e99b Sofia Papagiannaki
        assert('x-object-uuid' in self.map)
690 3a19e99b Sofia Papagiannaki
        uuid = map['x-object-uuid']
691 3a19e99b Sofia Papagiannaki
        assert(uuid == self.uuid)