Revision 3a19e99b

b/contrib/snf-pithos-tools/pithos/tools/lib/hashmap.py
88 88
            self.size += len(block)
89 89

  
90 90

  
91
def merkle(path, blocksize=4194304, blockhash='sha256'):
91
def merkle(fp, blocksize=4194304, blockhash='sha256'):
92 92
    hashes = HashMap(blocksize, blockhash)
93
    hashes.load(open(path))
93
    hashes.load(fp)
94 94
    return hexlify(hashes.hash())
b/snf-django-lib/snf_django/utils/testing.py
138 138
                    "system": {
139 139
                        "pithos.diskspace": {
140 140
                            "usage": 0,
141
                            "limit": 1073741824,
141
                            "limit": 1073741824,  # 1GB
142 142
                            "pending": 0
143 143
                        }
144 144
                    }
b/snf-pithos-app/pithos/api/test/__init__.py
39 39

  
40 40
from snf_django.utils.testing import with_settings, astakos_user
41 41

  
42
from pithos.backends.random_word import get_random_word
43 42
from pithos.api import settings as pithos_settings
43
from pithos.api.test.util import is_date, get_random_data
44 44

  
45 45
from synnefo.lib.services import get_service_path
46 46
from synnefo.lib import join_urls
......
51 51

  
52 52
import django.utils.simplejson as json
53 53

  
54
import re
55 54
import random
56 55
import threading
57 56
import functools
58 57

  
58

  
59 59
pithos_test_settings = functools.partial(with_settings, pithos_settings)
60 60

  
61 61
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
......
87 87

  
88 88

  
89 89
class PithosAPITest(TestCase):
90
    #TODO unauthorized request
91 90
    def setUp(self):
92 91
        pithos_settings.BACKEND_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
93 92
        pithos_settings.BACKEND_DB_CONNECTION = django_to_sqlalchemy()
......
212 211
                if not k.startswith('X-Account-Group-')])
213 212
        return headers
214 213

  
214
    def get_container_info(self, container, until=None):
215
        url = join_urls(self.pithos_path, self.user, container)
216
        if until is not None:
217
            parts = list(urlsplit(url))
218
            parts[3] = urlencode({
219
                'until': until
220
            })
221
            url = urlunsplit(parts)
222
        r = self.head(url)
223
        self.assertEqual(r.status_code, 204)
224
        return r
225

  
226
    def get_container_meta(self, container, until=None):
227
        r = self.get_container_info(container, until=until)
228
        headers = dict(r._headers.values())
229
        map(headers.pop,
230
            [k for k in headers.keys()
231
                if not k.startswith('X-Container-Meta-')])
232
        return headers
233

  
234
    def update_container_meta(self, container, meta):
235
        kwargs = dict(
236
            ('HTTP_X_CONTAINER_META_%s' % k, str(v)) for k, v in meta.items())
237
        url = join_urls(self.pithos_path, self.user, container)
238
        r = self.post('%s?update=' % url, **kwargs)
239
        self.assertEqual(r.status_code, 202)
240
        container_meta = self.get_container_meta(container)
241
        (self.assertTrue('X-Container-Meta-%s' % k in container_meta) for
242
            k in meta.keys())
243
        (self.assertEqual(container_meta['X-Container-Meta-%s' % k], v) for
244
            k, v in meta.items())
245

  
215 246
    def list_containers(self, format='json', headers={}, **params):
216 247
        _url = join_urls(self.pithos_path, self.user)
217 248
        parts = list(urlsplit(_url))
......
254 285
        self.assertTrue(r.status_code in (202, 201))
255 286
        return r
256 287

  
257
    def upload_object(self, cname, oname=None, **meta):
258
        oname = oname or get_random_word(8)
259
        data = get_random_word(length=random.randint(1, 1024))
288
    def upload_object(self, cname, oname=None, length=1024, verify=True,
289
                      **meta):
290
        oname = oname or get_random_data(8)
291
        length = length or random.randint(1, 1024)
292
        data = get_random_data(length=length)
260 293
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
261 294
                       for k, v in meta.iteritems())
262 295
        url = join_urls(self.pithos_path, self.user, cname, oname)
263 296
        r = self.put(url, data=data, **headers)
264
        self.assertEqual(r.status_code, 201)
297
        if verify:
298
            self.assertEqual(r.status_code, 201)
299
        return oname, data, r
300

  
301
    def update_object_data(self, cname, oname=None, length=None,
302
                           content_type=None, content_range=None,
303
                           verify=True, **meta):
304
        oname = oname or get_random_data(8)
305
        length = length or random.randint(1, 1024)
306
        content_type = content_type or 'application/octet-stream'
307
        data = get_random_data(length=length)
308
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
309
                       for k, v in meta.iteritems())
310
        if content_range:
311
            headers['HTTP_CONTENT_RANGE'] = content_range
312
        url = join_urls(self.pithos_path, self.user, cname, oname)
313
        r = self.post(url, data=data, content_type=content_type, **headers)
314
        if verify:
315
            self.assertEqual(r.status_code, 204)
265 316
        return oname, data, r
266 317

  
267
    def create_folder(self, cname, oname=get_random_word(8), **headers):
318
    def append_object_data(self, cname, oname=None, length=None,
319
                           content_type=None):
320
        return self.update_object_data(cname, oname=oname,
321
                                       length=length,
322
                                       content_type=content_type,
323
                                       content_range='bytes */*')
324

  
325
    def create_folder(self, cname, oname=None, **headers):
326
        oname = oname or get_random_data(8)
268 327
        url = join_urls(self.pithos_path, self.user, cname, oname)
269 328
        r = self.put(url, data='', content_type='application/directory',
270 329
                     **headers)
271 330
        self.assertEqual(r.status_code, 201)
272 331
        return oname, r
273 332

  
274
    def list_objects(self, cname):
333
    def list_objects(self, cname, prefix=None):
275 334
        url = join_urls(self.pithos_path, self.user, cname)
276
        r = self.get('%s?format=json' % url)
335
        path = '%s?format=json' % url
336
        if prefix is not None:
337
            path = '%s&prefix=%s' % (path, prefix)
338
        r = self.get(path)
277 339
        self.assertTrue(r.status_code in (200, 204))
278 340
        try:
279 341
            objects = json.loads(r.content)
......
281 343
            self.fail('json format expected')
282 344
        return objects
283 345

  
346
    def get_object_info(self, container, object, version=None, until=None):
347
        url = join_urls(self.pithos_path, self.user, container, object)
348
        if until is not None:
349
            parts = list(urlsplit(url))
350
            parts[3] = urlencode({
351
                'until': until
352
            })
353
            url = urlunsplit(parts)
354
        if version:
355
            url = '%s?version=%s' % (url, version)
356
        r = self.head(url)
357
        self.assertEqual(r.status_code, 200)
358
        return r
359

  
360
    def get_object_meta(self, container, object, version=None, until=None):
