33 |
33 |
|
34 |
34 |
from unittest import TestCase
|
35 |
35 |
from mock import patch, call, Mock
|
|
36 |
from tempfile import NamedTemporaryFile
|
|
37 |
from os import urandom
|
36 |
38 |
|
37 |
39 |
from kamaki.clients import ClientError
|
38 |
40 |
from kamaki.clients.pithos import PithosClient as PC
|
... | ... | |
134 |
136 |
|
135 |
137 |
files = []
|
136 |
138 |
|
|
139 |
def _create_temp_file(self):
|
|
140 |
self.files.append(NamedTemporaryFile())
|
|
141 |
tmpFile = self.files[-1]
|
|
142 |
num_of_blocks = 8
|
|
143 |
file_size = num_of_blocks * 4 * 1024 * 1024
|
|
144 |
print('\n\tCreate tmp file')
|
|
145 |
tmpFile.write(urandom(file_size))
|
|
146 |
tmpFile.flush()
|
|
147 |
tmpFile.seek(0)
|
|
148 |
print('\t\tDone')
|
|
149 |
return tmpFile
|
|
150 |
|
137 |
151 |
def assert_dicts_are_equal(self, d1, d2):
|
138 |
152 |
for k, v in d1.items():
|
139 |
153 |
self.assertTrue(k in d2)
|
... | ... | |
156 |
170 |
for f in self.files:
|
157 |
171 |
f.close()
|
158 |
172 |
|
|
173 |
# Pithos+ methods that extend storage API
|
|
174 |
|
159 |
175 |
def test_get_account_info(self):
|
160 |
176 |
self.FR.headers = account_info
|
161 |
177 |
self.FR.status_code = 204
|
... | ... | |
239 |
255 |
PC.get_container_info = Mock(return_value=container_info)
|
240 |
256 |
PC.container_post = Mock(return_value=self.FR())
|
241 |
257 |
PC.object_put = Mock(return_value=self.FR())
|
242 |
|
from tempfile import NamedTemporaryFile
|
243 |
|
from os import urandom
|
244 |
|
self.files.append(NamedTemporaryFile())
|
245 |
|
tmpFile = self.files[-1]
|
246 |
|
num_of_blocks = 8
|
247 |
|
file_size = num_of_blocks * 4 * 1024 * 1024
|
248 |
|
print('\n\tCreate tmp file')
|
249 |
|
tmpFile.write(urandom(file_size))
|
250 |
|
tmpFile.flush()
|
251 |
|
tmpFile.seek(0)
|
252 |
|
print('\t\tDone')
|
|
258 |
tmpFile = self._create_temp_file()
|
253 |
259 |
obj = 'objectName'
|
254 |
260 |
|
255 |
|
# No special args
|
|
261 |
# Without kwargs
|
256 |
262 |
self.client.upload_object(obj, tmpFile)
|
257 |
263 |
self.assertEqual(PC.get_container_info.mock_calls, [call()])
|
258 |
264 |
[call1, call2] = PC.object_put.mock_calls
|
... | ... | |
539 |
545 |
call('format', 'json'), call('path', path)])
|
540 |
546 |
self.FR.status_code = 404
|
541 |
547 |
self.assertRaises(ClientError, self.client.list_objects)
|
|
548 |
|
|
549 |
# Pithos+ only methods
|
|
550 |
|
|
551 |
def test_purge_container(self):
|
|
552 |
with patch.object(
|
|
553 |
PC,
|
|
554 |
'container_delete',
|
|
555 |
return_value=self.FR()) as cd:
|
|
556 |
self.client.purge_container()
|
|
557 |
self.assertTrue('until' in cd.mock_calls[-1][2])
|
|
558 |
cont = self.client.container
|
|
559 |
self.client.purge_container('another-container')
|
|
560 |
self.assertEqual(self.client.container, cont)
|
|
561 |
|
|
562 |
def test_upload_object_unchunked(self):
|
|
563 |
tmpFile = self._create_temp_file()
|
|
564 |
obj = 'obj3c7N4m3'
|
|
565 |
expected = dict(
|
|
566 |
success=201,
|
|
567 |
data=8 * 4 * 1024 * 1024,
|
|
568 |
etag='some-etag',
|
|
569 |
content_encoding='some content_encoding',
|
|
570 |
content_type='some content-type',
|
|
571 |
content_disposition='some content_disposition',
|
|
572 |
public=True,
|
|
573 |
permissions=dict(read=['u1', 'g1', 'u2'], write=['u1']))
|
|
574 |
with patch.object(PC, 'object_put', return_value=self.FR()) as put:
|
|
575 |
self.client.upload_object_unchunked(obj, tmpFile)
|
|
576 |
self.assertEqual(put.mock_calls[-1][1], (obj,))
|
|
577 |
self.assertEqual(
|
|
578 |
sorted(put.mock_calls[-1][2].keys()),
|
|
579 |
sorted(expected.keys()))
|
|
580 |
kwargs = dict(expected)
|
|
581 |
kwargs.pop('success')
|
|
582 |
kwargs['size'] = kwargs.pop('data')
|
|
583 |
kwargs['sharing'] = kwargs.pop('permissions')
|
|
584 |
tmpFile.seek(0)
|
|
585 |
self.client.upload_object_unchunked(obj, tmpFile, **kwargs)
|
|
586 |
pmc = put.mock_calls[-1][2]
|
|
587 |
for k, v in expected.items():
|
|
588 |
if k == 'data':
|
|
589 |
self.assertEqual(len(pmc[k]), v)
|
|
590 |
else:
|
|
591 |
self.assertEqual(pmc[k], v)
|