Statistics
| Branch: | Tag: | Revision:

root / pithos / api / tests.py @ b09c9aaa

History | View | Annotate | Download (47.7 kB)

1
from django.test.client import Client
2
from django.test import TestCase
3
from django.utils import simplejson as json
4
from xml.dom import minidom
5
import types
6
import hashlib
7
import os
8
import mimetypes
9
import random
10
import datetime
11

    
12
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
13
                "%A, %d-%b-%y %H:%M:%S GMT",
14
                "%a, %d %b %Y %H:%M:%S GMT"]
15

    
16
class AaiClient(Client):
17
    def request(self, **request):
18
        request['HTTP_X_AUTH_TOKEN'] = '46e427d657b20defe352804f0eb6f8a2'
19
        return super(AaiClient, self).request(**request)
20

    
21
class BaseTestCase(TestCase):
22
    #TODO unauthorized request
23
    def setUp(self):
24
        self.client = AaiClient()
25
        self.headers = {
26
            'account':(
27
                'X-Account-Container-Count',
28
                'X-Account-Bytes-Used',
29
                'Last-Modified',
30
                'Content-Length',
31
                'Date',
32
                'Content-Type',),
33
            'container':(
34
                'X-Container-Object-Count',
35
                'X-Container-Bytes-Used',
36
                'Content-Type',
37
                'Last-Modified',
38
                'Content-Length',
39
                'Date',),
40
            'object':(
41
                'ETag',
42
                'Content-Length',
43
                'Content-Type',
44
                'Content-Encoding',
45
                'Last-Modified',
46
                'Date',
47
                'X-Object-Manifest',
48
                'Content-Range',)}
49
        self.contentTypes = {'xml':'application/xml',
50
                             'json':'application/json',
51
                             '':'text/plain'}
52
        self.extended = {
53
            'container':(
54
                'name',
55
                'count',
56
                'bytes',
57
                'last_modified'),
58
            'object':(
59
                'name',
60
                'hash',
61
                'bytes',
62
                'content_type',
63
                'content_encoding',
64
                'last_modified',)}
65
        self.return_codes = (400, 401, 404, 503,)
66
    
67
    def assertFault(self, response, status_code, name):
68
        self.assertEqual(response.status_code, status_code)
69
    
70
    def assertBadRequest(self, response):
71
        self.assertFault(response, 400, 'badRequest')
72
    
73
    def assertItemNotFound(self, response):
74
        self.assertFault(response, 404, 'itemNotFound')
75

    
76
    def assertUnauthorized(self, response):
77
        self.assertFault(response, 401, 'unauthorized')
78

    
79
    def assertServiceUnavailable(self, response):
80
        self.assertFault(response, 503, 'serviceUnavailable')
81

    
82
    def assertNonEmpty(self, response):
83
        self.assertFault(response, 409, 'nonEmpty')
84

    
85
    def assert_status(self, response, codes):
86
        l = [elem for elem in self.return_codes]
87
        if type(codes) == types.ListType:
88
            l.extend(codes)
89
        else:
90
            l.append(codes)
91
        self.assertTrue(response.status_code in l)
92

    
93
    def get_account_meta(self, account, exp_meta={}):
94
        path = '/v1/%s' % account
95
        response = self.client.head(path)
96
        self.assert_status(response, 204)
97
        self.assert_headers(response, 'account', exp_meta)
98
        return response
99

    
100
    def list_containers(self, account, limit=10000, marker='', format=''):
101
        params = locals()
102
        params.pop('self')
103
        params.pop('account')
104
        path = '/v1/%s' % account
105
        response = self.client.get(path, params)
106
        self.assert_status(response, [200, 204])
107
        response.content = response.content.strip()
108
        if format:
109
            self.assert_extended(response, format, 'container', limit)
110
        else:
111
            names = get_content_splitted(response)
112
            self.assertTrue(len(names) <= limit)
113
        return response
114

    
115
    def update_account_meta(self, account, **metadata):
116
        path = '/v1/%s' % account
117
        response = self.client.post(path, **metadata)
118
        response.content = response.content.strip()
119
        self.assert_status(response, 202)
120
        return response
121

    
122
    def get_container_meta(self, account, container, exp_meta={}):
123
        params = locals()
124
        params.pop('self')
125
        params.pop('account')
126
        params.pop('container')
127
        path = '/v1/%s/%s' %(account, container)
128
        response = self.client.head(path, params)
129
        response.content = response.content.strip()
130
        self.assert_status(response, 204)
131
        if response.status_code == 204:
132
            self.assert_headers(response, 'container', exp_meta)
133
        return response
134

    
135
    def list_objects(self, account, container, limit=10000, marker='', prefix='', format='', path='', delimiter='', meta=''):
136
        params = locals()
137
        params.pop('self')
138
        params.pop('account')
139
        params.pop('container')
140
        path = '/v1/%s/%s' % (account, container)
141
        response = self.client.get(path, params)
142
        response.content = response.content.strip()
143
        if format:
144
            self.assert_extended(response, format, 'object', limit)
145
        self.assert_status(response, [200, 204])
146
        return response
147

    
148
    def create_container(self, account, name, **meta):
149
        path = '/v1/%s/%s' %(account, name)
150
        response = self.client.put(path, **meta)
151
        response.content = response.content.strip()
152
        self.assert_status(response, [201, 202])
153
        return response
154

    
155
    def update_container_meta(self, account, name, **meta):
156
        path = '/v1/%s/%s' %(account, name)
157
        response = self.client.post(path, data={}, content_type='text/xml', follow=False, **meta)
158
        response.content = response.content.strip()
159
        self.assert_status(response, 202)
160
        return response
161

    
162
    def delete_container(self, account, container):
163
        path = '/v1/%s/%s' %(account, container)
164
        response = self.client.delete(path)
165
        response.content = response.content.strip()
166
        self.assert_status(response, [204, 409])
167
        return response
168

    
169
    def get_object_meta(self, account, container, name):