361
        r = self.get_object_info(container, object, version, until=until)
362
        headers = dict(r._headers.values())
363
        map(headers.pop,
364
            [k for k in headers.keys()
365
                if not k.startswith('X-Object-Meta-')])
366
        return headers
367

  
368
    def update_object_meta(self, container, object, meta):
369
        kwargs = dict(
370
            ('HTTP_X_OBJECT_META_%s' % k, str(v)) for k, v in meta.items())
371
        url = join_urls(self.pithos_path, self.user, container, object)
372
        r = self.post('%s?update=' % url, content_type='', **kwargs)
373
        self.assertEqual(r.status_code, 202)
374
        object_meta = self.get_object_meta(container, object)
375
        (self.assertTrue('X-Objecr-Meta-%s' % k in object_meta) for
376
            k in meta.keys())
377
        (self.assertEqual(object_meta['X-Object-Meta-%s' % k], v) for
378
            k, v in meta.items())
379

  
284 380
    def assert_status(self, status, codes):
285 381
        l = [elem for elem in return_codes]
286 382
        if isinstance(codes, list):
......
339 435
            assert(k in map), '%s not in map' % k
340 436
            assert v == map[k]
341 437

  
438

  
439
class AssertUUidInvariant(object):
440
    def __init__(self, callable, *args, **kwargs):
441
        self.callable = callable
442
        self.args = args
443
        self.kwargs = kwargs
444

  
445
    def __enter__(self):
446
        self.map = self.callable(*self.args, **self.kwargs)
447
        assert('x-object-uuid' in self.map)
448
        self.uuid = self.map['x-object-uuid']
449
        return self.map
450

  
451
    def __exit__(self, type, value, tb):
452
        map = self.callable(*self.args, **self.kwargs)
453
        assert('x-object-uuid' in self.map)
454
        uuid = map['x-object-uuid']
455
        assert(uuid == self.uuid)
456

  
457

  
342 458
django_sqlalchemy_engines = {
343 459
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
344 460
    'django.db.backends.postgresql': 'postgresql',
......
365 481
        return '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d
366 482

  
367 483

  
368
def is_date(date):
369
    __D = r'(?P<day>\d{2})'
370
    __D2 = r'(?P<day>[ \d]\d)'
371
    __M = r'(?P<mon>\w{3})'
372
    __Y = r'(?P<year>\d{4})'
373
    __Y2 = r'(?P<year>\d{2})'
374
    __T = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})'
375
    RFC1123_DATE = re.compile(r'^\w{3}, %s %s %s %s GMT$' % (
376
        __D, __M, __Y, __T))
377
    RFC850_DATE = re.compile(r'^\w{6,9}, %s-%s-%s %s GMT$' % (
378
        __D, __M, __Y2, __T))
379
    ASCTIME_DATE = re.compile(r'^\w{3} %s %s %s %s$' % (
380
        __M, __D2, __T, __Y))
381
    for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
382
        m = regex.match(date)
383
        if m is not None:
384
            return True
385
    return False
386

  
387

  
388
def strnextling(prefix):
389
    """Return the first unicode string
390
       greater than but not starting with given prefix.
391
       strnextling('hello') -> 'hellp'
392
    """
393
    if not prefix:
394
        ## all strings start with the null string,
395
        ## therefore we have to approximate strnextling('')
396
        ## with the last unicode character supported by python
397
        ## 0x10ffff for wide (32-bit unicode) python builds
398
        ## 0x00ffff for narrow (16-bit unicode) python builds
399
        ## We will not autodetect. 0xffff is safe enough.
400
        return unichr(0xffff)
401
    s = prefix[:-1]
402
    c = ord(prefix[-1])
403
    if c >= 0xffff:
404
        raise RuntimeError
405
    s += unichr(c + 1)
406
    return s
