Fix tests (object HEAD returns 200).
[pithos] / pithos / api / tests.py
1 # Copyright 2011 GRNET S.A. All rights reserved.
2
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from django.test.client import Client
35 from django.test import TestCase
36 from django.utils import simplejson as json
37 from xml.dom import minidom
38 import types
39 import hashlib
40 import os
41 import mimetypes
42 import random
43 import datetime
44 import string
45 from pithos.backends import backend
46
47 DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
48                 "%A, %d-%b-%y %H:%M:%S GMT",
49                 "%a, %d %b %Y %H:%M:%S GMT"]
50
51 class AaiClient(Client):
52     def request(self, **request):
53         request['HTTP_X_AUTH_TOKEN'] = '46e427d657b20defe352804f0eb6f8a2'
54         return super(AaiClient, self).request(**request)
55
56 class BaseTestCase(TestCase):
57     #TODO unauthorized request
58     def setUp(self):
59         self.client = AaiClient()
60         self.headers = {
61             'account':(
62                 'X-Account-Container-Count',
63                 'X-Account-Bytes-Used',
64                 'Last-Modified',
65                 'Content-Length',
66                 'Date',
67                 'Content-Type',),
68             'container':(
69                 'X-Container-Object-Count',
70                 'X-Container-Bytes-Used',
71                 'Content-Type',
72                 'Last-Modified',
73                 'Content-Length',
74                 'Date',
75                 'X-Container-Block-Size',
76                 'X-Container-Block-Hash',),
77             'object':(
78                 'ETag',
79                 'Content-Length',
80                 'Content-Type',
81                 'Content-Encoding',
82                 'Last-Modified',
83                 'Date',
84                 'X-Object-Manifest',
85                 'Content-Range',
86                 'X-Object-Version',
87                 'X-Object-Version-Timestamp',)}
88         self.contentTypes = {'xml':'application/xml',
89                              'json':'application/json',
90                              '':'text/plain'}
91         self.extended = {
92             'container':(
93                 'name',
94                 'count',
95                 'bytes',
96                 'last_modified'),
97             'object':(
98                 'name',
99                 'hash',
100                 'bytes',
101                 'content_type',
102                 'content_encoding',
103                 'last_modified',)}
104         self.return_codes = (400, 401, 404, 503,)
105
106     def assertFault(self, response, status_code, name):
107         self.assertEqual(response.status_code, status_code)
108
109     def assertBadRequest(self, response):
110         self.assertFault(response, 400, 'badRequest')
111
112     def assertItemNotFound(self, response):
113         self.assertFault(response, 404, 'itemNotFound')
114
115     def assertUnauthorized(self, response):
116         self.assertFault(response, 401, 'unauthorized')
117
118     def assertServiceUnavailable(self, response):
119         self.assertFault(response, 503, 'serviceUnavailable')
120
121     def assertNonEmpty(self, response):
122         self.assertFault(response, 409, 'nonEmpty')
123
124     def assert_status(self, response, codes):
125         l = [elem for elem in self.return_codes]
126         if type(codes) == types.ListType:
127             l.extend(codes)
128         else:
129             l.append(codes)
130         self.assertTrue(response.status_code in l)
131
132     def get_account_meta(self, account, exp_meta={}):
133         path = '/v1/%s' % account
134         response = self.client.head(path)
135         self.assert_status(response, 204)
136         self.assert_headers(response, 'account', exp_meta)
137         return response
138
139     def list_containers(self, account, limit=10000, marker='', format='',
140                         **headers):
141         params = locals()
142         params.pop('self')
143         params.pop('account')
144         path = '/v1/%s' % account
145         response = self.client.get(path, params, **headers)
146         self.assert_status(response, [200, 204, 304, 412])
147         response.content = response.content.strip()
148         if format:
149             self.assert_extended(response, format, 'container', limit)
150         else:
151             names = get_content_splitted(response)
152             self.assertTrue(len(names) <= limit)
153         return response
154
155     def update_account_meta(self, account, **metadata):
156         path = '/v1/%s' % account
157         response = self.client.post(path, **metadata)
158         response.content = response.content
159         self.assert_status(response, 202)
160         return response
161
162     def get_container_meta(self, account, container, exp_meta={}):
163         params = locals()
164         params.pop('self')
165         params.pop('account')
166         params.pop('container')
167         path = '/v1/%s/%s' %(account, container)
168         response = self.client.head(path, params)
169         response.content = response.content
170         self.assert_status(response, 204)
171         if response.status_code == 204:
172             self.assert_headers(response, 'container', exp_meta)
173         return response
174
175     def list_objects(self, account, container, limit=10000, marker='',
176                      prefix='', format='', path='', delimiter='', meta='',
177                      **headers):
178         params = locals()
179         params.pop('self')
180         params.pop('account')
181         params.pop('container')
182         path = '/v1/%s/%s' % (account, container)
183         response = self.client.get(path, params, **headers)
184         response.content = response.content.strip()
185         if format:
186             self.assert_extended(response, format, 'object', limit)
187         self.assert_status(response, [200, 204, 304, 412])
188         return response
189
190     def create_container(self, account, name, **meta):
191         path = '/v1/%s/%s' %(account, name)
192         response = self.client.put(path, **meta)
193         response.content = response.content
194         self.assert_status(response, [201, 202])
195         return response
196
197     def update_container_meta(self, account, name, **meta):
198         path = '/v1/%s/%s' %(account, name)
199         response = self.client.post(path,
200                                     data=None,
201                                     content_type='text/xml',
202                                     follow=False, **meta)
203         response.content = response.content
204         self.assert_status(response, 202)
205         return response
206
207     def delete_container(self, account, container):
208         path = '/v1/%s/%s' %(account, container)
209         response = self.client.delete(path)
210         response.content = response.content
211         self.assert_status(response, [204, 409])
212         return response
213
214     def get_object_meta(self, account, container, name):
215         path = '/v1/%s/%s/%s' %(account, container, name)
216         response = self.client.head(path)
217         response.content = response.content
218         self.assert_status(response, 200)
219         return response
220
221     def get_object(self, account, container, name, format='', **headers):
222         path = '/v1/%s/%s/%s' %(account, container, name)
223         response = self.client.get(path, {'format':format}, **headers)
224         response.content = response.content
225         self.assert_status(response, [200, 206, 304, 412, 416])
226         if response.status_code in [200, 206]:
227             self.assert_headers(response, 'object')
228         return response
229
230     def upload_object(self, account, container, name, data, content_type='',
231                       **headers):
232         path = '/v1/%s/%s/%s' %(account, container, name)
233         response = self.client.put(path, data, content_type, **headers)
234         response.content = response.content
235         self.assert_status(response, [201, 411, 422])
236         if response.status_code == 201:
237             self.assertTrue(response['Etag'])
238         return response
239
240     def copy_object(self, account, container, name, src, **headers):
241         path = '/v1/%s/%s/%s' %(account, container, name)
242         headers['HTTP_X_COPY_FROM'] = src
243         response = self.client.put(path, **headers)
244         response.content = response.content
245         self.assert_status(response, 201)
246         return response
247
248     def move_object(self, account, container, name, src, **headers):
249         path = '/v1/%s/%s/%s' % (account, container, name)
250         headers['HTTP_X_MOVE_FROM'] = src
251         response = self.client.put(path, **headers)
252         response.content = response.content
253         self.assert_status(response, 201)
254         return response
255
256     def update_object(self, account, container, name, data={},
257                       content_type='MULTIPART_CONTENT', **headers):
258         path = '/v1/%s/%s/%s' %(account, container, name)
259         response = self.client.post(path, data, content_type, **headers)
260         response.content = response.content
261         self.assert_status(response, [202, 204, 416])
262         return response
263
264     def delete_object(self, account, container, name):
265         path = '/v1/%s/%s/%s' %(account, container, name)
266         response = self.client.delete(path)
267         response.content = response.content
268         self.assert_status(response, 204)
269         return response
270
271     def assert_headers(self, response, type, exp_meta={}):
272         entities = ['Account', 'Container', 'Container-Object', 'Object']
273         user_defined_meta = ['X-%s-Meta' %elem for elem in entities]
274         headers = [item for item in response._headers.values()]
275         t = tuple(user_defined_meta)
276         system_headers = [h for h in headers if not h[0].startswith(t)]
277         for h in system_headers:
278             self.assertTrue(h[0] in self.headers[type])
279             if exp_meta:
280                 self.assertEqual(h[1], exp_meta[h[0]])
281
282     def assert_extended(self, response, format, type, size):
283         exp_content_type = self.contentTypes[format]
284         self.assertEqual(response['Content-Type'].find(exp_content_type), 0)
285         if format == 'xml':
286             self.assert_xml(response, type, size)
287         elif format == 'json':
288             self.assert_json(response, type, size)
289
290     def assert_json(self, response, type, size):
291         convert = lambda s: s.lower()
292         info = [convert(elem) for elem in self.extended[type]]
293         data = json.loads(response.content)
294         self.assertTrue(len(data) <= size)
295         for item in info:
296             for i in data:
297                 if 'subdir' in i.keys():
298                     continue
299                 self.assertTrue(item in i.keys())
300
301     def assert_xml(self, response, type, size):
302         convert = lambda s: s.lower()
303         info = [convert(elem) for elem in self.extended[type]]
304         try:
305             info.remove('content_encoding')
306         except ValueError:
307             pass
308         xml = minidom.parseString(response.content)
309         for item in info:
310             nodes = xml.getElementsByTagName(item)
311             self.assertTrue(nodes)
312             self.assertTrue(len(nodes) <= size)
313             
314
315     def upload_os_file(self, account, container, fullpath, meta={}):
316         try:
317             f = open(fullpath, 'r')
318             data = f.read()
319             name = os.path.split(fullpath)[-1]
320             return self.upload_data(account, container, name, data)    
321         except IOError:
322             return
323
324     def upload_random_data(self, account, container, name, length=1024,
325                            meta={}):
326         data = get_random_data(length)
327         return self.upload_data(account, container, name, data, meta)
328
329     def upload_data(self, account, container, name, data, meta={}):
330         obj = {}
331         obj['name'] = name
332         try:
333             obj['data'] = data
334             obj['hash'] = compute_md5_hash(obj['data'])
335             meta.update({'HTTP_X_OBJECT_META_TEST':'test1',
336                          'HTTP_ETAG':obj['hash']})
337             type, enc = mimetypes.guess_type(name)
338             meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
339             if enc:
340                 meta['HTTP_CONTENT_ENCODING'] = enc
341             
342             obj['meta'] = meta
343             r = self.upload_object(account,
344                                container,
345                                obj['name'],
346                                obj['data'],
347                                meta['HTTP_CONTENT_TYPE'],
348                                **meta)
349             if r.status_code == 201:
350                 return obj
351         except IOError:
352             return
353
354 class AccountHead(BaseTestCase):
355     def setUp(self):
356         BaseTestCase.setUp(self)
357         self.account = 'test'
358         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
359         for item in self.containers:
360             self.create_container(self.account, item)
361
362     def tearDown(self):
363         for c in  get_content_splitted(self.list_containers(self.account)):
364             self.delete_container(self.account, c)
365
366     def test_get_account_meta(self):
367         response = self.get_account_meta(self.account)
368         r2 = self.list_containers(self.account)
369         containers =  get_content_splitted(r2)
370         l = str(len(containers))
371         self.assertEqual(response['X-Account-Container-Count'], l)
372         size = 0
373         for c in containers:
374             r = self.get_container_meta(self.account, c)
375             size = size + int(r['X-Container-Bytes-Used'])
376         self.assertEqual(response['X-Account-Bytes-Used'], str(size))
377
378     #def test_get_account_401(self):
379     #    response = self.get_account_meta('non-existing-account')
380     #    print response
381     #    self.assertEqual(response.status_code, 401)
382
383 class AccountGet(BaseTestCase):
384     def setUp(self):
385         BaseTestCase.setUp(self)
386         self.account = 'test'
387         #create some containers
388         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
389         for item in self.containers:
390             self.create_container(self.account, item)
391
392     def tearDown(self):
393         for c in get_content_splitted(self.list_containers(self.account)):
394             response = self.delete_container(self.account, c)
395
396     def test_list(self):
397         #list containers
398         response = self.list_containers(self.account)
399         containers = get_content_splitted(response)
400         self.assertEquals(self.containers, containers)
401
402     #def test_list_204(self):
403     #    response = self.list_containers('non-existing-account')
404     #    self.assertEqual(response.status_code, 204)
405
406     def test_list_with_limit(self):
407         limit = 2
408         response = self.list_containers(self.account, limit=limit)
409         containers = get_content_splitted(response)
410         self.assertEquals(len(containers), limit)
411         self.assertEquals(self.containers[:2], containers)
412
413     def test_list_with_marker(self):
414         l = 2
415         m = 'bananas'
416         response = self.list_containers(self.account, limit=l, marker=m)
417         containers =  get_content_splitted(response)
418         i = self.containers.index(m) + 1
419         self.assertEquals(self.containers[i:(i+l)], containers)
420         
421         m = 'oranges'
422         response = self.list_containers(self.account, limit=l, marker=m)
423         containers = get_content_splitted(response)
424         i = self.containers.index(m) + 1
425         self.assertEquals(self.containers[i:(i+l)], containers)
426
427     #def test_extended_list(self):
428     #    self.list_containers(self.account, limit=3, format='xml')
429     #    self.list_containers(self.account, limit=3, format='json')
430
431     def test_list_json_with_marker(self):
432         l = 2
433         m = 'bananas'
434         response = self.list_containers(self.account, limit=l, marker=m,
435                                         format='json')
436         containers = json.loads(response.content)
437         self.assertEqual(containers[0]['name'], 'kiwis')
438         self.assertEqual(containers[1]['name'], 'oranges')
439
440     def test_list_xml_with_marker(self):
441         l = 2
442         m = 'oranges'
443         response = self.list_containers(self.account, limit=l, marker=m,
444                                         format='xml')
445         xml = minidom.parseString(response.content)
446         nodes = xml.getElementsByTagName('name')
447         self.assertEqual(len(nodes), 1)
448         self.assertEqual(nodes[0].childNodes[0].data, 'pears')
449
450     def test_if_modified_since(self):
451         t = datetime.datetime.utcnow()
452         t2 = t - datetime.timedelta(minutes=10)
453         
454         #add a new container
455         self.create_container(self.account,
456                               'dummy')
457
458         for f in DATE_FORMATS:
459             past = t2.strftime(f)
460             
461             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
462             r = self.list_containers(self.account, **headers)
463             
464             #assert get success
465             self.assertEqual(r.status_code, 200)
466
467     def test_if_modified_since_invalid_date(self):
468         headers = {'HTTP_IF_MODIFIED_SINCE':''}
469         r = self.list_containers(self.account, **headers)
470             
471         #assert get success
472         self.assertEqual(r.status_code, 200)
473
474     def test_if_not_modified_since(self):
475         now = datetime.datetime.utcnow()
476         since = now + datetime.timedelta(1)
477         
478         for f in DATE_FORMATS:
479             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
480             r = self.list_containers(self.account, **headers)
481             
482             #assert not modified
483             self.assertEqual(r.status_code, 304)
484
485     def test_if_unmodified_since(self):
486         now = datetime.datetime.utcnow()
487         since = now + datetime.timedelta(1)
488         
489         for f in DATE_FORMATS:
490             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
491             r = self.list_containers(self.account, **headers)
492             
493             #assert success
494             self.assertEqual(r.status_code, 200)
495             self.assertEqual(self.containers, get_content_splitted(r))
496
497     def test_if_unmodified_since_precondition_failed(self):
498         t = datetime.datetime.utcnow()
499         t2 = t - datetime.timedelta(minutes=10)
500         
501         #add a new container
502         self.create_container(self.account,
503                               'dummy')
504         
505         for f in DATE_FORMATS:
506             past = t2.strftime(f)
507             
508             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
509             r = self.list_containers(self.account, **headers)
510             
511             #assert get success
512             self.assertEqual(r.status_code, 412)
513
514 class AccountPost(BaseTestCase):
515     def setUp(self):
516         BaseTestCase.setUp(self)
517         self.account = 'test'
518         self.containers = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
519         for item in self.containers:
520             self.create_container(self.account, item)
521
522     def tearDown(self):
523         for c in  get_content_splitted(self.list_containers(self.account)):
524             self.delete_container(self.account, c)
525
526     def test_update_meta(self):
527         meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
528                 'HTTP_X_ACCOUNT_META_TOST':'tost'}
529         response = self.update_account_meta(self.account, **meta)
530         response = self.get_account_meta(self.account)
531         for k,v in meta.items():
532             key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
533             self.assertTrue(response[key])
534             self.assertEqual(response[key], v)
535
536     #def test_invalid_account_update_meta(self):
537     #    with AssertInvariant(self.get_account_meta, self.account):
538     #        meta = {'HTTP_X_ACCOUNT_META_TEST':'test',
539     #               'HTTP_X_ACCOUNT_META_TOST':'tost'}
540     #        response = self.update_account_meta('non-existing-account', **meta)
541
542 class ContainerHead(BaseTestCase):
543     def setUp(self):
544         BaseTestCase.setUp(self)
545         self.account = 'test'
546         self.container = 'apples'
547         self.create_container(self.account, self.container)
548
549     def tearDown(self):
550         for o in self.list_objects(self.account, self.container):
551             self.delete_object(self.account, self.container, o)
552         self.delete_container(self.account, self.container)
553
554     def test_get_meta(self):
555         headers = {'HTTP_X_OBJECT_META_TRASH':'true'}
556         t1 = datetime.datetime.utcnow()
557         o = self.upload_random_data(self.account,
558                                 self.container,
559                                 'McIntosh.jpg',
560                                 meta=headers)
561         if o:
562             r = self.get_container_meta(self.account,
563                                         self.container)
564             self.assertEqual(r['X-Container-Object-Count'], '1')
565             self.assertEqual(r['X-Container-Bytes-Used'], str(len(o['data'])))
566             t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
567             delta = (t2 - t1)
568             threashold = datetime.timedelta(seconds=1) 
569             self.assertTrue(delta < threashold)
570             self.assertTrue(r['X-Container-Object-Meta'])
571             self.assertTrue('Trash' in r['X-Container-Object-Meta'])
572
573 class ContainerGet(BaseTestCase):
574     def setUp(self):
575         BaseTestCase.setUp(self)
576         self.account = 'test'
577         self.container = ['pears', 'apples']
578         for c in self.container:
579             self.create_container(self.account, c)
580         self.obj = []
581         for o in o_names[:8]:
582             self.obj.append(self.upload_random_data(self.account,
583                                                     self.container[0],
584                                                     o))
585         for o in o_names[8:]:
586             self.obj.append(self.upload_random_data(self.account,
587                                                     self.container[1],
588                                                     o))
589
590     def tearDown(self):
591         for c in self.container:
592             for obj in get_content_splitted(self.list_objects(self.account, c)):
593                 self.delete_object(self.account, c, obj)
594             self.delete_container(self.account, c)
595
596     def test_list_objects(self):
597         response = self.list_objects(self.account, self.container[0])
598         objects = get_content_splitted(response)
599         l = [elem['name'] for elem in self.obj[:8]]
600         l.sort()
601         self.assertEqual(objects, l)
602
603     def test_list_objects_with_limit_marker(self):
604         response = self.list_objects(self.account, self.container[0], limit=2)
605         objects = get_content_splitted(response)
606         l = [elem['name'] for elem in self.obj[:8]]
607         l.sort()
608         self.assertEqual(objects, l[:2])
609         
610         markers = ['How To Win Friends And Influence People.pdf',
611                    'moms_birthday.jpg']
612         limit = 4
613         for m in markers:
614             response = self.list_objects(self.account, self.container[0],
615                                          limit=limit, marker=m)
616             objects = get_content_splitted(response)
617             l = [elem['name'] for elem in self.obj[:8]]
618             l.sort()
619             start = l.index(m) + 1
620             end = start + limit
621             end = len(l) >= end and end or len(l)
622             self.assertEqual(objects, l[start:end])
623
624     def test_list_pseudo_hierarchical_folders(self):
625         response = self.list_objects(self.account, self.container[1],
626                                      prefix='photos', delimiter='/')
627         objects = get_content_splitted(response)
628         self.assertEquals(['photos/animals/', 'photos/me.jpg',
629                            'photos/plants/'], objects)
630         
631         response = self.list_objects(self.account, self.container[1],
632                                      prefix='photos/animals', delimiter='/')
633         objs = get_content_splitted(response)
634         l = ['photos/animals/cats/', 'photos/animals/dogs/']
635         self.assertEquals(l, objs)
636         
637         response = self.list_objects(self.account, self.container[1],
638                                      path='photos')
639         objects = get_content_splitted(response)
640         self.assertEquals(['photos/me.jpg'], objects)
641
642     def test_extended_list_json(self):
643         response = self.list_objects(self.account,
644                                      self.container[1],
645                                      format='json', limit=2,
646                                      prefix='photos/animals',
647                                      delimiter='/')
648         objects = json.loads(response.content)
649         self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
650         self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
651
652     def test_extended_list_xml(self):
653         response = self.list_objects(self.account, self.container[1],
654                                      format='xml', limit=4, prefix='photos',
655                                      delimiter='/')
656         xml = minidom.parseString(response.content)
657         dirs = xml.getElementsByTagName('subdir')
658         self.assertEqual(len(dirs), 2)
659         self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
660         self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
661         
662         objects = xml.getElementsByTagName('name')
663         self.assertEqual(len(objects), 1)
664         self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
665
666     def test_list_meta_double_matching(self):
667         meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa',
668                 'HTTP_X_OBJECT_META_STOCK':'true'}
669         r = self.update_object(self.account,
670                                     self.container[0],
671                                     self.obj[0]['name'],
672                                     **meta)
673         r = self.list_objects(self.account,
674                           self.container[0],
675                           meta='Quality,Stock')
676         self.assertEqual(r.status_code, 200)
677         obj = get_content_splitted(r)
678         self.assertEqual(len(obj), 1)
679         self.assertTrue(obj, self.obj[0]['name'])
680
681     def test_list_using_meta(self):
682         meta = {'HTTP_X_OBJECT_META_QUALITY':'aaa'}
683         for o in self.obj[:2]:
684             r = self.update_object(self.account,
685                                     self.container[0],
686                                     o['name'],
687                                     **meta)
688         meta = {'HTTP_X_OBJECT_META_STOCK':'true'}
689         for o in self.obj[3:5]:
690             r = self.update_object(self.account,
691                                     self.container[0],
692                                     o['name'],
693                                     **meta)
694         
695         r = self.list_objects(self.account,
696                           self.container[0],
697                           meta='Quality')
698         self.assertEqual(r.status_code, 200)
699         obj = get_content_splitted(r)
700         self.assertEqual(len(obj), 2)
701         self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
702         
703         # test case insensitive
704         r = self.list_objects(self.account,
705                           self.container[0],
706                           meta='quality')
707         self.assertEqual(r.status_code, 200)
708         obj = get_content_splitted(r)
709         self.assertEqual(len(obj), 2)
710         self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
711         
712         # test multiple matches
713         r = self.list_objects(self.account,
714                           self.container[0],
715                           meta='Quality,Stock')
716         self.assertEqual(r.status_code, 200)
717         obj = get_content_splitted(r)
718         self.assertEqual(len(obj), 4)
719         self.assertTrue(obj, [o['name'] for o in self.obj[:4]])
720         
721         # test non 1-1 multiple match
722         r = self.list_objects(self.account,
723                           self.container[0],
724                           meta='Quality,aaaa')
725         self.assertEqual(r.status_code, 200)
726         obj = get_content_splitted(r)
727         self.assertEqual(len(obj), 2)
728         self.assertTrue(obj, [o['name'] for o in self.obj[:2]])
729
730     def test_if_modified_since(self):
731         t = datetime.datetime.utcnow()
732         t2 = t - datetime.timedelta(minutes=10)
733         
734         #add a new container
735         self.upload_random_data(self.account,
736                                 self.container[0],
737                                 'dummy.txt')
738
739         for f in DATE_FORMATS:
740             past = t2.strftime(f)
741             
742             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
743             r = self.list_objects(self.account,
744                                   self.container[0], **headers)
745             
746             #assert get success
747             self.assertEqual(r.status_code, 200)
748
749     def test_if_modified_since_invalid_date(self):
750         headers = {'HTTP_IF_MODIFIED_SINCE':''}
751         r = self.list_objects(self.account,
752                               self.container[0], **headers)
753         
754         #assert get success
755         self.assertEqual(r.status_code, 200)
756
757     def test_if_not_modified_since(self):
758         now = datetime.datetime.utcnow()
759         since = now + datetime.timedelta(1)
760         
761         for f in DATE_FORMATS:
762             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
763             r = self.list_objects(self.account,
764                               self.container[0], **headers)
765         
766             #assert not modified
767             self.assertEqual(r.status_code, 304)
768
769     def test_if_unmodified_since(self):
770         now = datetime.datetime.utcnow()
771         since = now + datetime.timedelta(1)
772         
773         for f in DATE_FORMATS:
774             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
775             r = self.list_objects(self.account,
776                               self.container[0], **headers)
777         
778             #assert success
779             self.assertEqual(r.status_code, 200)
780             objlist = self.list_objects(self.account, self.container[0])
781             self.assertEqual(get_content_splitted(r),
782                              get_content_splitted(objlist))
783
784     def test_if_unmodified_since_precondition_failed(self):
785         t = datetime.datetime.utcnow()
786         t2 = t - datetime.timedelta(minutes=10)
787         
788         #add a new container
789         self.create_container(self.account,
790                               'dummy')
791
792         for f in DATE_FORMATS:
793             past = t2.strftime(f)
794             
795             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
796             r = self.list_objects(self.account,
797                               self.container[0], **headers)
798         
799             #assert get success
800             self.assertEqual(r.status_code, 412)
801
802 class ContainerPut(BaseTestCase):
803     def setUp(self):
804         BaseTestCase.setUp(self)
805         self.account = 'test'
806         self.containers = ['c1', 'c2']
807
808     def tearDown(self):
809         for c in self.containers:
810             r = self.delete_container(self.account, c)
811
812     def test_create(self):
813         response = self.create_container(self.account, self.containers[0])
814         if response.status_code == 201:
815             response = self.list_containers(self.account)
816             content = get_content_splitted(response)
817             self.assertTrue(self.containers[0] in content)
818             r = self.get_container_meta(self.account, self.containers[0])
819             self.assertEqual(r.status_code, 204)
820
821     def test_create_twice(self):
822         response = self.create_container(self.account, self.containers[0])
823         if response.status_code == 201:
824             r = self.create_container(self.account, self.containers[0])
825             self.assertTrue(r.status_code, 202)
826
827 class ContainerPost(BaseTestCase):
828     def setUp(self):
829         BaseTestCase.setUp(self)
830         self.account = 'test'
831         self.container = 'apples'
832         self.create_container(self.account, self.container)
833
834     def tearDown(self):
835         for o in self.list_objects(self.account, self.container):
836             self.delete_object(self.account, self.container, o)
837         self.delete_container(self.account, self.container)
838
839     def test_update_meta(self):
840         meta = {'HTTP_X_CONTAINER_META_TEST':'test33',
841                 'HTTP_X_CONTAINER_META_TOST':'tost22'}
842         response = self.update_container_meta(self.account, self.container,
843                                               **meta)
844         response = self.get_container_meta(self.account, self.container)
845         for k,v in meta.items():
846             key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
847             self.assertTrue(response[key])
848             self.assertEqual(response[key], v)
849
850 class ContainerDelete(BaseTestCase):
851     def setUp(self):
852         BaseTestCase.setUp(self)
853         self.account = 'test'
854         self.containers = ['c1', 'c2']
855         for c in self.containers:
856             self.create_container(self.account, c)
857         self.upload_random_data(self.account,
858                                 self.containers[1],
859                                 'nice.jpg')
860
861     def tearDown(self):
862         for c in self.containers:
863             for o in get_content_splitted(self.list_objects(self.account, c)):
864                 self.delete_object(self.account, c, o)
865             self.delete_container(self.account, c)
866
867     def test_delete(self):
868         r = self.delete_container(self.account, self.containers[0])
869         self.assertEqual(r.status_code, 204)
870
871     def test_delete_non_empty(self):
872         r = self.delete_container(self.account, self.containers[1])
873         self.assertNonEmpty(r)
874
875     def test_delete_invalid(self):
876         self.assertItemNotFound(self.delete_container(self.account, 'c3'))
877
878 class ObjectHead(BaseTestCase):
879     pass
880
881 class ObjectGet(BaseTestCase):
882     def setUp(self):
883         BaseTestCase.setUp(self)
884         self.account = 'test'
885         self.containers = ['c1', 'c2']
886         #create some containers
887         for c in self.containers:
888             self.create_container(self.account, c)
889         
890         #upload a file
891         self.objects = []
892         self.objects.append(self.upload_os_file(self.account,
893                             self.containers[1],
894                             './api/tests.py'))
895         self.objects.append(self.upload_os_file(self.account,
896                             self.containers[1],
897                             'settings.py'))
898
899     def tearDown(self):
900         for c in self.containers:
901             for o in get_content_splitted(self.list_objects(self.account, c)):
902                 self.delete_object(self.account, c, o)
903             self.delete_container(self.account, c)
904
905     def test_get(self):
906         #perform get
907         r = self.get_object(self.account,
908                             self.containers[1],
909                             self.objects[0]['name'],
910                             self.objects[0]['meta'])
911         #assert success
912         self.assertEqual(r.status_code, 200)
913         
914         #assert content-type
915         self.assertEqual(r['Content-Type'],
916                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
917
918     def test_get_invalid(self):
919         r = self.get_object(self.account,
920                             self.containers[0],
921                             self.objects[0]['name'])
922         self.assertItemNotFound(r)
923
924     def test_get_partial(self):
925         #perform get with range
926         headers = {'HTTP_RANGE':'bytes=0-499'}
927         r = self.get_object(self.account,
928                         self.containers[1],
929                         self.objects[0]['name'],
930                         **headers)
931         
932         #assert successful partial content
933         self.assertEqual(r.status_code, 206)
934         
935         #assert content-type
936         self.assertEqual(r['Content-Type'],
937                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
938         
939         #assert content length
940         self.assertEqual(int(r['Content-Length']), 500)
941         
942         #assert content
943         self.assertEqual(self.objects[0]['data'][:500], r.content)
944
945     def test_get_final_500(self):
946         #perform get with range
947         headers = {'HTTP_RANGE':'bytes=-500'}
948         r = self.get_object(self.account,
949                         self.containers[1],
950                         self.objects[0]['name'],
951                         **headers)
952         
953         #assert successful partial content
954         self.assertEqual(r.status_code, 206)
955         
956         #assert content-type
957         self.assertEqual(r['Content-Type'],
958                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
959         
960         #assert content length
961         self.assertEqual(int(r['Content-Length']), 500)
962         
963         #assert content
964         self.assertTrue(self.objects[0]['data'][-500:], r.content)
965
966     def test_get_rest(self):
967         #perform get with range
968         offset = len(self.objects[0]['data']) - 500
969         headers = {'HTTP_RANGE':'bytes=%s-' %offset}
970         r = self.get_object(self.account,
971                         self.containers[1],
972                         self.objects[0]['name'],
973                         **headers)
974         
975         #assert successful partial content
976         self.assertEqual(r.status_code, 206)
977         
978         #assert content-type
979         self.assertEqual(r['Content-Type'],
980                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
981         
982         #assert content length
983         self.assertEqual(int(r['Content-Length']), 500)
984         
985         #assert content
986         self.assertTrue(self.objects[0]['data'][-500:], r.content)
987
988     def test_get_range_not_satisfiable(self):
989         #perform get with range
990         offset = len(self.objects[0]['data']) + 1
991         headers = {'HTTP_RANGE':'bytes=0-%s' %offset}
992         r = self.get_object(self.account,
993                         self.containers[1],
994                         self.objects[0]['name'],
995                         **headers)
996         
997         #assert Range Not Satisfiable
998         self.assertEqual(r.status_code, 416)
999
1000     def test_multiple_range(self):
1001         #perform get with multiple range
1002         ranges = ['0-499', '-500', '1000-']
1003         headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
1004         r = self.get_object(self.account,
1005                         self.containers[1],
1006                         self.objects[0]['name'],
1007                         **headers)
1008         
1009         # assert partial content
1010         self.assertEqual(r.status_code, 206)
1011         
1012         # assert Content-Type of the reply will be multipart/byteranges
1013         self.assertTrue(r['Content-Type'])
1014         content_type_parts = r['Content-Type'].split()
1015         self.assertEqual(content_type_parts[0], ('multipart/byteranges;'))
1016         
1017         boundary = '--%s' %content_type_parts[1].split('=')[-1:][0]
1018         cparts = r.content.split(boundary)[1:-1]
1019         
1020         # assert content parts are exactly 2
1021         self.assertEqual(len(cparts), len(ranges))
1022         
1023         # for each content part assert headers
1024         i = 0
1025         for cpart in cparts:
1026             content = cpart.split('\r\n')
1027             headers = content[1:3]
1028             content_range = headers[0].split(': ')
1029             self.assertEqual(content_range[0], 'Content-Range')
1030             
1031             r = ranges[i].split('-')
1032             if not r[0] and not r[1]:
1033                 pass
1034             elif not r[0]:
1035                 start = len(self.objects[0]['data']) - int(r[1])
1036                 end = len(self.objects[0]['data'])
1037             elif not r[1]:
1038                 start = int(r[0])
1039                 end = len(self.objects[0]['data'])
1040             else:
1041                 start = int(r[0])
1042                 end = int(r[1]) + 1
1043             fdata = self.objects[0]['data'][start:end]
1044             sdata = '\r\n'.join(content[4:-1])
1045             self.assertEqual(len(fdata), len(sdata))
1046             self.assertEquals(fdata, sdata)
1047             i+=1
1048
1049     def test_multiple_range_not_satisfiable(self):
1050         #perform get with multiple range
1051         out_of_range = len(self.objects[0]['data']) + 1
1052         ranges = ['0-499', '-500', '%d-' %out_of_range]
1053         headers = {'HTTP_RANGE' : 'bytes=%s' % ','.join(ranges)}
1054         r = self.get_object(self.account,
1055                         self.containers[1],
1056                         self.objects[0]['name'],
1057                         **headers)
1058         
1059         # assert partial content
1060         self.assertEqual(r.status_code, 416)
1061
1062     def test_get_with_if_match(self):
1063         #perform get with If-Match
1064         headers = {'HTTP_IF_MATCH':self.objects[0]['hash']}
1065         r = self.get_object(self.account,
1066                         self.containers[1],
1067                         self.objects[0]['name'],
1068                         **headers)
1069         #assert get success
1070         self.assertEqual(r.status_code, 200)
1071         
1072         #assert content-type
1073         self.assertEqual(r['Content-Type'],
1074                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1075         
1076         #assert response content
1077         self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1078
1079     def test_get_with_if_match_star(self):
1080         #perform get with If-Match *
1081         headers = {'HTTP_IF_MATCH':'*'}
1082         r = self.get_object(self.account,
1083                         self.containers[1],
1084                         self.objects[0]['name'],
1085                         **headers)
1086         #assert get success
1087         self.assertEqual(r.status_code, 200)
1088         
1089         #assert content-type
1090         self.assertEqual(r['Content-Type'],
1091                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1092         
1093         #assert response content
1094         self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1095
1096     def test_get_with_multiple_if_match(self):
1097         #perform get with If-Match
1098         etags = [i['hash'] for i in self.objects if i]
1099         etags = ','.join('"%s"' % etag for etag in etags)
1100         headers = {'HTTP_IF_MATCH':etags}
1101         r = self.get_object(self.account,
1102                         self.containers[1],
1103                         self.objects[0]['name'],
1104                         **headers)
1105         #assert get success
1106         self.assertEqual(r.status_code, 200)
1107         
1108         #assert content-type
1109         self.assertEqual(r['Content-Type'],
1110                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1111         
1112         #assert content-type
1113         self.assertEqual(r['Content-Type'],
1114                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1115         
1116         #assert response content
1117         self.assertEqual(self.objects[0]['data'].strip(), r.content.strip())
1118
1119     def test_if_match_precondition_failed(self):
1120         #perform get with If-Match
1121         headers = {'HTTP_IF_MATCH':'123'}
1122         r = self.get_object(self.account,
1123                         self.containers[1],
1124                         self.objects[0]['name'],
1125                         **headers)
1126         #assert precondition failed 
1127         self.assertEqual(r.status_code, 412)
1128
1129     def test_if_none_match(self):
1130         #perform get with If-None-Match
1131         headers = {'HTTP_IF_NONE_MATCH':'123'}
1132         r = self.get_object(self.account,
1133                         self.containers[1],
1134                         self.objects[0]['name'],
1135                         **headers)
1136         
1137         #assert get success
1138         self.assertEqual(r.status_code, 200)
1139         
1140         #assert content-type
1141         self.assertEqual(r['Content-Type'],
1142                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1143
1144     def test_if_none_match(self):
1145         #perform get with If-None-Match *
1146         headers = {'HTTP_IF_NONE_MATCH':'*'}
1147         r = self.get_object(self.account,
1148                         self.containers[1],
1149                         self.objects[0]['name'],
1150                         **headers)
1151         
1152         #assert get success
1153         self.assertEqual(r.status_code, 304)
1154
1155     def test_if_none_match_not_modified(self):
1156         #perform get with If-None-Match
1157         headers = {'HTTP_IF_NONE_MATCH':'%s' %self.objects[0]['hash']}
1158         r = self.get_object(self.account,
1159                         self.containers[1],
1160                         self.objects[0]['name'],
1161                         **headers)
1162         
1163         #assert not modified
1164         self.assertEqual(r.status_code, 304)
1165         self.assertEqual(r['ETag'], self.objects[0]['hash'])
1166
1167     def test_if_modified_since(self):
1168         t = datetime.datetime.utcnow()
1169         t2 = t - datetime.timedelta(minutes=10)
1170         
1171         #modify the object
1172         self.upload_object(self.account,
1173                            self.containers[1],
1174                            self.objects[0]['name'],
1175                            self.objects[0]['data'][:200])
1176         
1177         for f in DATE_FORMATS:
1178             past = t2.strftime(f)
1179             
1180             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %past}
1181             r = self.get_object(self.account,
1182                         self.containers[1],
1183                         self.objects[0]['name'],
1184                         **headers)
1185             
1186             #assert get success
1187             self.assertEqual(r.status_code, 200)
1188             
1189             #assert content-type
1190             self.assertEqual(r['Content-Type'],
1191                              self.objects[0]['meta']['HTTP_CONTENT_TYPE'])   
1192
1193     def test_if_modified_since_invalid_date(self):
1194         headers = {'HTTP_IF_MODIFIED_SINCE':''}
1195         r = self.get_object(self.account,
1196                     self.containers[1],
1197                     self.objects[0]['name'],
1198                     **headers)
1199         
1200         #assert get success
1201         self.assertEqual(r.status_code, 200)
1202         
1203         #assert content-type
1204         self.assertEqual(r['Content-Type'],
1205                          self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1206
1207     def test_if_not_modified_since(self):
1208         now = datetime.datetime.utcnow()
1209         since = now + datetime.timedelta(1)
1210         
1211         for f in DATE_FORMATS:
1212             headers = {'HTTP_IF_MODIFIED_SINCE':'%s' %since.strftime(f)}
1213             r = self.get_object(self.account,
1214                                 self.containers[1],
1215                                 self.objects[0]['name'],
1216                                 **headers)
1217             
1218             #assert not modified
1219             self.assertEqual(r.status_code, 304)
1220
1221     def test_if_unmodified_since(self):
1222         now = datetime.datetime.utcnow()
1223         since = now + datetime.timedelta(1)
1224         
1225         for f in DATE_FORMATS:
1226             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %since.strftime(f)}
1227             r = self.get_object(self.account,
1228                                 self.containers[1],
1229                                 self.objects[0]['name'],
1230                                 **headers)
1231             #assert success
1232             self.assertEqual(r.status_code, 200)
1233             self.assertEqual(self.objects[0]['data'], r.content)
1234             
1235             #assert content-type
1236             self.assertEqual(r['Content-Type'],
1237                              self.objects[0]['meta']['HTTP_CONTENT_TYPE'])
1238
1239     def test_if_unmodified_since_precondition_failed(self):
1240         t = datetime.datetime.utcnow()
1241         t2 = t - datetime.timedelta(minutes=10)
1242         
1243         #modify the object
1244         self.upload_object(self.account,
1245                            self.containers[1],
1246                            self.objects[0]['name'],
1247                            self.objects[0]['data'][:200])
1248         
1249         for f in DATE_FORMATS:
1250             past = t2.strftime(f)
1251             
1252             headers = {'HTTP_IF_UNMODIFIED_SINCE':'%s' %past}
1253             r = self.get_object(self.account,
1254                         self.containers[1],
1255                         self.objects[0]['name'],
1256                         **headers)
1257             #assert get success
1258             self.assertEqual(r.status_code, 412)
1259
1260     def test_hashes(self):
1261         l = 8388609
1262         fname = 'largefile'
1263         o = self.upload_random_data(self.account,
1264                                 self.containers[1],
1265                                 fname,
1266                                 l)
1267         if o:
1268             r = self.get_object(self.account,
1269                                 self.containers[1],
1270                                 fname,
1271                                 'json')
1272             body = json.loads(r.content)
1273             hashes = body['hashes']
1274             block_size = body['block_size']
1275             block_hash = body['block_hash']
1276             block_num = l/block_size == 0 and l/block_size or l/block_size + 1
1277             self.assertTrue(len(hashes), block_num)
1278             i = 0
1279             for h in hashes:
1280                 start = i * block_size
1281                 end = (i + 1) * block_size
1282                 hash = compute_block_hash(o['data'][start:end], block_hash)
1283                 self.assertEqual(h, hash)
1284                 i += 1
1285
1286 class ObjectPut(BaseTestCase):
1287     def setUp(self):
1288         BaseTestCase.setUp(self)
1289         self.account = 'test'
1290         self.container = 'c1'
1291         self.create_container(self.account, self.container)
1292         
1293         self.src = os.path.join('.', 'api', 'tests.py')
1294         self.dest = os.path.join('.', 'api', 'chunked_update_test_file')
1295         create_chunked_update_test_file(self.src, self.dest)
1296
1297     def tearDown(self):
1298         r = self.list_objects(self.account, self.container)
1299         for o in get_content_splitted(r):
1300             self.delete_object(self.account, self.container, o)
1301         self.delete_container(self.account, self.container)
1302         
1303         # delete test file
1304         os.remove(self.dest)
1305
1306     def test_upload(self):
1307         filename = 'tests.py'
1308         fullpath = os.path.join('.', 'api', filename) 
1309         f = open(fullpath, 'r')
1310         data = f.read()
1311         hash = compute_md5_hash(data)
1312         meta={'HTTP_ETAG':hash,
1313               'HTTP_X_OBJECT_META_TEST':'test1'
1314               }
1315         type, enc = mimetypes.guess_type(fullpath)
1316         meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1317         if enc:
1318             meta['HTTP_CONTENT_ENCODING'] = enc
1319         r = self.upload_object(self.account,
1320                                self.container,
1321                                filename,
1322                                data,
1323                                content_type=meta['HTTP_CONTENT_TYPE'],
1324                                **meta)
1325         self.assertEqual(r.status_code, 201)
1326         r = self.get_object_meta(self.account, self.container, filename)
1327         self.assertTrue(r['X-Object-Meta-Test'])
1328         self.assertEqual(r['X-Object-Meta-Test'],
1329                          meta['HTTP_X_OBJECT_META_TEST'])
1330         
1331         #assert uploaded content
1332         r = self.get_object(self.account, self.container, filename)
1333         self.assertEqual(os.path.getsize(fullpath), int(r['Content-Length']))
1334         self.assertEqual(data.strip(), r.content.strip())
1335
1336     def test_upload_unprocessable_entity(self):
1337         filename = 'tests.py'
1338         fullpath = os.path.join('.', 'api', filename) 
1339         f = open(fullpath, 'r')
1340         data = f.read()
1341         meta={'HTTP_ETAG':'123',
1342               'HTTP_X_OBJECT_META_TEST':'test1'
1343               }
1344         type, enc = mimetypes.guess_type(fullpath)
1345         meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1346         if enc:
1347             meta['HTTP_CONTENT_ENCODING'] = enc
1348         r = self.upload_object(self.account,
1349                                self.container,
1350                                filename,
1351                                data,
1352                                content_type = meta['HTTP_CONTENT_TYPE'],
1353                                **meta)
1354         self.assertEqual(r.status_code, 422)
1355
1356     def test_chucked_update(self):
1357         objname = os.path.split(self.src)[-1:][0]
1358         f = open(self.dest, 'r')
1359         data = f.read()
1360         meta = {}
1361         type, enc = mimetypes.guess_type(self.dest)
1362         meta['HTTP_CONTENT_TYPE'] = type and type or 'plain/text'
1363         if enc:
1364             meta['HTTP_CONTENT_ENCODING'] = enc
1365         meta.update({'HTTP_TRANSFER_ENCODING':'chunked'})
1366         r = self.upload_object(self.account,
1367                                self.container,
1368                                objname,
1369                                data,
1370                                content_type = 'plain/text',
1371                                **meta)
1372         self.assertEqual(r.status_code, 201)
1373         
1374         r = self.get_object(self.account,
1375                             self.container,
1376                             objname)
1377         uploaded_data = r.content
1378         f = open(self.src, 'r')
1379         actual_data = f.read()
1380         self.assertEqual(actual_data, uploaded_data)
1381
1382 class ObjectCopy(BaseTestCase):
1383     def setUp(self):
1384         BaseTestCase.setUp(self)
1385         self.account = 'test'
1386         self.containers = ['c1', 'c2']
1387         for c in self.containers:
1388             self.create_container(self.account, c)
1389         self.obj = self.upload_os_file(self.account,
1390                             self.containers[0],
1391                             './api/tests.py')
1392
1393     def tearDown(self):
1394         for c in self.containers:
1395             for o in get_content_splitted(self.list_objects(self.account, c)):
1396                 self.delete_object(self.account, c, o)
1397             self.delete_container(self.account, c)
1398
1399     def test_copy(self):
1400         with AssertInvariant(self.get_object_meta, self.account,
1401                              self.containers[0], self.obj['name']):
1402             #perform copy
1403             meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1404             src_path = os.path.join('/', self.containers[0], self.obj['name'])
1405             r = self.copy_object(self.account,
1406                              self.containers[0],
1407                              'testcopy',
1408                              src_path,
1409                              **meta)
1410             #assert copy success
1411             self.assertEqual(r.status_code, 201)
1412             
1413             #assert access the new object
1414             r = self.get_object_meta(self.account,
1415                                      self.containers[0],
1416                                      'testcopy')
1417             self.assertTrue(r['X-Object-Meta-Test'])
1418             self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1419             
1420             #assert etag is the same
1421             self.assertEqual(r['ETag'], self.obj['hash'])
1422             
1423             #assert src object still exists
1424             r = self.get_object_meta(self.account, self.containers[0],
1425                                      self.obj['name'])
1426             self.assertEqual(r.status_code, 200)
1427
1428     def test_copy_from_different_container(self):
1429         with AssertInvariant(self.get_object_meta,
1430                              self.account,
1431                              self.containers[0],
1432                              self.obj['name']):
1433             meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1434             src_path = os.path.join('/', self.containers[0], self.obj['name'])
1435             r = self.copy_object(self.account,
1436                              self.containers[1],
1437                              'testcopy',
1438                              src_path,
1439                              **meta)
1440             self.assertEqual(r.status_code, 201)
1441             
1442             # assert updated metadata
1443             r = self.get_object_meta(self.account,
1444                                      self.containers[1],
1445                                      'testcopy')
1446             self.assertTrue(r['X-Object-Meta-Test'])
1447             self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1448             
1449             #assert src object still exists
1450             r = self.get_object_meta(self.account, self.containers[0],
1451                                      self.obj['name'])
1452             self.assertEqual(r.status_code, 200)
1453
1454     def test_copy_invalid(self):
1455         #copy from invalid object
1456         meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1457         r = self.copy_object(self.account,
1458                          self.containers[1],
1459                          'testcopy',
1460                          os.path.join('/', self.containers[0], 'test.py'),
1461                          **meta)
1462         self.assertItemNotFound(r)
1463         
1464         #copy from invalid container
1465         meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1466         src_path = os.path.join('/', self.containers[1], self.obj['name'])
1467         r = self.copy_object(self.account,
1468                          self.containers[1],
1469                          'testcopy',
1470                          src_path,
1471                          **meta)
1472         self.assertItemNotFound(r)
1473
1474 class ObjectMove(ObjectCopy):
1475     def test_move(self):
1476         #perform move
1477         meta = {'HTTP_X_OBJECT_META_TEST':'testcopy'}
1478         src_path = os.path.join('/', self.containers[0], self.obj['name'])
1479         r = self.move_object(self.account,
1480                          self.containers[0],
1481                          'testcopy',
1482                          src_path,
1483                          **meta)
1484         #assert successful move
1485         self.assertEqual(r.status_code, 201)
1486         
1487         #assert updated metadata
1488         r = self.get_object_meta(self.account,
1489                                  self.containers[0],
1490                                  'testcopy')
1491         self.assertTrue(r['X-Object-Meta-Test'])
1492         self.assertTrue(r['X-Object-Meta-Test'], 'testcopy')
1493         
1494         #assert src object no more exists
1495         r = self.get_object_meta(self.account, self.containers[0],
1496                                  self.obj['name'])
1497         self.assertItemNotFound(r)
1498
1499 class ObjectPost(BaseTestCase):
1500     def setUp(self):
1501         BaseTestCase.setUp(self)
1502         self.account = 'test'
1503         self.containers = ['c1', 'c2']
1504         for c in self.containers:
1505             self.create_container(self.account, c)
1506         self.obj = self.upload_os_file(self.account,
1507                                        self.containers[0],
1508                                        './api/tests.py')
1509
1510     def tearDown(self):
1511         for c in self.containers:
1512             for o in get_content_splitted(self.list_objects(self.account, c)):
1513                 self.delete_object(self.account, c, o)
1514             self.delete_container(self.account, c)
1515
1516     def test_update_meta(self):
1517         #perform update metadata
1518         more = {'HTTP_X_OBJECT_META_FOO':'foo',
1519                 'HTTP_X_OBJECT_META_BAR':'bar'}
1520         r = self.update_object(self.account,
1521                                 self.containers[0],
1522                                 self.obj['name'],
1523                                 **more)
1524         #assert request accepted
1525         self.assertEqual(r.status_code, 202)
1526         
1527         #assert old metadata are still there
1528         r = self.get_object_meta(self.account, self.containers[0],
1529                                  self.obj['name'])
1530         self.assertTrue('X-Object-Meta-Test' not in r.items())
1531         
1532         #assert new metadata have been updated
1533         for k,v in more.items():
1534             key = '-'.join(elem.capitalize() for elem in k.split('_')[1:])
1535             self.assertTrue(r[key])
1536             self.assertTrue(r[key], v)
1537
1538     def test_update_object(self,
1539                            first_byte_pos=0,
1540                            last_byte_pos=499,
1541                            instance_length = True,
1542                            content_length = 500):
1543         l = len(self.obj['data'])
1544         length =  instance_length and l or '*'
1545         range = 'bytes %d-%d/%s' %(first_byte_pos,
1546                                        last_byte_pos,
1547                                        length)
1548         partial = last_byte_pos - first_byte_pos + 1
1549         data = get_random_data(partial)
1550         more = {'HTTP_CONTENT_RANGE':range}
1551         if content_length:
1552             more.update({'CONTENT_LENGTH':'%s' % content_length})
1553         
1554         r = self.update_object(self.account,
1555                                 self.containers[0],
1556                                 self.obj['name'],
1557                                 data,
1558                                 'application/octet-stream',
1559                                 **more)
1560         
1561         if partial < 0 or (instance_length and l <= last_byte_pos):
1562             self.assertEqual(r.status_code, 202)    
1563         elif content_length and content_length != partial:
1564             self.assertEqual(r.status_code, 400)
1565         else:
1566             self.assertEqual(r.status_code, 204)
1567             
1568             #check modified object
1569             r = self.get_object(self.account,
1570                             self.containers[0],
1571                             self.obj['name'])
1572             self.assertEqual(r.content[0:partial], data)
1573             self.assertEqual(r.content[partial:l], self.obj['data'][partial:l])
1574
1575     def test_update_object_no_content_length(self):
1576         self.test_update_object(content_length = None)
1577
1578     def test_update_object_invalid_content_length(self):
1579         with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1580                             self.obj['name']):
1581             self.test_update_object(content_length = 1000)
1582
1583     def test_update_object_with_unknown_instance_length(self):
1584         self.test_update_object(instance_length = False)
1585
1586     def test_update_object_invalid_range(self):
1587         with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1588                             self.obj['name']):
1589             self.test_update_object(499, 0, True)
1590     
1591     def test_update_object_invalid_range_and_length(self):
1592         with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1593                             self.obj['name']):
1594             self.test_update_object(499, 0, True, -1)
1595     
1596     def test_update_object_invalid_range_with_no_content_length(self):
1597         with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1598                             self.obj['name']):
1599             self.test_update_object(499, 0, True, content_length = None)
1600     
1601     def test_update_object_out_of_limits(self):    
1602         with AssertContentInvariant(self.get_object, self.account, self.containers[0],
1603                             self.obj['name']):
1604             l = len(self.obj['data'])
1605             self.test_update_object(0, l+1, True)
1606
1607     def test_append(self):
1608         data = get_random_data(500)
1609         more = {'CONTENT_LENGTH':'500',
1610                 'HTTP_CONTENT_RANGE':'bytes */*'}
1611         
1612         r = self.update_object(self.account,
1613                                 self.containers[0],
1614                                 self.obj['name'],
1615                                 data,
1616                                 'application/octet-stream',
1617                                 **more)
1618         
1619         self.assertEqual(r.status_code, 204)
1620         
1621         r = self.get_object(self.account,
1622                                 self.containers[0],
1623                                 self.obj['name'])
1624         self.assertEqual(len(r.content), len(self.obj['data']) + 500)
1625         self.assertEqual(r.content[:-500], self.obj['data'])
1626
1627     def test_update_with_chunked_transfer(self):
1628         data, pure = create_random_chunked_data()
1629         dl = len(pure)
1630         fl = len(self.obj['data'])
1631         meta = {'HTTP_TRANSFER_ENCODING':'chunked',
1632                 'HTTP_CONTENT_RANGE':'bytes 0-/%d' %fl}
1633         r = self.update_object(self.account,
1634                                 self.containers[0],
1635                                 self.obj['name'],
1636                                 data,
1637                                 'application/octet-stream',
1638                                 **meta)
1639         
1640         #check modified object
1641         r = self.get_object(self.account,
1642                         self.containers[0],
1643                         self.obj['name'])
1644         self.assertEqual(r.content[0:dl], pure)
1645         self.assertEqual(r.content[dl:fl], self.obj['data'][dl:fl])
1646
1647     def test_update_with_chunked_transfer_strict_range(self):
1648         data, pure = create_random_chunked_data()
1649         dl = len(pure) - 1
1650         fl = len(self.obj['data'])
1651         meta = {'HTTP_TRANSFER_ENCODING':'chunked',
1652                 'HTTP_CONTENT_RANGE':'bytes 0-%d/%d' %(dl, fl)}
1653         r = self.update_object(self.account,
1654                                 self.containers[0],
1655                                 self.obj['name'],
1656                                 data,
1657                                 'application/octet-stream',
1658                                 **meta)
1659         
1660         #check modified object
1661         r = self.get_object(self.account,
1662                         self.containers[0],
1663                         self.obj['name'])
1664         self.assertEqual(r.content[0:dl+1], pure)
1665         self.assertEqual(r.content[dl+1:fl], self.obj['data'][dl+1:fl])
1666
1667 class ObjectDelete(BaseTestCase):
1668     def setUp(self):
1669         BaseTestCase.setUp(self)
1670         self.account = 'test'
1671         self.containers = ['c1', 'c2']
1672         for c in self.containers:
1673             self.create_container(self.account, c)
1674         self.obj = self.upload_os_file(self.account,
1675                             self.containers[0],
1676                             './api/tests.py')
1677
1678     def tearDown(self):
1679         for c in self.containers:
1680             for o in get_content_splitted(self.list_objects(self.account, c)):
1681                 self.delete_object(self.account, c, o)
1682             self.delete_container(self.account, c)
1683
1684     def test_delete(self):
1685         #perform delete object
1686         r = self.delete_object(self.account, self.containers[0],
1687                                self.obj['name'])
1688         
1689         #assert success
1690         self.assertEqual(r.status_code, 204)
1691
1692     def test_delete_invalid(self):
1693         #perform delete object
1694         r = self.delete_object(self.account, self.containers[1],
1695                                self.obj['name'])
1696         
1697         #assert failure
1698         self.assertItemNotFound(r)
1699
1700 class AssertInvariant(object):
1701     def __init__(self, callable, *args, **kwargs):
1702         self.callable = callable
1703         self.args = args
1704         self.kwargs = kwargs
1705
1706     def __enter__(self):
1707         self.items = self.callable(*self.args, **self.kwargs).items()
1708         return self.items
1709
1710     def __exit__(self, type, value, tb):
1711         items = self.callable(*self.args, **self.kwargs).items()
1712         assert self.items == items
1713
1714 class AssertContentInvariant(object):
1715     def __init__(self, callable, *args, **kwargs):
1716         self.callable = callable
1717         self.args = args
1718         self.kwargs = kwargs
1719
1720     def __enter__(self):
1721         self.content = self.callable(*self.args, **self.kwargs).content
1722         return self.content
1723
1724     def __exit__(self, type, value, tb):
1725         content = self.callable(*self.args, **self.kwargs).content
1726         assert self.content == content
1727
1728 def get_content_splitted(response):
1729     if response:
1730         return response.content.split('\n')
1731
1732 def compute_md5_hash(data):
1733     md5 = hashlib.md5()
1734     offset = 0
1735     md5.update(data)
1736     return md5.hexdigest().lower()
1737
1738 def compute_block_hash(data, algorithm):
1739     h = hashlib.new(algorithm)
1740     h.update(data.rstrip('\x00'))
1741     return h.hexdigest()
1742
1743 def create_chunked_update_test_file(src, dest):
1744     fr = open(src, 'r')
1745     fw = open(dest, 'w')
1746     data = fr.readline()
1747     while data:
1748         fw.write(hex(len(data)))
1749         fw.write('\r\n')
1750         fw.write(data)
1751         data = fr.readline()
1752     fw.write(hex(0))
1753     fw.write('\r\n')
1754
1755 def create_random_chunked_data(rows=5):
1756     i = 0
1757     out = []
1758     pure= []
1759     while i < rows:
1760         data = get_random_data(random.randint(1, 100))
1761         out.append(hex(len(data)))
1762         out.append(data)
1763         pure.append(data)
1764         i+=1
1765     out.append(hex(0))
1766     out.append('\r\n')
1767     return '\r\n'.join(out), ''.join(pure)
1768
1769 def get_random_data(length=500):
1770     char_set = string.ascii_uppercase + string.digits
1771     return ''.join(random.choice(char_set) for x in range(length))
1772
1773 o_names = ['kate.jpg',
1774            'kate_beckinsale.jpg',
1775            'How To Win Friends And Influence People.pdf',
1776            'moms_birthday.jpg',
1777            'poodle_strut.mov',
1778            'Disturbed - Down With The Sickness.mp3',
1779            'army_of_darkness.avi',
1780            'the_mad.avi',
1781            'photos/animals/dogs/poodle.jpg',
1782            'photos/animals/dogs/terrier.jpg',
1783            'photos/animals/cats/persian.jpg',
1784            'photos/animals/cats/siamese.jpg',
1785            'photos/plants/fern.jpg',
1786            'photos/plants/rose.jpg',
1787            'photos/me.jpg']