170
        path = '/v1/%s/%s/%s' %(account, container, name)
171
        response = self.client.head(path)
172
        response.content = response.content.strip()
173
        self.assert_status(response, 204)
174
        return response
175

    
176
    def get_object(self, account, container, name, exp_meta={}, **headers):
177
        path = '/v1/%s/%s/%s' %(account, container, name)
178
        response = self.client.get(path, **headers)
179
        response.content = response.content.strip()
180
        self.assert_status(response, [200, 206, 304, 412, 416])
181
        if response.status_code in [200, 206]:
182
            self.assert_headers(response, 'object')
183
        return response
184

    
185
    def upload_object(self, account, container, name, data, content_type='application/json', **headers):
186
        path = '/v1/%s/%s/%s' %(account, container, name)
187
        response = self.client.put(path, data, content_type, **headers)
188
        response.content = response.content.strip()
189
        self.assert_status(response, [201, 411, 422])
190
        if response.status_code == 201:
191
            self.assertTrue(response['Etag'])
192
        return response
193

    
194
    def copy_object(self, account, container, name, src, **headers):
195
        path = '/v1/%s/%s/%s' %(account, container, name)
196
        headers['HTTP_X_COPY_FROM'] = src
197
        response = self.client.put(path, **headers)
198
        response.content = response.content.strip()
199
        self.assert_status(response, 201)
200
        return response
201

    
202
    def move_object(self, account, container, name, src, **headers):
203
        path = '/v1/%s/%s/%s' % (account, container, name)
204
        headers['HTTP_X_MOVE_FROM'] = src
205
        response = self.client.put(path, **headers)
206
        response.content = response.content.strip()
207
        self.assert_status(response, 201)
208
        return response
209

    
210
    def update_object_meta(self, account, container, name, **headers):
211
        path = '/v1/%s/%s/%s' %(account, container, name)
212
        response = self.client.post(path, **headers)
213
        response.content = response.content.strip()
214
        self.assert_status(response, 202)
215
        return response
216

    
217
    def delete_object(self, account, container, name):
218
        path = '/v1/%s/%s/%s' %(account, container, name)
219
        response = self.client.delete(path)
220
        response.content = response.content.strip()
221
        self.assert_status(response, 204)
222
        return response
223

    
224
    def assert_headers(self, response, type, exp_meta={}):
225
        entities = ['Account', 'Container', 'Container-Object', 'Object']
226
        user_defined_meta = ['X-%s-Meta' %elem for elem in entities]
227
        headers = [item for item in response._headers.values()]
228
        system_headers = [h for h in headers if not h[0].startswith(tuple(user_defined_meta))]
229
        for h in system_headers:
230
            self.assertTrue(h[0] in self.headers[type])
231
            if exp_meta:
232
                self.assertEqual(h[1], exp_meta[h[0]])
233

    
234
    def assert_extended(self, response, format, type, size):
235
        self.assertEqual(response['Content-Type'].find(self.contentTypes[format]), 0)
236
        if format == 'xml':
237
            self.assert_xml(response, type, size)
238
        elif format == 'json':
239
            self.assert_json(response, type, size)
240

    
241
    def assert_json(self, response, type, size):
242
        convert = lambda s: s.lower()
243
        info = [convert(elem) for elem in self.extended[type]]
244
        data = json.loads(response.content)
245
        self.assertTrue(len(data) <= size)
246
        for item in info:
247
            for i in data:
248
                if 'subdir' in i.keys():
249
                    continue
250
                self.assertTrue(item in i.keys())
251

    
252
    def assert_xml(self, response, type, size):
253
        convert = lambda s: s.lower()
254
        info = [convert(elem) for elem in self.extended[type]]
255
        try:
256
            info.remove('content_encoding')
257
        except ValueError:
258
            pass
259
        xml = minidom.parseString(response.content)
260
        for item in info:
261
            nodes = xml.getElementsByTagName(item)
262
            self.assertTrue(nodes)
263
            self.assertTrue(len(nodes) <= size)
264
            
265

    
266
    def upload_os_file(self, account, container, fullpath, meta={}):
267
        try:
268
            f = open(fullpath, 'r')
269
            data = f.read()
270
            name = os.path.split(fullpath)[-1]
271
            return self.upload_data(account, container, name, data)    
272
        except IOError:
273
            return
274

    
275
    def upload_random_data(self, account, container, name, length=1024, meta={}):
276
        data = str(random.getrandbits(length))
277
        return self.upload_data(account, container, name, data, meta)
278

    
279
    def upload_data(self, account, container, name, data, meta={}):
280
        obj = {}
281
        obj['name'] = name
282
        try:
283
            obj['data'] = data
284
            obj['hash'] = compute_hash(obj['data'])
285
            meta.update({'HTTP_X_OBJECT_META_TEST':'test1',
286
                         'HTTP_ETAG':obj['hash']})
287
            meta['HTTP_CONTENT_TYPE'], enc = mimetypes.guess_type(name)
288
            if enc:
289
                meta['HTTP_CONTENT_TYPE'] = enc
290
            obj['meta'] = meta
291
            r = self.upload_object(account,
292
                               container,
293
                               obj['name'],
294
                               obj['data'],
295
                               meta['HTTP_CONTENT_TYPE'],
296
                               **meta)
297
            if r.status_code == 201:
298
                return obj
299
        except IOError:
300
            return
301

    
302
class ListContainers(BaseTestCase):
303
    def setUp(self):
304
        BaseTestCase.setUp(self)
305
        self.account = 'test'
306
        #create some containers
307
        self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
308
        for item in self.containers:
309
            self.create_container(self.account, item)
310

    
311
    def tearDown(self):
312
        for c in get_content_splitted(self.list_containers(self.account)):
313
            response = self.delete_container(self.account, c)
314

    
315
    def test_list(self):
316
        #list containers
317
        response = self.list_containers(self.account)
318
        containers = get_content_splitted(response)
319
        self.assertEquals(self.containers, containers)
320

    
321
    #def test_list_204(self):
322
    #    response = self.list_containers('non-existing-account')
323
    #    self.assertEqual(response.status_code, 204)
324

    
325
    def test_list_with_limit(self):
326
        limit = 2
327
        response = self.list_containers(self.account, limit=limit)
328
        containers = get_content_splitted(response)
329
        self.assertEquals(len(containers), limit)
330
        self.assertEquals(self.containers[:2], containers)
331

    
332
    def test_list_with_marker(self):
333
        limit = 2
334
        marker = 'bananas'
335
        response = self.list_containers(self.account, limit=limit, marker=marker)
336
        containers =  get_content_splitted(response)
337
        i = self.containers.index(marker) + 1
338
        self.assertEquals(self.containers[i:(i+limit)], containers)
339
        
340
        marker = 'oranges'
341
        response = self.list_containers(self.account, limit=limit, marker=marker)
342
        containers = get_content_splitted(response)
343
        i = self.containers.index(marker) + 1
344
        self.assertEquals(self.containers[i:(i+limit)], containers)
345

    
346
    #def test_extended_list(self):
347
    #    self.list_containers(self.account, limit=3, format='xml')
348
    #    self.list_containers(self.account, limit=3, format='json')
349

    
350
    def test_list_json_with_marker(self):
351
        limit = 2
352
        marker = 'bananas'
353
        response = self.list_containers(self.account, limit=limit, marker=marker, format='json')
354
        containers = json.loads(response.content)
355
        self.assertEqual(containers[0]['name'], 'kiwis')
356
        self.assertEqual(containers[1]['name'], 'oranges')
357

    
358
    def test_list_xml_with_marker(self):
359
        limit = 2
360
        marker = 'oranges'
361
        response = self.list_containers(self.account, limit=limit, marker=marker, format='xml')
362
        xml = minidom.parseString(response.content)
363
        nodes = xml.getElementsByTagName('name')
364
        self.assertEqual(len(nodes), 1)
365
        self.assertEqual(nodes[0].childNodes[0].data, 'pears')
366

    
367
class AccountMetadata(BaseTestCase):
368
    def setUp(self):
369
        BaseTestCase.setUp(self)
370
        self.account = 'test'
371
        self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
372
        for item in self.containers:
373
            self.create_container(self.account, item)
374

    
375
    def tearDown(self):
376
        for c in  get_content_splitted(self.list_containers(self.account)):
377
            self.delete_container(self.account, c)
378

    
379
    def test_get_account_meta(self):
380
        response = self.get_account_meta(self.account)
381
        r2 = self.list_containers(self.account)
382
        containers =  get_content_splitted(r2)
383
        self.assertEqual(response['X-Account-Container-Count'], str(len(containers)))
384
        size = 0
385
        for c in containers:
386
            r = self.get_container_meta(self.account, c)
387
            size = size + int(r['X-Container-Bytes-Used'])
388
        self.assertEqual(response['X-Account-Bytes-Used'], str(size))
389

    
390
    #def test_get_account_401(self):
391
    #    response = self.get_account_meta('non-existing-account')
392
    #    print response
393
    #    self.assertEqual(response.status_code, 401)
394

    
395
    def test_update_meta(self):
396
        meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
397
        response = self.update_account_meta(self.account, **meta)
398
        response = self.get_account_meta(self.account)
399
        for k,v in meta.items():
400
            key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
401
            self.assertTrue(response[key])
402
            self.assertEqual(response[key], v)
403

    
404
    #def test_invalid_account_update_meta(self):
405
    #    with AssertInvariant(self.get_account_meta, self.account):
406
    #        meta = {'HTTP_X_ACCOUNT_META_TEST':'test', 'HTTP_X_ACCOUNT_META_TOST':'tost'}
407
    #        response = self.update_account_meta('non-existing-account', **meta)
408

    
409
class ListObjects(BaseTestCase):
410
    def setUp(self):
411
        BaseTestCase.setUp(self)
412
        self.account = 'test'
413
        self.container = ['pears', 'apples']
414
        for c in self.container:
415
            self.create_container(self.account, c)
416
        self.obj = []
417
        for o in o_names[:8]:
418
            self.obj.append(self.upload_random_data(self.account,
419
                                                    self.container[0],
420
                                                    o))
421
        for o in o_names[8:]:
422
            self.obj.append(self.upload_random_data(self.account,
423
                                                    self.container[1],
424
                                                    o))
425

    
426
    def tearDown(self):
427
        for c in self.container:
428
            self.delete_container_recursively(c)
429

    
430
    def delete_container_recursively(self, c):
431
        for obj in get_content_splitted(self.list_objects(self.account, c)):
432
            self.delete_object(self.account, c, obj)
433
        self.delete_container(self.account, c)
434

    
435
    def test_list_objects(self):
436
        response = self.list_objects(self.account, self.container[0])
437
        objects = get_content_splitted(response)
438
        l = [elem['name'] for elem in self.obj[:8]]
439
        l.sort()
440
        self.assertEqual(objects, l)
441

    
442
    def test_list_objects_with_limit_marker(self):
443
        response = self.list_objects(self.account, self.container[0], limit=2)
444
        objects = get_content_splitted(response)
445
        l = [elem['name'] for elem in self.obj[:8]]
446
        l.sort()
447
        self.assertEqual(objects, l[:2])
448
        
449
        markers = ['How To Win Friends And Influence People.pdf',
450
                   'moms_birthday.jpg']
451
        limit = 4
452
        for m in markers:
453
            response = self.list_objects(self.account, self.container[0], limit=limit, marker=m)
454
            objects = get_content_splitted(response)
455
            l = [elem['name'] for elem in self.obj[:8]]
456
            l.sort()
457
            start = l.index(m) + 1
458
            end = start + limit
459
            end = len(l) >= end and end or len(l)
460
            self.assertEqual(objects, l[start:end])
461

    
462
    def test_list_pseudo_hierarchical_folders(self):
463
        response = self.list_objects(self.account, self.container[1], prefix='photos', delimiter='/')
464
        objects = get_content_splitted(response)
465
        self.assertEquals(['photos/animals/', 'photos/me.jpg', 'photos/plants/'], objects)
466
        
467
        response = self.list_objects(self.account, self.container[1], prefix='photos/animals', delimiter='/')
468
        objects = get_content_splitted(response)
469
        self.assertEquals(['photos/animals/cats/', 'photos/animals/dogs/'], objects)
470
        
471
        response = self.list_objects(self.account, self.container[1], path='photos')
472
        objects = get_content_splitted(response)
473
        self.assertEquals(['photos/me.jpg'], objects)
474

    
475
    def test_extended_list_json(self):
476
        response = self.list_objects(self.account, self.container[1], format='json', limit=2, prefix='photos/animals', delimiter='/')
477
        objects = json.loads(response.content)
478
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
479
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
480

    
481
    def test_extended_list_xml(self):
482
        response = self.list_objects(self.account, self.container[1], format='xml', limit=4, prefix='photos', delimiter='/')
483
        xml = minidom.parseString(response.content)
484
        dirs = xml.getElementsByTagName('subdir')
485
        self.assertEqual(len(dirs), 2)
486
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
487
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
488
        
489
        objects = xml.getElementsByTagName('name')
490
        self.assertEqual(len(objects), 1)
491
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
492

    
493
    def test_list_using_meta(self):
494
        meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa'}
495
        for o in self.obj[:2]:
496
            r = self.update_object_meta(self.account,
497
                                    self.container[0],
498
                                    o['name'],
499
                                    **meta)
500
        meta = {'HTTP_X_OBJECT_META_STOCK':'true'}
501
        for o in self.obj[3:5]:
502
            r = self.update_object_meta(self.account,
503
                                    self.container[0],
504
                                    o['name'],
505
                                    **meta)
506
            
507
        r = self.list_objects(self.account,
508
                          self.container[0],
509
                          meta='Quality')
510
        self.assertEqual(r.status_code, 200)
511
        obj = get_content_splitted(r)
512
        self.assertEqual(len(obj), 2)
513
        
514
        # test case insensitive
515
        r = self.list_objects(self.account,
516
                          self.container[0],
517
                          meta='quality')
518
        self.assertEqual(r.status_code, 200)
519
        obj = get_content_splitted(r)
520
        self.assertEqual(len(obj), 2)
521
        
522
        # test multiple matches
523
        r = self.list_objects(self.account,
524
                          self.container[0],
525
                          meta='Quality, Stock')
526
        self.assertEqual(r.status_code, 200)
527
        obj = get_content_splitted(r)
528
        self.assertEqual(len(obj), 4)
529
        
530
        # test non 1-1 multiple match
531
        r = self.list_objects(self.account,
532
                          self.container[0],
533
                          meta='Quality, aaaa')
534
        self.assertEqual(r.status_code, 200)
535
        obj = get_content_splitted(r)
536
        self.assertEqual(len(obj), 2)
537
        
538

    
539
class ContainerMeta(BaseTestCase):
540
    def setUp(self):
541
        BaseTestCase.setUp(self)
542
        self.account = 'test'
543
        self.container = 'apples'
544
        self.create_container(self.account, self.container)
545

    
546
    def tearDown(self):
547
        for o in self.list_objects(self.account, self.container):
548
            self.delete_object(self.account, self.container, o)
549
        self.delete_container(self.account, self.container)
550
    
551
    def test_get_meta(self):
552
        headers = {'HTTP_X_OBJECT_META_TRASH':'true'}
553
        t1 = datetime.datetime.utcnow()
554
        o = self.upload_random_data(self.account,
555
                                self.container,
556
                                'McIntosh.jpg',
557
                                meta=headers)
558
        if o:
559
            r = self.get_container_meta(self.account,
560
                                        self.container)
561
            self.assertEqual(r['X-Container-Object-Count'], '1')
562
            self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data'])))
563
            t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
564
            delta = (t2 - t1)
565
            threashold = datetime.timedelta(seconds=1) 
566
            self.assertTrue(delta < threashold)
567
            self.assertTrue(r['X-Container-Object-Meta'])
568
            self.assertTrue('Trash' in r['X-Container-Object-Meta'])
569

    
570
    def test_update_meta(self):
571
        meta = {'HTTP_X_CONTAINER_META_TEST':'test33', 'HTTP_X_CONTAINER_META_TOST':'tost22'}
572
        response = self.update_container_meta(self.account, self.container, **meta)
573
        response = self.get_container_meta(self.account, self.container)
574
        for k,v in meta.items():
575
            key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
576
            self.assertTrue(response[key])
577
            self.assertEqual(response[key], v)
578

    
579
class CreateContainer(BaseTestCase):
580
    def setUp(self):
581
        BaseTestCase.setUp(self)
582
        self.account = 'test'
583
        self.containers = ['c1', 'c2']
584

    
585
    def tearDown(self):
586
        for c in self.containers:
587
            r = self.delete_container(self.account, c)
588

    
589
    def test_create(self):
590
        response = self.create_container(self.account, self.containers[0])
591
        if response.status_code == 201:
592
            response = self.list_containers(self.account)
593
            self.assertTrue(self.containers[0] in get_content_splitted(response))
594
            r = self.get_container_meta(self.account, self.containers[0])
595
            self.assertEqual(r.status_code, 204)
596

    
597
    def test_create_twice(self):
598
        response = self.create_container(self.account, self.containers[0])
599
        if response.status_code == 201:
600
            self.assertTrue(self.create_container(self.account, self.containers[0]).status_code, 202)
601

    
602
class DeleteContainer(BaseTestCase):
603
    def setUp(self):
604
        BaseTestCase.setUp(self)
605
        self.account = 'test'
606
        self.containers = ['c1', 'c2']
607
        for c in self.containers:
608
            self.create_container(self.account, c)
609
        self.upload_random_data(self.account,
610
                                self.containers[1],
611
                                'nice.jpg')
612

    
613
    def tearDown(self):
614
        for c in self.containers:
615
            for o in get_content_splitted(self.list_objects(self.account, c)):
616
                self.delete_object(self.account, c, o)
617
            self.delete_container(self.account, c)
618

    
619
    def test_delete(self):
620
        r = self.delete_container(self.account, self.containers[0])
621
        self.assertEqual(r.status_code, 204)
622

    
623
    def test_delete_non_empty(self):
624
        r = self.delete_container(self.account, self.containers[1])
625
        self.assertNonEmpty(r)
626

    
627
    def test_delete_invalid(self):
628
        self.assertItemNotFound(self.delete_container(self.account, 'c3'))
629

    
630
class GetObjects(BaseTestCase):
631
    def setUp(self):
632
        BaseTestCase.setUp(self)
633
        self.account = 'test'
634
        self.containers = ['c1', 'c2']
635
        #create some containers
636
        for c in self.containers:
637
            self.create_container(self.account, c)
638
        
639
        #upload a file
640
        self.objects = []
641
        self.objects.append(self.upload_os_file(self.account,
642
                            self.containers[1],
643
                            './api/tests.py'))
644
        self.objects.append(self.upload_os_file(self.account,
645
                            self.containers[1],
646
                            'settings.py'))
647

    
648
    def tearDown(self):
649
        for c in self.containers:
650
            for o in get_content_splitted(self.list_objects(self.account, c)):
651
                self.delete_object(self.account, c, o)
652
            self.delete_container(self.account, c)
653

    
654
    def test_get(self):
655
        #perform get
656
        r = self.get_object(self.account,
657
                            self.containers[1],
658
                            self.objects[0]['name'],
659
                            self.objects[0]['meta'])
660
        #assert success
661
        self.assertEqual(r.status_code, 200)
662

    
663
    def test_get_invalid(self):
664
        r = self.get_object(self.account,
665
                            self.containers[0],
666
                            self.objects[0]['name'])
667
        self.assertItemNotFound(r)
668

    
669
    def test_get_partial(self):
670
        #perform get with range
671
        headers = {'HTTP_RANGE':'bytes=0-499'}
672
        r = self.get_object(self.account,
673
                        self.containers[1],
674
                        self.objects[0]['name'],
675
                        **headers)
676
        
677
        #assert successful partial content
678
        self.assertEqual(r.status_code, 206)
679
        
680
        #assert content length
681
        self.assertEqual(int(r['Content-Length']), 500)
682
        
683
        #assert content
684
        self.assertEqual(self.objects[0]['data'][:500], r.content)
685

    
686
    def test_get_final_500(self):
687
        #perform get with range
688
        headers = {'HTTP_RANGE':'bytes=-500'}
689
        r = self.get_object(self.account,
690
                        self.containers[1],
691
                        self.objects[0]['name'],
692
                        **headers)
693
        
694
        #assert successful partial content
695
        self.assertEqual(r.status_code, 206)
696
        
697
        #assert content length
698
        self.assertEqual(int(r['Content-Length']), 500)
699
        
700
        #assert content
701
        self.assertTrue(self.objects[0]['data'][-500:], r.content)
702

    
703
    def test_get_rest(self):
704
        #perform get with range
705
        offset = len(self.objects[0]['data']) - 500
706
        headers = {'HTTP_RANGE':'bytes=%s-' %offset}
707
        r = self.get_object(self.account,
708
                        self.containers[1],
709
                        self.objects[0]['name'],
710
                        **headers)
711
        
712
        #assert successful partial content
713
        self.assertEqual(r.status_code, 206)
714
        
715
        #assert content length
716
        self.assertEqual(int(r['Content-Length']), 500)
717
        
718
        #assert content
719
        self.assertTrue(self.objects[0]['data'][-500:], r.content)
720
        
721
    def test_get_range_not_satisfiable(self):
722
        #perform get with range
723
        offset = len(self.objects[0]['data']) + 1
724
        headers = {'HTTP_RANGE':'bytes=0-%s' %offset}
725
        r = self.get_object(self.account,
726
                        self.containers[1],
727
                        self.objects[0]['name'],
728
                        **headers)
729
        
730
        #assert Range Not Satisfiable
731
        self.assertEqual(r.status_code, 416)
732

    
733
    def test_multiple_range(self):
734
        #perform get with multiple range
735
        ranges = ['0-499', '-500', '1000-']
736
        headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
737
        r = self.get_object(self.account,
738
                        self.containers[1],
739
                        self.objects[0]['name'],
740
                        **headers)
741
        
742
        # assert partial content
743
        self.assertEqual(r.status_code, 206)
744
        
745
        # assert Content-Type of the reply will be multipart/byteranges
746
        self.assertTrue(r['Content-Type'])
747
        content_type_parts = r['Content-Type'].split()
748
        self.assertEqual(content_type_parts[0], ('multipart/byteranges;'))
749
        
750
        boundary = '--%s' %content_type_parts[1].split('=')[-1:][0]
751
        cparts = r.content.split(boundary)[1:-1]
752
        
753
        # assert content parts are exactly 2
754
        self.assertEqual(len(cparts), len(ranges))
755
        
756
        # for each content part assert headers
757
        i = 0
758
        for cpart in cparts:
759
            content = cpart.split('\r\n')
760
            headers = content[1:3]
761
            content_range = headers[0].split(': ')
762
            self.assertEqual(content_range[0], 'Content-Range')
763
            
764
            r = ranges[i].split('-')
765
            if not r[0] and not r[1]:
766
                pass
767
            elif not r[0]:
768
                start = len(self.objects[0]['data']) - int(r[1])
769
                end = len(self.objects[0]['data'])
770
            elif not r[1]:
771
                start = int(r[0])
772
                end = len(self.objects[0]['data'])
773
            else:
774
                start = int(r[0])
775
                end = int(r[1]) + 1
776
            fdata = self.objects[0]['data'][start:end]
777
            sdata = '\r\n'.join(content[4:-1])
778
            self.assertEqual(len(fdata), len(sdata))
779
            self.assertEquals(fdata, sdata)
780
            i+=1
781

    
782
    def test_multiple_range_not_satisfiable(self):
783
        #perform get with multiple range
784
        out_of_range = len(self.objects[0]['data']) + 1
785
        ranges = ['0-499', '-500', '%d-' %out_of_range]
786
        headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
787
        r = self.get_object(self.account,
788
                        self.containers[1],
789
                        self.objects[0]['name'],
790
                        **headers)
791
        
792
        # assert partial content
793
        self.assertEqual(r.status_code, 416)
794

    
795
    def test_get_with_if_match(self):
796
        #perform get with If-Match
797
        headers = {'HTTP_IF_MATCH':self.objects[0]['hash']}
798
        r = self.get_object(self.account,
799
                        self.containers[1],
800
                        self.objects[0]['name'],
801
                        **headers)
802
        #assert get success
803
        self.assertEqual(r.status_code, 200)
804
        
805
        #assert response content
806
        self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
807

    
808
    def test_get_with_if_match_star(self):
809
        #perform get with If-Match *
810
        headers = {'HTTP_IF_MATCH':'*'}
811
        r = self.get_object(self.account,
812
                        self.containers[1],
813
                        self.objects[0]['name'],
814
                        **headers)
815
        #assert get success
816
        self.assertEqual(r.status_code, 200)
817
        
818
        #assert response content
819
        self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
820

    
821
    def test_get_with_multiple_if_match(self):
822
        #perform get with If-Match
823
        etags = [i['hash'] for i in self.objects if i]
824
        etags = ','.join('"%s"' % etag for etag in etags)
825
        headers = {'HTTP_IF_MATCH':etags}
826
        r = self.get_object(self.account,
827
                        self.containers[1],
828
                        self.objects[0]['name'],
829
                        **headers)
830
        #assert get success
831
        self.assertEqual(r.status_code, 200)
832
        
833
        #assert response content
834
        self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
835

    
836
    def test_if_match_precondition_failed(self):
837
        #perform get with If-Match
838
        headers = {'HTTP_IF_MATCH':'123'}
839
        r = self.get_object(self.account,
840
                        self.containers[1],
841
                        self.objects[0]['name'],
842
                        **headers)
843
        
844
        #assert precondition failed 
845
        self.assertEqual(r.status_code, 412)
846

    
847
    def test_if_none_match(self):
848
        #perform get with If-Match
849
        headers = {'HTTP_IF_NONE_MATCH':'123'}
850
        r = self.get_object(self.account,
851
                        self.containers[1],
852
                        self.objects[0]['name'],
853
                        **headers)
854
        
855
        #assert get success
856
        self.assertEqual(r.status_code, 200)
857

    
858
    def test_if_none_match(self):
859
        #perform get with If-Match *
860
        headers = {'HTTP_IF_NONE_MATCH':'*'}
861
        r = self.get_object(self.account,
862
                        self.containers[1],
863
                        self.objects[0]['name'],
864
                        **headers)
865
        
866
        #assert get success
867
        self.assertEqual(r.status_code, 304)
868
        
869
    def test_if_none_match_not_modified(self):
870
        #perform get with If-Match
871
        headers = {'HTTP_IF_NONE_MATCH':'%s' %self.objects[0]['hash']}
872
        r = self.get_object(self.account,
873
                        self.containers[1],
874
                        self.objects[0]['name'],
875
                        **headers)
876
        
877
        #assert not modified
878
        self.assertEqual(r.status_code, 304)
879
        self.assertEqual(r['ETag'], self.objects[0]['hash'])
880
        
881
    def test_get_with_if_modified_since(self):
882
        t = datetime.datetime.utcnow()
883
        t2 = t - datetime.timedelta(minutes=10)
884
        
885
        #modify the object
886
        self.upload_object(self.account,
887
                           self.containers[1],
888
                           self.objects[0]['name'],
889
                           self.objects[0]['data'][:200])
890
        
891
        for f in DATE_FORMATS:
892
            past = t2.strftime(f)
893
            
894
            headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
895
            r = self.get_object(self.account,
896
                        self.containers[1],
897
                        self.objects[0]['name'],
898
                        **headers)
899
            
900
            #assert get success
901
            self.assertEqual(r.status_code, 200)
902
            
903
    def test_get_modified_since_invalid_date(self):
904
        headers = {'HTTP_IF_MODIFIED_SINCE':''}
905
        r = self.get_object(self.account,
906
                    self.containers[1],
907
                    self.objects[0]['name'],
908
                    **headers)
909
        
910
        #assert get success
911
        self.assertEqual(r.status_code, 200)
912

    
913
    def test_get_not_modified_since(self):
914
        now = datetime.datetime.utcnow()
915
        since = now + datetime.timedelta(1)
916
        
917
        for f in DATE_FORMATS:
918
            headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
919
            r = self.get_object(self.account,
920
                                self.containers[1],
921
                                self.objects[0]['name'],
922
                                **headers)
923
            
924
            #assert not modified
925
            self.assertEqual(r.status_code, 304)
926

    
927
    def test_get_with_if_unmodified_since(self):
928
        return
929

    
930
class UploadObject(BaseTestCase):
931
    def setUp(self):
932
        BaseTestCase.setUp(self)
933
        self.account = 'test'
934
        self.container = 'c1'
935
        self.create_container(self.account, self.container)
936
        
937
        self.src = os.path.join('.', 'api', 'tests.py')
938
        self.dest = os.path.join('.', 'api', 'chunked_update_test_file')
939
        create_chunked_update_test_file(self.src, self.dest)
940

    
941
    def tearDown(self):
942
        for o in get_content_splitted(self.list_objects(self.account, self.container)):
943
            self.delete_object(self.account, self.container, o)
944
        self.delete_container(self.account, self.container)
945
        
946
        # delete test file
947
        os.remove(self.dest)
948
        
949
    def test_upload(self):
950
        filename = 'tests.py'
951
        fullpath = os.path.join('.', 'api', filename) 
952
        f = open(fullpath, 'r')
953
        data = f.read()
954
        hash = compute_hash(data)
955
        meta={'HTTP_ETAG':hash,
956
              'HTTP_X_OBJECT_MANIFEST':123,
957
              'HTTP_X_OBJECT_META_TEST':'test1'
958
              }
959
        meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(fullpath)
960
        r = self.upload_object(self.account,
961
                               self.container,
962
                               filename,
963
                               data,
964
                               content_type=meta['HTTP_CONTENT_TYPE'],
965
                               **meta)
966
        self.assertEqual(r.status_code, 201)
967
        r = self.get_object_meta(self.account, self.container, filename)
968
        self.assertTrue(r['X-Object-Meta-Test'])
969
        self.assertEqual(r['X-Object-Meta-Test'], meta['HTTP_X_OBJECT_META_TEST'])
970
        
971
        #assert uploaded content
972
        r = self.get_object(self.account, self.container, filename)
973
        self.assertEqual(os.path.getsize(fullpath), int(r['Content-Length']))
974
        self.assertEqual(data.strip(), r.content.strip())
975

    
976
    def test_upload_unprocessable_entity(self):
977
        filename = 'tests.py'
978
        fullpath = os.path.join('.', 'api', filename) 
979
        f = open(fullpath, 'r')
980
        data = f.read()
981
        meta={'HTTP_ETAG':'123',
982
              'HTTP_X_OBJECT_MANIFEST':123,
983
              'HTTP_X_OBJECT_META_TEST':'test1'
984
              }
985
        meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(fullpath)
986
        r = self.upload_object(self.account,
987
                               self.container,
988
                               filename,
989
                               data,
990
                               content_type = meta['HTTP_CONTENT_TYPE'],
991
                               **meta)
992
        self.assertEqual(r.status_code, 422)
993
        
994
    def test_chucked_update(self):
995
        objname = os.path.split(self.src)[-1:][0]
996
        f = open(self.dest, 'r')
997
        data = f.read()
998
        meta = {}
999
        meta['HTTP_CONTENT_TYPE'], meta['HTTP_CONTENT_ENCODING'] = mimetypes.guess_type(objname)
1000
        meta.update({'HTTP_TRANSFER_ENCODING':'chunked'})
1001
        r = self.upload_object(self.account,
1002
                               self.container,
1003
                               objname,
1004
                               data,
1005
                               content_type = 'plain/text',
1006
                               **meta)
1007
        self.assertEqual(r.status_code, 201)
1008
        
1009
        r = self.get_object(self.account,
1010
                            self.container,
1011
                            objname)
1012
        uploaded_data = r.content
1013
        f = open(self.src, 'r')
1014
        actual_data = f.read()
1015
        self.assertEqual(actual_data, uploaded_data)
1016

    
1017
class CopyObject(BaseTestCase):
1018
    def setUp(self):
1019
        BaseTestCase.setUp(self)
1020
        self.account = 'test'
1021
        self.containers = ['c1', 'c2']
1022
        for c in self.containers:
1023
            self.create_container(self.account, c)
1024
        self.obj = self.upload_os_file(self.account,
1025
                            self.containers[0],
1026
                            './api/tests.py')
1027

    
1028
    def tearDown(self):
1029
        for c in self.containers:
1030
            for o in get_content_splitted(self.list_objects(self.account, c)):
1031
                self.delete_object(self.account, c, o)
1032
            self.delete_container(self.account, c)
1033

    
1034
    def test_copy(self):
1035
        with AssertInvariant(self.get_object_meta, self.account, self.containers[0], self.obj['name']):
1036
            #perform copy
1037
            meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1038
            src_path = os.path.join('/', self.containers[0], self.obj['name'])
1039
            r = self.copy_object(self.account,
1040
                             self.containers[0],
1041
                             'testcopy',
1042
                             src_path,
1043
                             **meta)
1044
            #assert copy success
1045
            self.assertEqual(r.status_code, 201)
1046
            
1047
            #assert access the new object
1048
            r = self.get_object_meta(self.account,
1049
                                     self.containers[0],
1050
                                     'testcopy')
1051
            self.assertTrue(r['X-Object-Meta-Test'])
1052
            self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1053
            
1054
            #assert etag is the same
1055
            self.assertEqual(r['ETag'], self.obj['hash'])
1056
            
1057
            #assert src object still exists
1058
            r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
1059
            self.assertEqual(r.status_code, 204)
1060

    
1061
    def test_copy_from_different_container(self):
1062
        with AssertInvariant(self.get_object_meta,
1063
                             self.account,
1064
                             self.containers[0],
1065
                             self.obj['name']):
1066
            meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1067
            src_path = os.path.join('/', self.containers[0], self.obj['name'])
1068
            r = self.copy_object(self.account,
1069
                             self.containers[1],
1070
                             'testcopy',
1071
                             src_path,
1072
                             **meta)
1073
            self.assertEqual(r.status_code, 201)
1074
            
1075
            # assert updated metadata
1076
            r = self.get_object_meta(self.account,
1077
                                     self.containers[1],
1078
                                     'testcopy')
1079
            self.assertTrue(r['X-Object-Meta-Test'])
1080
            self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1081
            
1082
            #assert src object still exists
1083
            r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
1084
            self.assertEqual(r.status_code, 204)
1085

    
1086
    def test_copy_invalid(self):
1087
        #copy from invalid object
1088
        meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1089
        r = self.copy_object(self.account,
1090
                         self.containers[1],
1091
                         'testcopy',
1092
                         os.path.join('/', self.containers[0], 'test.py'),
1093
                         **meta)
1094
        self.assertItemNotFound(r)
1095
        
1096
        
1097
        #copy from invalid container
1098
        meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1099
        src_path = os.path.join('/', self.containers[1], self.obj['name'])
1100
        r = self.copy_object(self.account,
1101
                         self.containers[1],
1102
                         'testcopy',
1103
                         src_path,
1104
                         **meta)
1105
        self.assertItemNotFound(r)
1106

    
1107
class MoveObject(CopyObject):
1108
    def test_move(self):
1109
        #perform move
1110
        meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1111
        src_path = os.path.join('/', self.containers[0], self.obj['name'])
1112
        r = self.move_object(self.account,
1113
                         self.containers[0],
1114
                         'testcopy',
1115
                         src_path,
1116
                         **meta)
1117
        #assert successful move
1118
        self.assertEqual(r.status_code, 201)
1119
        
1120
        #assert updated metadata
1121
        r = self.get_object_meta(self.account,
1122
                                 self.containers[0],
1123
                                 'testcopy')
1124
        self.assertTrue(r['X-Object-Meta-Test'])
1125
        self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1126
        
1127
        #assert src object no more exists
1128
        r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
1129
        self.assertItemNotFound(r)
1130

    
1131
class UpdateObjectMeta(BaseTestCase):
1132
    def setUp(self):
1133
        BaseTestCase.setUp(self)
1134
        self.account = 'test'
1135
        self.containers = ['c1', 'c2']
1136
        for c in self.containers:
1137
            self.create_container(self.account, c)
1138
        self.obj = self.upload_os_file(self.account,
1139
                                       self.containers[0],
1140
                                       './api/tests.py')
1141

    
1142
    def tearDown(self):
1143
        for c in self.containers:
1144
            for o in get_content_splitted(self.list_objects(self.account, c)):
1145
                self.delete_object(self.account, c, o)
1146
            self.delete_container(self.account, c)
1147

    
1148
    def test_update(self):
1149
        #perform update metadata
1150
        more = {'HTTP_X_OBJECT_META_FOO':'foo',
1151
                'HTTP_X_OBJECT_META_BAR':'bar'}
1152
        r = self.update_object_meta(self.account,
1153
                                self.containers[0],
1154
                                self.obj['name'],
1155
                                **more)
1156
        #assert request accepted
1157
        self.assertEqual(r.status_code, 202)
1158
        
1159
        #assert old metadata are still there
1160
        r = self.get_object_meta(self.account, self.containers[0], self.obj['name'])
1161
        self.assertTrue('X-Object-Meta-Test' not in r.items())
1162
        
1163
        #assert new metadata have been updated
1164
        for k,v in more.items():
1165
            key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
1166
            self.assertTrue(r[key])
1167
            self.assertTrue(r[key], v)
1168

    
1169
class DeleteObject(BaseTestCase):
1170
    def setUp(self):
1171
        BaseTestCase.setUp(self)
1172
        self.account = 'test'
1173
        self.containers = ['c1', 'c2']
1174
        for c in self.containers:
1175
            self.create_container(self.account, c)
1176
        self.obj = self.upload_os_file(self.account,
1177
                            self.containers[0],
1178
                            './api/tests.py')
1179

    
1180
    def tearDown(self):
1181
        for c in self.containers:
1182
            for o in get_content_splitted(self.list_objects(self.account, c)):
1183
                self.delete_object(self.account, c, o)
1184
            self.delete_container(self.account, c)
1185

    
1186
    def test_delete(self):
1187
        #perform delete object
1188
        r = self.delete_object(self.account, self.containers[0], self.obj['name'])
1189
        
1190
        #assert success
1191
        self.assertEqual(r.status_code, 204)
1192
        
1193
    def test_delete_invalid(self):
1194
        #perform delete object
1195
        r = self.delete_object(self.account, self.containers[1], self.obj['name'])
1196
        
1197
        #assert failure
1198
        self.assertItemNotFound(r)
1199

    
1200
class AssertInvariant(object):
1201
    def __init__(self, callable, *args, **kwargs):
1202
        self.callable = callable
1203
        self.args = args
1204
        self.kwargs = kwargs
1205

    
1206
    def __enter__(self):
1207
        self.items = self.callable(*self.args, **self.kwargs).items()
1208
        return self.items
1209

    
1210
    def __exit__(self, type, value, tb):
1211
        items = self.callable(*self.args, **self.kwargs).items()
1212
        assert self.items == items
1213

    
1214
def get_content_splitted(response):
1215
    if response:
1216
        return response.content.split('\n')
1217

    
1218
def compute_hash(data):
1219
    md5 = hashlib.md5()
1220
    offset = 0
1221
    md5.update(data)
1222
    return md5.hexdigest().lower()
1223

    
1224
def create_chunked_update_test_file(src, dest):
1225
    fr = open(src, 'r')
1226
    fw = open(dest, 'w')
1227
    data = fr.readline()
1228
    while data:
1229
        fw.write(hex(len(data)))
1230
        fw.write('\r\n')
1231
        fw.write(data)
1232
        data = fr.readline()
1233
    fw.write(hex(0))
1234
    fw.write('\r\n')
1235

    
1236
o_names = ['kate.jpg',
1237
           'kate_beckinsale.jpg',
1238
           'How To Win Friends And Influence People.pdf',
1239
           'moms_birthday.jpg',
1240
           'poodle_strut.mov',
1241
           'Disturbed - Down With The Sickness.mp3',
1242
           'army_of_darkness.avi',
1243
           'the_mad.avi',
1244
           'photos/animals/dogs/poodle.jpg',
1245
           'photos/animals/dogs/terrier.jpg',
1246
           'photos/animals/cats/persian.jpg',
1247
           'photos/animals/cats/siamese.jpg',
1248
           'photos/plants/fern.jpg',
1249
           'photos/plants/rose.jpg',
1250
           'photos/me.jpg']