407

  
408

  
409 484
def test_concurrently(times=2):
410 485
    """
411 486
    Add this decorator to small pieces of code that you want to test
b/snf-pithos-app/pithos/api/test/accounts.py
135 135
        self.assertEquals(containers,
136 136
                          ['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
137 137

  
138
    def test_list_shared(self):
139
        # upload and publish object
140
        oname, data, resp = self.upload_object('apples')
141
        url = join_urls(self.pithos_path, self.user, 'apples', oname)
142
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
143
        self.assertEqual(r.status_code, 202)
144

  
145
        # upload and share object
146
        other, data, resp = self.upload_object('bananas')
147
        url = join_urls(self.pithos_path, self.user, 'bananas', other)
148
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
149
        self.assertEqual(r.status_code, 202)
150

  
151
        url = join_urls(self.pithos_path, self.user)
152

  
153
        # list shared containers
154
        r = self.get('%s?public=' % url)
155
        objects = r.content.split('\n')
156
        if '' in objects:
157
            objects.remove('')
158
        self.assertEqual(objects, ['apples'])
159

  
160
        # list shared containers
161
        r = self.get('%s?shared=' % url)
162
        objects = r.content.split('\n')
163
        if '' in objects:
164
            objects.remove('')
165
        self.assertEqual(objects, ['bananas'])
166

  
167
        # list public and shared containers
168
        r = self.get('%s?public=&shared=' % url)
169
        objects = r.content.split('\n')
170
        if '' in objects:
171
            objects.remove('')
172
        self.assertEqual(objects, ['apples', 'bananas'])
173

  
174
        # assert forbidden public container listing
175
        r = self.get('%s?public=' % url, user='alice')
176
        self.assertEqual(r.status_code, 403)
177

  
178
        # assert forbidden shared & public container listing
179
        r = self.get('%s?public=&shared=' % url, user='alice')
180
        self.assertEqual(r.status_code, 403)
181

  
138 182
    def test_list_with_limit(self):
139 183
        containers = self.list_containers(format=None, limit=2)
140 184
        self.assertEquals(len(containers), 2)
......
194 238

  
195 239
        # Check modified
196 240
        for t in t1_formats:
197
            r = self.get('%s' % url, HTTP_IF_MODIFIED_SINCE=t)
241
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
198 242
            self.assertEqual(r.status_code, 200)
199 243
            self.assertEqual(
200 244
                r.content.split('\n')[:-1],
......
211 255

  
212 256
        # Check modified
213 257
        for t in t2_formats:
214
            r = self.get('%s' % url, HTTP_IF_MODIFIED_SINCE=t)
258
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
215 259
            self.assertEqual(r.status_code, 200)
216 260
            self.assertEqual(
217 261
                r.content.split('\n')[:-1],
......
219 263

  
220 264
    def test_if_modified_since_invalid_date(self):
221 265
        url = join_urls(self.pithos_path, self.user)
222
        r = self.get('%s' % url, HTTP_IF_MODIFIED_SINCE='Monday')
266
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
223 267
        self.assertEqual(r.status_code, 200)
224 268
        self.assertEqual(
225 269
            r.content.split('\n')[:-1],
b/snf-pithos-app/pithos/api/test/containers.py
34 34
# interpreted as representing official policies, either expressed
35 35
# or implied, of GRNET S.A.
36 36

  
37
from pithos.api.test import PithosAPITest, DATE_FORMATS, o_names,\
38
    strnextling, pithos_settings, pithos_test_settings
39
from pithos.backends.random_word import get_random_word
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data
40 40

  
41 41
from synnefo.lib import join_urls
42 42

  
......
45 45

  
46 46
from xml.dom import minidom
47 47
from urllib import quote
48
import time as _time
48 49

  
49 50
import random
50 51
import datetime
......
121 122
        r = self.get('%s?shared=' % url)
122 123
        self.assertEqual(r.status_code, 200)
123 124
        objects = r.content.split('\n')
124
        objects.remove('')
125
        if '' in objects:
126
            objects.remove('')
125 127
        self.assertEqual([oname], objects)
126 128

  
127 129
        # list detailed shared and assert only the shared object is returned
......
150 152
            self.fail('json format expected')
151 153
        self.assertEqual([oname], [o['name'] for o in objects])
152 154
        self.assertTrue('x_object_sharing' in objects[0])
153
        # TODO
154
        #self.assertTrue('x_object_public' in objects[0])
155
        self.assertTrue('x_object_public' in objects[0])
155 156

  
156 157
        # create child object
157 158
        descendant = strnextling(oname)
......
161 162
        r = self.get('%s?shared=' % url)
162 163
        self.assertEqual(r.status_code, 200)
163 164
        objects = r.content.split('\n')
164
        objects.remove('')
165
        if '' in objects:
166
            objects.remove('')
165 167
        self.assertTrue(oname in objects)
166 168
        self.assertTrue(descendant not in objects)
167 169

  
168 170
        # check folder inheritance
169 171
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
170 172
        # create child object
171
        descendant = '%s/%s' % (oname, get_random_word(8))
173
        descendant = '%s/%s' % (oname, get_random_data(8))
172 174
        self.upload_object(cname, descendant)
173 175
        # request shared
174 176
        url = join_urls(self.pithos_path, self.user, cname)
175 177
        r = self.get('%s?shared=' % url)
176 178
        self.assertEqual(r.status_code, 200)
177 179
        objects = r.content.split('\n')
178
        objects.remove('')
180
        if '' in objects:
181
            objects.remove('')
179 182
        self.assertTrue(oname in objects)
180 183
        self.assertTrue(descendant in objects)
181 184

  
......
184 187
        cname = self.cnames[0]
185 188
        onames = self.objects[cname].keys()
186 189
        oname = onames.pop()
190
        other = onames.pop()
191

  
192
        # publish an object
187 193
        url = join_urls(self.pithos_path, self.user, cname, oname)
188 194
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
189 195
        self.assertEqual(r.status_code, 202)
190 196

  
191 197
        # share another
192
        other = onames.pop()
193 198
        url = join_urls(self.pithos_path, self.user, cname, other)
194 199
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
195 200
        self.assertEqual(r.status_code, 202)
......
200 205
        objects = r.content.split('\n')
201 206
        self.assertEqual(r.status_code, 200)
202 207
        self.assertTrue(oname in r.content.split('\n'))
203
        (self.assertTrue(o not in objects) for o in o_names[1:])
208
        (self.assertTrue(object not in objects) for object in o_names[1:])
204 209

  
205 210
        # list detailed public and assert only the public object is returned
206 211
        url = join_urls(self.pithos_path, self.user, cname)
......
210 215
            objects = json.loads(r.content)
211 216
        except:
212 217
            self.fail('json format expected')
213
        self.assertEqual([oname], [o['name'] for o in objects])
218
        self.assertEqual([oname], [obj['name'] for obj in objects])
214 219
        self.assertTrue('x_object_sharing' not in objects[0])
215 220
        self.assertTrue('x_object_public' in objects[0])
216 221

  
......
226 231
            objects = json.loads(r.content)
227 232
        except:
228 233
            self.fail('json format expected')
229
        self.assertEqual([oname], [o['name'] for o in objects])
234
        self.assertEqual([oname], [obj['name'] for obj in objects])
230 235
        self.assertTrue('x_object_sharing' in objects[0])
231 236
        self.assertTrue('x_object_public' in objects[0])
232 237

  
......
237 242
        r = self.get('%s?public=&format=json' % url, user='bob')
238 243
        self.assertEqual(r.status_code, 403)
239 244

  
240
        # Assert listing the container public contents to shared users
245
        # Assert forbidden public object listing to shared users
241 246
        r = self.get('%s?public=&format=json' % url, user='alice')
242
        self.assertEqual(r.status_code, 200)
243
        try:
244
            objects = json.loads(r.content)
245
        except:
246
            self.fail('json format expected')
247
        # TODO
248
        #self.assertEqual([oname], [o['name'] for o in objects])
249
        self.assertTrue('x_object_sharing' in objects[0])
250
        # assert public is not returned though
251
        self.assertTrue('x_object_public' not in objects[0])
247
        self.assertEqual(r.status_code, 403)
252 248

  
253 249
        # create child object
254 250
        descendant = strnextling(oname)
......
256 252
        # request public and assert child obejct is not listed
257 253
        r = self.get('%s?public=' % url)
258 254
        objects = r.content.split('\n')
259
        objects.remove('')
255
        if '' in objects:
256
            objects.remove('')
260 257
        self.assertEqual(r.status_code, 200)
261 258
        self.assertTrue(oname in objects)
262 259
        (self.assertTrue(o not in objects) for o in o_names[1:])
......
264 261
        # test folder inheritance
265 262
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
266 263
        # create child object
267
        descendant = '%s/%s' % (oname, get_random_word(8))
264
        descendant = '%s/%s' % (oname, get_random_data(8))
268 265
        self.upload_object(cname, descendant)
269 266
        # request public
270 267
        r = self.get('%s?public=' % url)
......
273 270
        self.assertTrue(oname in objects)
274 271
        self.assertTrue(descendant not in objects)
275 272

  
276
#    def test_list_shared_public(self):
277
#        # publish an object
278
#        cname = self.cnames[0]
279
#        onames = self.objects[cname].keys()
280
#        oname = onames.pop()
281
#        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
282
#                      content_type='',
283
#                      HTTP_X_OBJECT_PUBLIC='true')
284
#        self.assertEqual(r.status_code, 202)
285
#
286
#        # share another
287
#        other = onames.pop()
288
#        r = self.post('/v1/%s/%s/%s' % (self.user, cname, other),
289
#                      content_type='',
290
#                      HTTP_X_OBJECT_SHARING='read=alice')
291
#        self.assertEqual(r.status_code, 202)
292
#
293
#        # list shared and public objects and assert object is listed
294
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
295
#            self.user, cname))
296
#        self.assertEqual(r.status_code, 200)
297
#        objects = json.loads(r.content)
298
#        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
299
#        for o in objects:
300
#            if o['name'] == oname:
301
#                self.assertTrue('x_object_public' in objects[0])
302
#            elif o['name'] == other:
303
#                self.assertTrue('x_object_sharing' in objects[1])
304
#
305
#        # assert not listing shared and public to a not shared user
306
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
307
#            self.user, cname), user='bob')
308
#        self.assertEqual(r.status_code, 403)
309
#
310
#        # assert listing shared and public to a shared user
311
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
312
#            self.user, cname), user='alice')
313
#        self.assertEqual(r.status_code, 200)
314
#        try:
315
#            objects = json.loads(r.content)
316
#        except:
317
#            self.fail('json format expected')
318
#        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
319
#
320
#        # create child object
321
#        descentant1 = strnextling(oname)
322
#        self.upload_object(cname, descendant1)
323
#        descentant2 = strnextling(other)
324
#        self.upload_object(cname, descendant2)
325
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
326
#            self.user, cname), user='alice')
327
#        self.assertEqual(r.status_code, 200)
328
#        try:
329
#            objects = json.loads(r.content)
330
#        except:
331
#            self.fail('json format expected')
332
#        self.assertEqual([o['name'] for o in objects], [oname])
333
#
334
#        # test inheritance
335
#        oname1, _ = self.create_folder(cname,
336
#                                       HTTP_X_OBJECT_SHARING='read=alice')
337
#        # create child object
338
#        descendant1 = '%s/%s' % (oname, get_random_word(8))
339
#        self.upload_object(cname, descendant1)
340
#
341
#        oname2, _ = self.create_folder(cname,
342
#                                       HTTP_X_OBJECT_PUBLIC='true')
343
#        # create child object
344
#        descendant2 = '%s/%s' % (oname, get_random_word(8))
345
#        self.upload_object(cname, descendant2)
346
#
347
#
348
#        o = self.upload_random_data(self.container[1], 'folder2/object')
349
#        objs = self.client.list_objects(
350
#            self.container[1], shared=True, public=True)
351
#        self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2'])
352
#        objs = cl.list_objects(
353
#            self.container[1], shared=True, public=True, account=get_user()
354
#        )
355
#        self.assertEqual(objs, ['folder1', 'folder1/object'])
356
#
273
    def test_list_shared_public(self):
274
        # publish an object
275
        cname = self.cnames[0]
276
        container_url = join_urls(self.pithos_path, self.user, cname)
277
        onames = self.objects[cname].keys()
278
        oname = onames.pop()
279
        url = join_urls(container_url, oname)
280
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
281
        self.assertEqual(r.status_code, 202)
282

  
283
        # share another
284
        other = onames.pop()
285
        url = join_urls(container_url, other)
286
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
287
        self.assertEqual(r.status_code, 202)
288

  
289
        # list shared and public objects and assert object is listed
290
        r = self.get('%s?shared=&public=&format=json' % container_url)
291
        self.assertEqual(r.status_code, 200)
292
        objects = json.loads(r.content)
293
        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
294
        for o in objects:
295
            if o['name'] == oname:
296
                self.assertTrue('x_object_public' in o.keys())
297
            elif o['name'] == other:
298
                self.assertTrue('x_object_sharing' in o.keys())
299

  
300
        # assert not listing shared and public to a not shared user
301
        r = self.get('%s?shared=&public=&format=json' % container_url,
302
                     user='bob')
303
        self.assertEqual(r.status_code, 403)
304

  
305
        # assert not listing public to a shared user
306
        r = self.get('%s?shared=&public=&format=json' % container_url,
307
                     user='alice')
308
        self.assertEqual(r.status_code, 403)
309

  
310
        # create child object
311
        descendant = strnextling(oname)
312
        self.upload_object(cname, descendant)
313
        # request public and assert child obejct is not listed
314
        r = self.get('%s?shared=&public=' % container_url)
315
        objects = r.content.split('\n')
316
        if '' in objects:
317
            objects.remove('')
318
        self.assertEqual(r.status_code, 200)
319
        self.assertTrue(oname in objects)
320
        (self.assertTrue(o not in objects) for o in o_names[1:])
321

  
322
        # test folder inheritance
323
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
324
        # create child object
325
        descendant = '%s/%s' % (oname, get_random_data(8))
326
        self.upload_object(cname, descendant)
327
        # request public
328
        r = self.get('%s?shared=&public=' % container_url)
329
        self.assertEqual(r.status_code, 200)
330
        objects = r.content.split('\n')
331
        if '' in objects:
332
            objects.remove('')
333
        self.assertTrue(oname in objects)
334
        self.assertTrue(descendant not in objects)
335

  
357 336
    def test_list_objects(self):
358 337
        cname = self.cnames[0]
359 338
        url = join_urls(self.pithos_path, self.user, cname)
......
366 345

  
367 346
    def test_list_objects_containing_slash(self):
368 347
        self.create_container('test')
369
        self.upload_object('test', '/objectname')
348
        self.upload_object('test', quote('/objectname', ''))
370 349

  
371 350
        url = join_urls(self.pithos_path, self.user, 'test')
372 351

  
......
517 496
        cname = 'apples'
518 497
        container_url = join_urls(self.pithos_path, self.user, cname)
519 498

  
520
        oname1 = self.objects[cname].keys().pop()
521
        url = join_urls(container_url, oname1)
522
        self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
499
        onames = self.objects[cname].keys()
500
        url = join_urls(container_url, onames[0])
501
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
502
        self.assertEqual(r.status_code, 202)
523 503

  
524
        oname2 = self.objects[cname].keys().pop()
525
        url = join_urls(container_url, cname, oname2)
526
        self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
504
        url = join_urls(container_url, onames[1])
505
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
506
        self.assertEqual(r.status_code, 202)
527 507

  
528
        oname3 = self.objects[cname].keys().pop()
529
        url = join_urls(container_url, oname3)
530
        self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
508
        url = join_urls(container_url, onames[2])
509
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
510
        self.assertEqual(r.status_code, 202)
531 511

  
532
        oname4 = self.objects[cname].keys().pop()
533
        url = join_urls(container_url, oname4)
534
        self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
512
        url = join_urls(container_url, onames[3])
513
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
514
        self.assertEqual(r.status_code, 202)
535 515

  
536 516
        # test multiple existence criteria matches
537 517
        r = self.get('%s?meta=Quality,Stock' % container_url)
......
539 519
        objects = r.content.split('\n')
540 520
        if '' in objects:
541 521
            objects.remove('')
542
        self.assertTrue(objects, sorted([oname1, oname2, oname3, oname4]))
522
        self.assertTrue(objects, sorted(onames))
543 523

  
544 524
        # list objects that satisfy the existence criteria
545 525
        r = self.get('%s?meta=Stock' % container_url)
......
547 527
        objects = r.content.split('\n')
548 528
        if '' in objects:
549 529
            objects.remove('')
550
        self.assertTrue(objects, sorted([oname3, oname4]))
530
        self.assertTrue(objects, sorted(onames[2:]))
551 531

  
552 532
        # test case insensitive existence criteria matching
553 533
        r = self.get('%s?meta=quality' % container_url)
......
555 535
        objects = r.content.split('\n')
556 536
        if '' in objects:
557 537
            objects.remove('')
558
        self.assertTrue(objects, sorted([oname1, oname2]))
538
        self.assertTrue(objects, sorted(onames[:2]))
559 539

  
560 540
        # test do not all existencecriteria match
561 541
        r = self.get('%s?meta=Quality,Foo' % container_url)
......
563 543
        objects = r.content.split('\n')
564 544
        if '' in objects:
565 545
            objects.remove('')
566
        self.assertTrue(objects, sorted([oname1, oname2]))
546
        self.assertTrue(objects, sorted(onames[:2]))
567 547

  
568 548
        # test equals criteria
569 549
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
......
571 551
        objects = r.content.split('\n')
572 552
        if '' in objects:
573 553
            objects.remove('')
574
        self.assertTrue(objects, [oname1])
554
        self.assertTrue(objects, [onames[0]])
575 555

  
576 556
        # test not equals criteria
577
        r = self.get('%s?meta=%s' % (container_url, urlencode('Quality!=aaa')))
557
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
578 558
        self.assertEqual(r.status_code, 200)
579 559
        objects = r.content.split('\n')
580 560
        if '' in objects:
581
            objects.remove()
582
        self.assertTrue(objects, [oname2])
561
            objects.remove('')
562
        self.assertTrue(objects, [onames[1]])
583 563

  
584 564
        # test lte criteria
585
        r = self.get('%s?meta=%s' % (container_url, urlencode('Stock<=120')))
565
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
586 566
        self.assertEqual(r.status_code, 200)
587 567
        objects = r.content.split('\n')
588 568
        if '' in objects:
589 569
            objects.remove('')
590
        self.assertTrue(objects, [oname3])
570
        self.assertTrue(objects, [onames[2]])
591 571

  
592 572
        # test gte criteria
593
        r = self.get('%s?meta=%s' % (container_url, urlencode('Stock>=200')))
573
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
594 574
        self.assertEqual(r.status_code, 200)
595 575
        objects = r.content.split('\n')
596 576
        if '' in objects:
597 577
            objects.remove('')
598
        self.assertTrue(objects, [oname4])
578
        self.assertTrue(objects, [onames[3]])
599 579

  
600
#
601
#    def test_if_modified_since(self):
602
#        t = datetime.datetime.utcnow()
603
#        t2 = t - datetime.timedelta(minutes=10)
604
#
605
#        #add a new object
606
#        self.upload_random_data(self.container[0], o_names[0])
607
#
608
#        for f in DATE_FORMATS:
609
#            past = t2.strftime(f)
610
#            try:
611
#                o = self.client.list_objects(self.container[0],
612
#                                            if_modified_since=past)
613
#                self.assertEqual(o,
614
#                                 self.client.list_objects(self.container[0]))
615
#            except Fault, f:
616
#                self.failIf(f.status == 304) #fail if not modified
617
#
618
#    def test_if_modified_since_invalid_date(self):
619
#        headers = {'if-modified-since':''}
620
#        o = self.client.list_objects(self.container[0], if_modified_since='')
621
#        self.assertEqual(o, self.client.list_objects(self.container[0]))
622
#
623
#    def test_if_not_modified_since(self):
624
#        now = datetime.datetime.utcnow()
625
#        since = now + datetime.timedelta(1)
626
#
627
#        for f in DATE_FORMATS:
628
#            args = {'if_modified_since':'%s' %since.strftime(f)}
629
#
630
#            #assert not modified
631
#            self.assert_raises_fault(304, self.client.list_objects,
632
#                                     self.container[0], **args)
633
#
634
#    def test_if_unmodified_since(self):
635
#        now = datetime.datetime.utcnow()
636
#        since = now + datetime.timedelta(1)
637
#
638
#        for f in DATE_FORMATS:
639
#            obj = self.client.list_objects(
640
#                self.container[0], if_unmodified_since=since.strftime(f))
641
#
642
#            #assert unmodified
643
#            self.assertEqual(obj, self.client.list_objects(self.container[0]))
644
#
645
#    def test_if_unmodified_since_precondition_failed(self):
646
#        t = datetime.datetime.utcnow()
647
#        t2 = t - datetime.timedelta(minutes=10)
648
#
649
#        #add a new container
650
#        self.client.create_container('dummy')
651
#
652
#        for f in DATE_FORMATS:
653
#            past = t2.strftime(f)
654
#
655
#            args = {'if_unmodified_since':'%s' %past}
656
#
657
#            #assert precondition failed
658
#            self.assert_raises_fault(412, self.client.list_objects,
659
#                                     self.container[0], **args)
660
#
661
#class ContainerPut(BaseTestCase):
662
#    def setUp(self):
663
#        BaseTestCase.setUp(self)
664
#        self.containers = list(set(self.initial_containers + ['c1', 'c2']))
665
#        self.containers.sort()
666
#
667
#    def test_create(self):
668
#        self.client.create_container(self.containers[0])
669
#        containers = self.client.list_containers()
670
#        self.assertTrue(self.containers[0] in containers)
671
#        self.assert_container_exists(self.containers[0])
672
#
673
#    def test_create_twice(self):
674
#        self.client.create_container(self.containers[0])
675
#        self.assertTrue(not self.client.create_container(self.containers[0]))
676
#
677
#    def test_quota(self):
678
#        self.client.create_container(self.containers[0])
679
#
680
#        policy = {'quota':100}
681
#        self.client.set_container_policies(self.containers[0], **policy)
682
#
683
#        meta = self.client.retrieve_container_metadata(self.containers[0])
684
#        self.assertTrue('x-container-policy-quota' in meta)
685
#        self.assertEqual(meta['x-container-policy-quota'], '100')
686
#
687
#        args = [self.containers[0], 'o1']
688
#        kwargs = {'length':101}
689
#        self.assert_raises_fault(
690
#            413, self.upload_random_data, *args, **kwargs)
691
#
692
#        #reset quota
693
#        policy = {'quota':0}
694
#        self.client.set_container_policies(self.containers[0], **policy)
580
    def test_if_modified_since(self):
581
        cname = 'apples'
582
        container_info = self.get_container_info(cname)
583
        last_modified = container_info['Last-Modified']
584
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
585
        t1_formats = map(t1.strftime, DATE_FORMATS)
586

  
587
        # Check not modified
588
        url = join_urls(self.pithos_path, self.user, cname)
589
        for t in t1_formats:
590
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
591
            self.assertEqual(r.status_code, 304)
592

  
593
        # modify account: add container
594
        _time.sleep(1)
595
        oname = self.upload_object(cname)[0]
596

  
597
        # Check modified
598
        objects = self.objects[cname].keys()
599
        objects.append(oname)
600
        for t in t1_formats:
601
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
602
            self.assertEqual(r.status_code, 200)
603
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
604

  
605
        container_info = self.get_container_info(cname)
606
        last_modified = container_info['Last-Modified']
607
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
608
        t2_formats = map(t2.strftime, DATE_FORMATS)
609

  
610
        # modify account: update account meta
611
        _time.sleep(1)
612
        self.update_container_meta(cname, {'foo': 'bar'})
613

  
614
        # Check modified
615
        for t in t2_formats:
616
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
617
            self.assertEqual(r.status_code, 200)
618
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
619

  
620
    def test_if_modified_since_invalid_date(self):
621
        cname = 'apples'
622
        url = join_urls(self.pithos_path, self.user, cname)
623
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
624
        self.assertEqual(r.status_code, 200)
625
        self.assertEqual(r.content.split('\n')[:-1],
626
                         sorted(self.objects['apples'].keys()))
627

  
628
    def test_if_not_modified_since(self):
629
        cname = 'apples'
630
        url = join_urls(self.pithos_path, self.user, cname)
631
        container_info = self.get_container_info(cname)
632
        last_modified = container_info['Last-Modified']
633
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
634

  
635
        # Check unmodified
636
        t1 = t + datetime.timedelta(seconds=1)
637
        t1_formats = map(t1.strftime, DATE_FORMATS)
638
        for t in t1_formats:
639
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
640
            self.assertEqual(r.status_code, 200)
641
            self.assertEqual(
642
                r.content.split('\n')[:-1],
643
                sorted(self.objects['apples']))
644

  
645
        # modify account: add container
646
        _time.sleep(2)
647
        self.upload_object(cname)
648

  
649
        container_info = self.get_container_info(cname)
650
        last_modified = container_info['Last-Modified']
651
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
652
        t2 = t - datetime.timedelta(seconds=1)
653
        t2_formats = map(t2.strftime, DATE_FORMATS)
654

  
655
        # Check modified
656
        for t in t2_formats:
657
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
658
            self.assertEqual(r.status_code, 412)
659

  
660
        # modify account: update account meta
661
        _time.sleep(1)
662
        self.update_container_meta(cname, {'foo': 'bar'})
663

  
664
        container_info = self.get_container_info(cname)
665
        last_modified = container_info['Last-Modified']
666
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
667
        t3 = t - datetime.timedelta(seconds=1)
668
        t3_formats = map(t3.strftime, DATE_FORMATS)
669

  
670
        # Check modified
671
        for t in t3_formats:
672
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
673
            self.assertEqual(r.status_code, 412)
674

  
675
    def test_if_unmodified_since(self):
676
        cname = 'apples'
677
        url = join_urls(self.pithos_path, self.user, cname)
678
        container_info = self.get_container_info(cname)
679
        last_modified = container_info['Last-Modified']
680
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
681
        t = t + datetime.timedelta(seconds=1)
682
        t_formats = map(t.strftime, DATE_FORMATS)
683

  
684
        for tf in t_formats:
685
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
686
            self.assertEqual(r.status_code, 200)
687
            self.assertEqual(
688
                r.content.split('\n')[:-1],
689
                sorted(self.objects['apples']))
690

  
691
    def test_if_unmodified_since_precondition_failed(self):
692
        cname = 'apples'
693
        url = join_urls(self.pithos_path, self.user, cname)
694
        container_info = self.get_container_info(cname)
695
        last_modified = container_info['Last-Modified']
696
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
697
        t = t - datetime.timedelta(seconds=1)
698
        t_formats = map(t.strftime, DATE_FORMATS)
699

  
700
        for tf in t_formats:
701
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
702
            self.assertEqual(r.status_code, 412)
703

  
704

  
705
class ContainerPut(PithosAPITest):
706
    def test_create(self):
707
        self.create_container('c1')
708
        self.list_containers()
709
        self.assertTrue('c1' in self.list_containers(format=None))
710

  
711
    def test_create_twice(self):
712
        self.create_container('c1')
713
        self.assertTrue('c1' in self.list_containers(format=None))
714
        r = self.create_container('c1')
715
        self.assertEqual(r.status_code, 202)
716
        self.assertTrue('c1' in self.list_containers(format=None))
717

  
718

  
719
class ContainerPost(PithosAPITest):
720
    def test_update_meta(self):
721
        cname = 'apples'
722
        self.create_container(cname)
723
        meta = {'test': 'test33', 'tost': 'tost22'}
724
        self.update_container_meta(cname, meta)
725
        info = self.get_container_info(cname)
726
        for k, v in meta.items():
727
            k = 'x-container-meta-%s' % k
728
            self.assertTrue(k in info)
729
            self.assertEqual(info[k], v)
730

  
731
    def test_quota(self):
732
        self.create_container('c1')
733

  
734
        url = join_urls(self.pithos_path, self.user, 'c1')
735
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
736
        self.assertEqual(r.status_code, 202)
737

  
738
        info = self.get_container_info('c1')
739
        self.assertTrue('x-container-policy-quota' in info)
740
        self.assertEqual(info['x-container-policy-quota'], '100')
741

  
742
        r = self.upload_object('c1', length=101, verify=False)[2]
743
        self.assertEqual(r.status_code, 413)
744

  
745
        url = join_urls(self.pithos_path, self.user, 'c1')
746
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
747
        self.assertEqual(r.status_code, 202)
748

  
749
        r = self.upload_object('c1', length=1)
750

  
751

  
752
class ContainerDelete(PithosAPITest):
753
    def setUp(self):
754
        PithosAPITest.setUp(self)
755
        cnames = ['c1', 'c2']
756

  
757
        for c in cnames:
758
            self.create_container(c)
759

  
760
    def test_delete(self):
761
        url = join_urls(self.pithos_path, self.user, 'c1')
762
        r = self.delete(url)
763
        self.assertEqual(r.status_code, 204)
764
        self.assertTrue('c1' not in self.list_containers(format=None))
765

  
766
    def test_delete_non_empty(self):
767
        self.upload_object('c1')
768
        url = join_urls(self.pithos_path, self.user, 'c1')
769
        r = self.delete(url)
770
        self.assertEqual(r.status_code, 409)
771
        self.assertTrue('c1' in self.list_containers(format=None))
772

  
773
    def test_delete_invalid(self):
774
        url = join_urls(self.pithos_path, self.user, 'c3')
775
        r = self.delete(url)
776
        self.assertEqual(r.status_code, 404)
777

  
778
    def test_delete_contents(self):
779
        folder = self.create_folder('c1')[0]
780
        descendant = strnextling(folder)
781
        self.upload_object('c1', descendant)
782
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
783

  
784
        self.delete('%s?delimiter=/' % join_urls(
785
            self.pithos_path, self.user, 'c1'))
786
        self.assertEqual([], self.list_objects('c1'))
787
        self.assertTrue('c1' in self.list_containers(format=None))
b/snf-pithos-app/pithos/api/test/objects.py
1
#!/usr/bin/env python
2
#coding=utf8
3

  
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

  
37
from collections import defaultdict
38
from urllib import quote
39
import time as _time
40

  
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             DATE_FORMATS)
44
from pithos.api.test.util import compute_md5_hash, strnextling, get_random_data
45
from pithos.api.test.util.hashmap import merkle
46

  
47
from synnefo.lib import join_urls
48

  
49
import django.utils.simplejson as json
50

  
51
import random
52
import re
53
import datetime
54

  
55

  
56
class ObjectGet(PithosAPITest):
57
    def setUp(self):
58
        PithosAPITest.setUp(self)
59
        self.containers = ['c1', 'c2']
60

  
61
        # create some containers
62
        for c in self.containers:
63
            self.create_container(c)
64

  
65
        # upload files
66
        self.objects = defaultdict(list)
67
        self.objects['c1'].append(self.upload_object('c1')[0])
68

  
69
    def test_versions(self):
70
        c = 'c1'
71
        o = self.objects[c][0]
72
        url = join_urls(self.pithos_path, self.user, c, o)
73

  
74
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
75
        r = self.post(url, content_type='', **meta)
76
        self.assertEqual(r.status_code, 202)
77

  
78
        url = join_urls(self.pithos_path, self.user, c, o)
79
        r = self.get('%s?version=list&format=json' % url)
80
        self.assertEqual(r.status_code, 200)
81
        l1 = json.loads(r.content)['versions']
82
        self.assertEqual(len(l1), 2)
83

  
84
        # update meta
85
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
86
                'HTTP_X_OBJECT_META_STOCK': 'True'}
87
        r = self.post(url, content_type='', **meta)
88
        self.assertEqual(r.status_code, 202)
89

  
90
        # assert a newly created version has been created
91
        r = self.get('%s?version=list&format=json' % url)
92
        self.assertEqual(r.status_code, 200)
93
        l2 = json.loads(r.content)['versions']
94
        self.assertEqual(len(l2), len(l1) + 1)
95
        self.assertEqual(l2[:-1], l1)
96

  
97
        vserial, _ = l2[-2]
98
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
99
                         {'X-Object-Meta-Quality': 'AAA'})
100

  
101
        # update data
102
        self.append_object_data(c, o)
103

  
104
        # assert a newly created version has been created
105
        r = self.get('%s?version=list&format=json' % url)
106
        self.assertEqual(r.status_code, 200)
107
        l3 = json.loads(r.content)['versions']
108
        self.assertEqual(len(l3), len(l2) + 1)
109
        self.assertEqual(l3[:-1], l2)
110

  
111
    def test_objects_with_trailing_spaces(self):
112
        # create object
113
        oname = self.upload_object('c1')[0]
114
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
115

  
116
        r = self.get(quote('%s ' % url))
117
        self.assertEqual(r.status_code, 404)
118

  
119
        # delete object
120
        self.delete(url)
121

  
122
        r = self.get(url)
123
        self.assertEqual(r.status_code, 404)
124

  
125
        # upload object with trailing space
126
        oname = self.upload_object('c1', quote('%s ' % get_random_data(8)))[0]
127

  
128
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
129
        r = self.get(url)
130
        self.assertEqual(r.status_code, 200)
131

  
132
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
133
        r = self.get(url)
134
        self.assertEqual(r.status_code, 404)
135

  
136
    def test_get_partial(self):
137
        cname = self.containers[0]
138
        oname, odata = self.upload_object(cname, length=512)[:-1]
139
        url = join_urls(self.pithos_path, self.user, cname, oname)
140
        r = self.get(url, HTTP_RANGE='bytes=0-499')
141
        self.assertEqual(r.status_code, 206)
142
        data = r.content
143
        self.assertEqual(data, odata[:500])
144
        self.assertTrue('Content-Range' in r)
145
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
146
        self.assertTrue('Content-Type' in r)
147
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
148

  
149
    def test_get_final_500(self):
150
        cname = self.containers[0]
151
        oname, odata = self.upload_object(cname, length=512)[:-1]
152
        size = len(odata)
153
        url = join_urls(self.pithos_path, self.user, cname, oname)
154
        r = self.get(url, HTTP_RANGE='bytes=-500')
155
        self.assertEqual(r.status_code, 206)
156
        self.assertEqual(r.content, odata[-500:])
157
        self.assertTrue('Content-Range' in r)
158
        self.assertEqual(r['Content-Range'],
159
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
160
        self.assertTrue('Content-Type' in r)
161
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
162

  
163
    def test_get_rest(self):
164
        cname = self.containers[0]
165
        oname, odata = self.upload_object(cname, length=512)[:-1]
166
        size = len(odata)
167
        url = join_urls(self.pithos_path, self.user, cname, oname)
168
        offset = len(odata) - random.randint(1, 512)
169
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
170
        self.assertEqual(r.status_code, 206)
171
        self.assertEqual(r.content, odata[offset:])
172
        self.assertTrue('Content-Range' in r)
173
        self.assertEqual(r['Content-Range'],
174
                         'bytes %s-%s/%s' % (offset, size - 1, size))
175
        self.assertTrue('Content-Type' in r)
176
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
177

  
178
    def test_get_range_not_satisfiable(self):
179
        cname = self.containers[0]
180
        oname, odata = self.upload_object(cname, length=512)[:-1]
181
        url = join_urls(self.pithos_path, self.user, cname, oname)
182

  
183
        # TODO
184
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
185
        #self.assertEqual(r.status_code, 416)
186

  
187
        offset = len(odata) + 1
188
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
189
        self.assertEqual(r.status_code, 416)
190

  
191
    def test_multiple_range(self):
192
        cname = self.containers[0]
193
        oname, odata = self.upload_object(cname, length=1024)[:-1]
194
        url = join_urls(self.pithos_path, self.user, cname, oname)
195

  
196
        l = ['0-499', '-500', '1000-']
197
        ranges = 'bytes=%s' % ','.join(l)
198
        r = self.get(url, HTTP_RANGE=ranges)
199
        self.assertEqual(r.status_code, 206)
200
        self.assertTrue('content-type' in r)
201
        p = re.compile(
202
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
203
            re.I)
204
        m = p.match(r['content-type'])
205
        if m is None:
206
            self.fail('Invalid multiple range content type')
207
        boundary = m.groupdict()['boundary']
208
        cparts = r.content.split('--%s' % boundary)[1:-1]
209

  
210
        # assert content parts length
211
        self.assertEqual(len(cparts), len(l))
212

  
213
        # for each content part assert headers
214
        i = 0
215
        for cpart in cparts:
216
            content = cpart.split('\r\n')
217
            headers = content[1:3]
218
            content_range = headers[0].split(': ')
219
            self.assertEqual(content_range[0], 'Content-Range')
220

  
221
            r = l[i].split('-')
222
            if not r[0] and not r[1]:
223
                pass
224
            elif not r[0]:
225
                start = len(odata) - int(r[1])
226
                end = len(odata)
227
            elif not r[1]:
228
                start = int(r[0])
229
                end = len(odata)
230
            else:
231
                start = int(r[0])
232
                end = int(r[1]) + 1
233
            fdata = odata[start:end]
234
            sdata = '\r\n'.join(content[4:-1])
235
            self.assertEqual(len(fdata), len(sdata))
236
            self.assertEquals(fdata, sdata)
237
            i += 1
238

  
239
    def test_multiple_range_not_satisfiable(self):
240
        # perform get with multiple range
241
        cname = self.containers[0]
242
        oname, odata = self.upload_object(cname, length=1024)[:-1]
243
        out_of_range = len(odata) + 1
244
        l = ['0-499', '-500', '%d-' % out_of_range]
245
        ranges = 'bytes=%s' % ','.join(l)
246
        url = join_urls(self.pithos_path, self.user, cname, oname)
247
        r = self.get(url, HTTP_RANGE=ranges)
248
        self.assertEqual(r.status_code, 416)
249

  
250
    def test_get_with_if_match_with_md5(self):
251
        cname = self.containers[0]
252
        oname, odata = self.upload_object(cname, length=1024)[:-1]
253

  
254
        # perform get with If-Match
255
        url = join_urls(self.pithos_path, self.user, cname, oname)
256

  
257
        if pithos_settings.UPDATE_MD5:
258
            etag = compute_md5_hash(odata)
259
        else:
260
            etag = merkle(odata)
261

  
262
        r = self.get(url, HTTP_IF_MATCH=etag)
263

  
264
        # assert get success
265
        self.assertEqual(r.status_code, 200)
266

  
267
        # assert response content
268
        self.assertEqual(r.content, odata)
269

  
270
    def test_get_with_if_match_star_with_md5(self):
271
        cname = self.containers[0]
272
        oname, odata = self.upload_object(cname, length=1024)[:-1]
273

  
274
        # perform get with If-Match *
275
        url = join_urls(self.pithos_path, self.user, cname, oname)
276
        r = self.get(url, HTTP_IF_MATCH='*')
277

  
278
        # assert get success
279
        self.assertEqual(r.status_code, 200)
280

  
281
        # assert response content
282
        self.assertEqual(r.content, odata)
283

  
284
    def test_get_with_multiple_if_match_without_md5(self):
285
        cname = self.containers[0]
286
        oname, odata = self.upload_object(cname, length=1024)[:-1]
287

  
288
        # perform get with If-Match
289
        url = join_urls(self.pithos_path, self.user, cname, oname)
290

  
291
        if pithos_settings.UPDATE_MD5:
292
            etag = compute_md5_hash(odata)
293
        else:
294
            etag = merkle(odata)
295

  
296
        r = self.get(url, HTTP_IF_MATCH=','.join([etag,
297
                                                  get_random_data(8)]))
298

  
299
        # assert get success
300
        self.assertEqual(r.status_code, 200)
301

  
302
        # assert response content
303
        self.assertEqual(r.content, odata)
304

  
305
    def test_if_match_precondition_failed(self):
306
        cname = self.containers[0]
307
        oname, odata = self.upload_object(cname, length=1024)[:-1]
308

  
309
        # perform get with If-Match
310
        url = join_urls(self.pithos_path, self.user, cname, oname)
311
        r = self.get(url, HTTP_IF_MATCH=get_random_data(8))
312
        self.assertEqual(r.status_code, 412)
313

  
314
    def test_if_none_match_without_md5(self):
315
        # upload object
316
        cname = self.containers[0]
317
        oname, odata = self.upload_object(cname, length=1024)[:-1]
318

  
319
        if pithos_settings.UPDATE_MD5:
320
            etag = compute_md5_hash(odata)
321
        else:
322
            etag = merkle(odata)
323

  
324
        # perform get with If-None-Match
325
        url = join_urls(self.pithos_path, self.user, cname, oname)
326
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
327

  
328
        # assert precondition_failed
329
        self.assertEqual(r.status_code, 304)
330

  
331
        # update object data
332
        r = self.append_object_data(cname, oname)[-1]
333
        self.assertTrue(etag != r['ETag'])
334

  
335
        # perform get with If-None-Match
336
        url = join_urls(self.pithos_path, self.user, cname, oname)
337
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
338

  
339
        # assert get success
340
        self.assertEqual(r.status_code, 200)
341

  
342
    def test_if_none_match_star_without_md5(self):
343
        # upload object
344
        cname = self.containers[0]
345
        oname, odata = self.upload_object(cname, length=1024)[:-1]
346

  
347
        # perform get with If-None-Match with star
348
        url = join_urls(self.pithos_path, self.user, cname, oname)
349
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
350

  
351
        self.assertEqual(r.status_code, 304)
352

  
353
    def test_if_modified_since(self):
354
        # upload object
355
        cname = self.containers[0]
356
        oname, odata = self.upload_object(cname, length=1024)[:-1]
357
        object_info = self.get_object_info(cname, oname)
358
        last_modified = object_info['Last-Modified']
359
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
360
        t1_formats = map(t1.strftime, DATE_FORMATS)
361

  
362
        # Check not modified since
363
        url = join_urls(self.pithos_path, self.user, cname, oname)
364
        for t in t1_formats:
365
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
366
            self.assertEqual(r.status_code, 304)
367

  
368
        _time.sleep(1)
369

  
370
        # update object data
371
        appended_data = self.append_object_data(cname, oname)[1]
372

  
373
        # Check modified since
374
        url = join_urls(self.pithos_path, self.user, cname, oname)
375
        for t in t1_formats:
376
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
377
            self.assertEqual(r.status_code, 200)
378
            self.assertEqual(r.content, odata + appended_data)
379

  
380
    def test_if_modified_since_invalid_date(self):
381
        cname = self.containers[0]
382
        oname, odata = self.upload_object(cname, length=1024)[:-1]
383
        url = join_urls(self.pithos_path, self.user, cname, oname)
384
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
385
        self.assertEqual(r.status_code, 200)
386
        self.assertEqual(r.content, odata)
387

  
388
    def test_if_not_modified_since(self):
389
        cname = self.containers[0]
390
        oname, odata = self.upload_object(cname, length=1024)[:-1]
391
        url = join_urls(self.pithos_path, self.user, cname, oname)
392
        object_info = self.get_object_info(cname, oname)
393
        last_modified = object_info['Last-Modified']
394
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
395

  
396
        # Check unmodified
397
        t1 = t + datetime.timedelta(seconds=1)
398
        t1_formats = map(t1.strftime, DATE_FORMATS)
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff