Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / livetest / pithos.py @ 49cc29b2

History | View | Annotate | Download (46.2 kB)

1
# Copyright 2012-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import time
35
import datetime
36
from os import urandom
37
from tempfile import NamedTemporaryFile
38
from string import ascii_letters
39

    
40
from kamaki.clients import livetest, ClientError
41
from kamaki.clients.pithos import PithosClient
42
from kamaki.clients.astakos import AstakosClient
43

    
44

    
45
def chargen():
46
    """10 + 2 * 26 + 26 = 88"""
47
    while True:
48
        for CH in xrange(10):
49
            yield '%s' % CH
50
        for CH in ascii_letters:
51
            yield CH
52
        for CH in '~!@#$%^&*()_+`-=:";|<>?,./':
53
            yield CH
54

    
55

    
56
def sample_block(f, block):
57
    block_size = 4 * 1024 * 1024
58
    f.seek(block * block_size)
59
    ch = [f.read(1)]
60
    f.seek(block_size / 2, 1)
61
    ch.append(f.read(1))
62
    f.seek((block + 1) * block_size - 1)
63
    ch.append(f.read(1))
64
    return ch
65

    
66

    
67
class Pithos(livetest.Generic):
68

    
69
    files = []
70

    
71
    def setUp(self):
72
        self.client = PithosClient(
73
            self['file', 'url'],
74
            self['file', 'token'],
75
            AstakosClient(
76
                self['user', 'url'],
77
                self['file', 'token']
78
            ).term('uuid'))
79

    
80
        self.now = time.mktime(time.gmtime())
81
        self.now_unformated = datetime.datetime.utcnow()
82
        self._init_data()
83

    
84
        """Prepare an object to be shared - also its container"""
85
        self.client.container = self.c1
86
        self.client.object_post(
87
            'test',
88
            update=True,
89
            permissions={'read': [self.client.account]})
90

    
91
        self.create_remote_object(self.c1, 'another.test')
92

    
93
    def _init_data(self):
94
        self.c1 = 'c1_' + unicode(self.now)
95
        self.c2 = 'c2_' + unicode(self.now)
96
        self.c3 = 'c3_' + unicode(self.now)
97
        try:
98
            self.client.create_container(self.c2)
99
        except ClientError:
100
            pass
101
        try:
102
            self.client.create_container(self.c1)
103
        except ClientError:
104
            pass
105
        try:
106
            self.client.create_container(self.c3)
107
        except ClientError:
108
            pass
109

    
110
        self.create_remote_object(self.c1, 'test')
111
        self.create_remote_object(self.c2, 'test')
112
        self.create_remote_object(self.c1, 'test1')
113
        self.create_remote_object(self.c2, 'test1')
114

    
115
    def create_remote_object(self, container, obj):
116
        self.client.container = container
117
        self.client.object_put(
118
            obj,
119
            content_type='application/octet-stream',
120
            data='file %s that lives in %s' % (obj, container),
121
            metadata={'incontainer': container})
122

    
123
    def forceDeleteContainer(self, container):
124
        self.client.container = container
125
        try:
126
            r = self.client.list_objects()
127
        except ClientError:
128
            return
129
        for obj in r:
130
            name = obj['name']
131
            self.client.del_object(name)
132
        r = self.client.container_delete()
133
        self.container = ''
134

    
135
    def tearDown(self):
136
        """Destroy test cases"""
137
        for f in self.files:
138
            f.close()
139
        self.forceDeleteContainer(self.c1)
140
        self.forceDeleteContainer(self.c2)
141
        try:
142
            self.forceDeleteContainer(self.c3)
143
        except ClientError:
144
            pass
145
        self.client.container = ''
146

    
147
    def test_000(self):
148
        """Prepare a full Pithos+ test"""
149
        print('')
150
        super(self.__class__, self).test_000()
151

    
152
    def test_account_head(self):
153
        """Test account_HEAD"""
154
        self._test_0010_account_head()
155

    
156
    def _test_0010_account_head(self):
157
        r = self.client.account_head()
158
        self.assertEqual(r.status_code, 204)
159

    
160
        r = self.client.account_head(until='1000000000')
161
        self.assertEqual(r.status_code, 204)
162

    
163
        r = self.client.get_account_info(until='1000000000')
164
        datestring = unicode(r['x-account-until-timestamp'])
165
        self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring)
166

    
167
        r = self.client.get_account_quota()
168
        self.assertTrue('x-account-policy-quota' in r)
169

    
170
        r = self.client.get_account_versioning()
171
        self.assertTrue('x-account-policy-versioning' in r)
172

    
173
        """Check if(un)modified_since"""
174
        for format in self.client.DATE_FORMATS:
175
            now_formated = self.now_unformated.strftime(format)
176
            r1 = self.client.account_head(
177
                if_modified_since=now_formated,
178
                success=(204, 304, 412))
179
            sc1 = r1.status_code
180
            r2 = self.client.account_head(
181
                if_unmodified_since=now_formated,
182
                success=(204, 304, 412))
183
            sc2 = r2.status_code
184
            self.assertNotEqual(sc1, sc2)
185

    
186
    def test_account_get(self):
187
        """Test account_GET"""
188
        self._test_0020_account_get()
189

    
190
    def _test_0020_account_get(self):
191
        #r = self.client.account_get()
192
        #self.assertEqual(r.status_code, 200)
193
        r = self.client.list_containers()
194
        fullLen = len(r)
195
        self.assertTrue(fullLen > 2)
196

    
197
        r = self.client.account_get(limit=1)
198
        self.assertEqual(len(r.json), 1)
199

    
200
        r = self.client.account_get(marker='c2_')
201
        temp_c0 = r.json[0]['name']
202
        temp_c2 = r.json[2]['name']
203

    
204
        r = self.client.account_get(limit=2, marker='c2_')
205
        conames = [container['name'] for container in r.json if (
206
            container['name'].lower().startswith('c2_'))]
207
        self.assertTrue(temp_c0 in conames)
208
        self.assertFalse(temp_c2 in conames)
209

    
210
        r = self.client.account_get(show_only_shared=True)
211
        self.assertTrue(self.c1 in [c['name'] for c in r.json])
212

    
213
        r = self.client.account_get(until=1342609206)
214
        self.assertTrue(len(r.json) <= fullLen)
215

    
216
        """Check if(un)modified_since"""
217
        for format in self.client.DATE_FORMATS:
218
            now_formated = self.now_unformated.strftime(format)
219
            r1 = self.client.account_get(
220
                if_modified_since=now_formated,
221
                success=(200, 304, 412))
222
            sc1 = r1.status_code
223
            r2 = self.client.account_get(
224
                if_unmodified_since=now_formated,
225
                success=(200, 304, 412))
226
            sc2 = r2.status_code
227
            self.assertNotEqual(sc1, sc2)
228

    
229
        """Check sharing_accounts"""
230
        r = self.client.get_sharing_accounts()
231
        self.assertTrue(len(r) > 0)
232

    
233
    def test_account_post(self):
234
        """Test account_POST"""
235
        self._test_0030_account_post()
236

    
237
    def _test_0030_account_post(self):
238
        r = self.client.account_post()
239
        self.assertEqual(r.status_code, 202)
240
        grpName = 'grp' + unicode(self.now)
241

    
242
        """Method set/del_account_meta and set_account_groupcall use
243
            account_post internally
244
        """
245
        u1 = self.client.account
246
        #  Invalid display name
247
        u2 = '1nc0r3c7-d15p14y-n4m3'
248
        #  valid display name
249
        u3 = '6488c1b2-cb06-40a8-a02a-d474b8d29c59'
250
        self.assertRaises(
251
            ClientError,
252
            self.client.set_account_group,
253
            grpName, [u1, u2])
254
        self.client.set_account_group(grpName, [u1])
255
        r = self.client.get_account_group()
256
        self.assertEqual(r['x-account-group-' + grpName], '%s' % u1)
257
        try:
258
            self.client.set_account_group(grpName, [u1, u3])
259
            r = self.client.get_account_group()
260
            self.assertEqual(
261
                r['x-account-group-' + grpName],
262
                '%s,%s' % (u1, u3))
263
        except:
264
            print('\tInvalid user id %s (it is ok, though)' % u3)
265
        self.client.del_account_group(grpName)
266
        r = self.client.get_account_group()
267
        self.assertTrue('x-account-group-' + grpName not in r)
268

    
269
        mprefix = 'meta' + unicode(self.now)
270
        self.client.set_account_meta({
271
            mprefix + '1': 'v1',
272
            mprefix + '2': 'v2'})
273
        r = self.client.get_account_meta()
274
        self.assertEqual(r['x-account-meta-' + mprefix + '1'], 'v1')
275
        self.assertEqual(r['x-account-meta-' + mprefix + '2'], 'v2')
276

    
277
        self.client.del_account_meta(mprefix + '1')
278
        r = self.client.get_account_meta()
279
        self.assertTrue('x-account-meta-' + mprefix + '1' not in r)
280

    
281
        self.client.del_account_meta(mprefix + '2')
282
        r = self.client.get_account_meta()
283
        self.assertTrue('x-account-meta-' + mprefix + '2' not in r)
284

    
285
        """Missing testing for quota, versioning, because normally
286
        you don't have permissions to modify those at account level
287
        """
288

    
289
        #newquota = 1000000
290
        #self.client.set_account_quota(newquota)
291
        #r = self.client.get_account_info()
292
        #print(unicode(r))
293
        #r = self.client.get_account_quota()
294
        #self.assertEqual(r['x-account-policy-quota'], newquota)
295
        self.client.set_account_versioning('auto')
296

    
297
    def test_container_head(self):
298
        """Test container_HEAD"""
299
        self._test_0040_container_head()
300

    
301
    def _test_0040_container_head(self):
302
        self.client.container = self.c1
303

    
304
        r = self.client.container_head()
305
        self.assertEqual(r.status_code, 204)
306

    
307
        """Check until"""
308
        r = self.client.container_head(until=1000000, success=(204, 404))
309
        self.assertEqual(r.status_code, 404)
310

    
311
        """Check and if(un)modified_since"""
312
        for format in self.client.DATE_FORMATS:
313
            now_formated = self.now_unformated.strftime(format)
314
            r1 = self.client.container_head(
315
                if_modified_since=now_formated,
316
                success=(204, 304, 412))
317
            sc1 = r1.status_code
318
            r2 = self.client.container_head(
319
                if_unmodified_since=now_formated,
320
                success=(204, 304, 412))
321
            sc2 = r2.status_code
322
            self.assertNotEqual(sc1, sc2)
323

    
324
        """Check container object meta"""
325
        r = self.client.get_container_object_meta()
326
        self.assertEqual(r['x-container-object-meta'], 'Incontainer')
327

    
328
    def test_container_get(self):
329
        """Test container_GET"""
330
        self._test_0050_container_get()
331

    
332
    def _test_0050_container_get(self):
333
        self.client.container = self.c1
334

    
335
        r = self.client.container_get()
336
        self.assertEqual(r.status_code, 200)
337
        fullLen = len(r.json)
338

    
339
        r = self.client.container_get(prefix='test')
340
        lalobjects = [obj for obj in r.json if obj['name'].startswith('test')]
341
        self.assertTrue(len(r.json) > 1)
342
        self.assertEqual(len(r.json), len(lalobjects))
343

    
344
        r = self.client.container_get(limit=1)
345
        self.assertEqual(len(r.json), 1)
346

    
347
        r = self.client.container_get(marker='another')
348
        self.assertTrue(len(r.json) > 1)
349
        neobjects = [obj for obj in r.json if obj['name'] > 'another']
350
        self.assertEqual(len(r.json), len(neobjects))
351

    
352
        r = self.client.container_get(prefix='another.test', delimiter='.')
353
        self.assertTrue(fullLen > len(r.json))
354

    
355
        r = self.client.container_get(path='/')
356
        self.assertEqual(fullLen, len(r.json))
357

    
358
        r = self.client.container_get(format='xml')
359
        self.assertEqual(r.text.split()[4], 'name="' + self.c1 + '">')
360

    
361
        r = self.client.container_get(meta=['incontainer'])
362
        self.assertTrue(len(r.json) > 0)
363

    
364
        r = self.client.container_get(show_only_shared=True)
365
        self.assertTrue(len(r.json) < fullLen)
366

    
367
        try:
368
            r = self.client.container_get(until=1000000000)
369
            datestring = unicode(r.headers['x-account-until-timestamp'])
370
            self.assertEqual(u'Sun, 09 Sep 2001 01:46:40 GMT', datestring)
371

    
372
        except ClientError:
373

    
374
            pass
375

    
376
        """Check and if un/modified_since"""
377
        for format in self.client.DATE_FORMATS:
378
            now_formated = self.now_unformated.strftime(format)
379
            r1 = self.client.container_get(
380
                if_modified_since=now_formated,
381
                success=(200, 304, 412))
382
            sc1 = r1.status_code
383
            r2 = self.client.container_get(
384
                if_unmodified_since=now_formated,
385
                success=(200, 304, 412))
386
            sc2 = r2.status_code
387
            self.assertNotEqual(sc1, sc2)
388

    
389
    def test_container_put(self):
390
        """Test container_PUT"""
391
        self._test_0050_container_put()
392

    
393
    def _test_0050_container_put(self):
394
        self.client.container = self.c2
395

    
396
        r = self.client.container_put()
397
        self.assertEqual(r.status_code, 202)
398

    
399
        r = self.client.get_container_limit(self.client.container)
400
        cquota = r.values()[0]
401
        newquota = 2 * int(cquota)
402

    
403
        r = self.client.container_put(quota=newquota)
404
        self.assertEqual(r.status_code, 202)
405

    
406
        r = self.client.get_container_limit(self.client.container)
407
        xquota = int(r.values()[0])
408
        self.assertEqual(newquota, xquota)
409

    
410
        r = self.client.container_put(versioning='auto')
411
        self.assertEqual(r.status_code, 202)
412

    
413
        r = self.client.get_container_versioning(self.client.container)
414
        nvers = r.values()[0]
415
        self.assertEqual('auto', nvers)
416

    
417
        r = self.client.container_put(versioning='none')
418
        self.assertEqual(r.status_code, 202)
419

    
420
        r = self.client.get_container_versioning(self.client.container)
421
        nvers = r.values()[0]
422
        self.assertEqual('none', nvers)
423

    
424
        r = self.client.container_put(metadata={'m1': 'v1', 'm2': 'v2'})
425
        self.assertEqual(r.status_code, 202)
426

    
427
        r = self.client.get_container_meta(self.client.container)
428
        self.assertTrue('x-container-meta-m1' in r)
429
        self.assertEqual(r['x-container-meta-m1'], 'v1')
430
        self.assertTrue('x-container-meta-m2' in r)
431
        self.assertEqual(r['x-container-meta-m2'], 'v2')
432

    
433
        r = self.client.container_put(metadata={'m1': '', 'm2': 'v2a'})
434
        self.assertEqual(r.status_code, 202)
435

    
436
        r = self.client.get_container_meta(self.client.container)
437
        self.assertTrue('x-container-meta-m1' not in r)
438
        self.assertTrue('x-container-meta-m2' in r)
439
        self.assertEqual(r['x-container-meta-m2'], 'v2a')
440

    
441
        self.client.del_container_meta(self.client.container)
442

    
443
    def test_container_post(self):
444
        """Test container_POST"""
445
        self._test_0060_container_post()
446

    
447
    def _test_0060_container_post(self):
448
        self.client.container = self.c2
449

    
450
        """Simple post"""
451
        r = self.client.container_post()
452
        self.assertEqual(r.status_code, 202)
453

    
454
        """post meta"""
455
        self.client.set_container_meta({'m1': 'v1', 'm2': 'v2'})
456
        r = self.client.get_container_meta(self.client.container)
457
        self.assertTrue('x-container-meta-m1' in r)
458
        self.assertEqual(r['x-container-meta-m1'], 'v1')
459
        self.assertTrue('x-container-meta-m2' in r)
460
        self.assertEqual(r['x-container-meta-m2'], 'v2')
461

    
462
        """post/2del meta"""
463
        r = self.client.del_container_meta('m1')
464
        r = self.client.set_container_meta({'m2': 'v2a'})
465
        r = self.client.get_container_meta(self.client.container)
466
        self.assertTrue('x-container-meta-m1' not in r)
467
        self.assertTrue('x-container-meta-m2' in r)
468
        self.assertEqual(r['x-container-meta-m2'], 'v2a')
469

    
470
        """check quota"""
471
        r = self.client.get_container_limit(self.client.container)
472
        cquota = r.values()[0]
473
        newquota = 2 * int(cquota)
474
        r = self.client.set_container_limit(newquota)
475
        r = self.client.get_container_limit(self.client.container)
476
        xquota = int(r.values()[0])
477
        self.assertEqual(newquota, xquota)
478
        r = self.client.set_container_limit(cquota)
479
        r = self.client.get_container_limit(self.client.container)
480
        xquota = r.values()[0]
481
        self.assertEqual(cquota, xquota)
482

    
483
        """Check versioning"""
484
        self.client.set_container_versioning('auto')
485
        r = self.client.get_container_versioning(self.client.container)
486
        nvers = r.values()[0]
487
        self.assertEqual('auto', nvers)
488
        self.client.set_container_versioning('none')
489
        r = self.client.get_container_versioning(self.client.container)
490
        nvers = r.values()[0]
491
        self.assertEqual('none', nvers)
492

    
493
        """put_block uses content_type and content_length to
494
        post blocks of data 2 container. All that in upload_object"""
495
        """Change a file at fs"""
496
        f = self.create_large_file(1024 * 1024 * 100)
497
        """Upload it at a directory in container"""
498
        self.client.create_directory('dir')
499
        r = self.client.upload_object('/dir/sample.file', f)
500
        for term in ('content-length', 'content-type', 'x-object-version'):
501
            self.assertTrue(term in r)
502
        """Check if file has been uploaded"""
503
        r = self.client.get_object_info('/dir/sample.file')
504
        self.assertTrue(int(r['content-length']) > 100000000)
505

    
506
        """What is tranfer_encoding? What should I check about it? """
507
        #TODO
508

    
509
        """Check update=False"""
510
        r = self.client.object_post(
511
            'test',
512
            update=False,
513
            metadata={'newmeta': 'newval'})
514

    
515
        r = self.client.get_object_info('test')
516
        self.assertTrue('x-object-meta-newmeta' in r)
517
        self.assertFalse('x-object-meta-incontainer' in r)
518

    
519
        r = self.client.del_container_meta('m2')
520

    
521
    def test_container_delete(self):
522
        """Test container_DELETE"""
523
        self._test_0070_container_delete()
524

    
525
    def _test_0070_container_delete(self):
526

    
527
        """Fail to delete a non-empty container"""
528
        self.client.container = self.c2
529
        r = self.client.container_delete(success=409)
530
        self.assertEqual(r.status_code, 409)
531

    
532
        """Fail to delete c3 (empty) container"""
533
        self.client.container = self.c3
534
        r = self.client.container_delete(until='1000000000')
535
        self.assertEqual(r.status_code, 204)
536

    
537
        """Delete c3 (empty) container"""
538
        r = self.client.container_delete()
539
        self.assertEqual(r.status_code, 204)
540

    
541
        """Purge container(empty a container), check versionlist"""
542
        self.client.container = self.c1
543
        r = self.client.object_head('test', success=(200, 404))
544
        self.assertEqual(r.status_code, 200)
545
        self.client.del_container(delimiter='/')
546
        r = self.client.object_head('test', success=(200, 404))
547
        self.assertEqual(r.status_code, 404)
548
        r = self.client.get_object_versionlist('test')
549
        self.assertTrue(len(r) > 0)
550
        self.assertTrue(len(r[0]) > 1)
551
        self.client.purge_container()
552
        self.assertRaises(
553
            ClientError,
554
            self.client.get_object_versionlist,
555
            'test')
556

    
557
    def _test_0080_recreate_deleted_data(self):
558
        self._init_data()
559

    
560
    def test_object_head(self):
561
        """Test object_HEAD"""
562
        self._test_0090_object_head()
563

    
564
    def _test_0090_object_head(self):
565
        self.client.container = self.c2
566
        obj = 'test'
567

    
568
        r = self.client.object_head(obj)
569
        self.assertEqual(r.status_code, 200)
570
        etag = r.headers['etag']
571

    
572
        r = self.client.object_head(obj, version=40)
573
        self.assertEqual(r.headers['x-object-version'], '40')
574

    
575
        r = self.client.object_head(obj, if_etag_match=etag)
576
        self.assertEqual(r.status_code, 200)
577

    
578
        r = self.client.object_head(
579
            obj,
580
            if_etag_not_match=etag,
581
            success=(200, 412, 304))
582
        self.assertNotEqual(r.status_code, 200)
583

    
584
        r = self.client.object_head(
585
            obj,
586
            version=40,
587
            if_etag_match=etag,
588
            success=412)
589
        self.assertEqual(r.status_code, 412)
590

    
591
        """Check and if(un)modified_since"""
592
        for format in self.client.DATE_FORMATS:
593
            now_formated = self.now_unformated.strftime(format)
594
            r1 = self.client.object_head(
595
                obj,
596
                if_modified_since=now_formated,
597
                success=(200, 304, 412))
598
            sc1 = r1.status_code
599
            r2 = self.client.object_head(
600
                obj,
601
                if_unmodified_since=now_formated,
602
                success=(200, 304, 412))
603
            sc2 = r2.status_code
604
            self.assertNotEqual(sc1, sc2)
605

    
606
    def test_object_get(self):
607
        """Test object_GET"""
608
        self._test_0100_object_get()
609

    
610
    def _test_0100_object_get(self):
611
        self.client.container = self.c1
612
        obj = 'test'
613

    
614
        r = self.client.object_get(obj)
615
        self.assertEqual(r.status_code, 200)
616

    
617
        osize = int(r.headers['content-length'])
618
        etag = r.headers['etag']
619

    
620
        r = self.client.object_get(obj, hashmap=True)
621
        for term in ('hashes', 'block_hash', 'block_hash', 'bytes'):
622
            self.assertTrue(term in r.json)
623

    
624
        r = self.client.object_get(obj, format='xml', hashmap=True)
625
        self.assertEqual(len(r.text.split('hash>')), 3)
626

    
627
        rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2)
628
        r = self.client.object_get(
629
            obj,
630
            data_range=rangestr,
631
            success=(200, 206))
632
        partsize = int(r.headers['content-length'])
633
        self.assertTrue(0 < partsize and partsize <= 1 + osize / 3)
634

    
635
        rangestr = 'bytes=%s-%s' % (osize / 3, osize / 2)
636
        r = self.client.object_get(
637
            obj,
638
            data_range=rangestr,
639
            if_range=True,
640
            success=(200, 206))
641
        partsize = int(r.headers['content-length'])
642
        self.assertTrue(0 < partsize and partsize <= 1 + osize / 3)
643

    
644
        r = self.client.object_get(obj, if_etag_match=etag)
645
        self.assertEqual(r.status_code, 200)
646

    
647
        r = self.client.object_get(obj, if_etag_not_match=etag + 'LALALA')
648
        self.assertEqual(r.status_code, 200)
649

    
650
        """Check and if(un)modified_since"""
651
        for format in self.client.DATE_FORMATS:
652
            now_formated = self.now_unformated.strftime(format)
653
            r1 = self.client.object_get(
654
                obj,
655
                if_modified_since=now_formated,
656
                success=(200, 304, 412))
657
            sc1 = r1.status_code
658
            r2 = self.client.object_get(
659
                obj,
660
                if_unmodified_since=now_formated,
661
                success=(200, 304, 412))
662
            sc2 = r2.status_code
663
            self.assertNotEqual(sc1, sc2)
664

    
665
        """Upload an object to download"""
666
        container_info_cache = dict()
667
        trg_fname = 'remotefile_%s' % self.now
668
        f_size = 59247824
669
        src_f = self.create_large_file(f_size)
670
        print('\tUploading...')
671
        r = self.client.upload_object(
672
            trg_fname, src_f,
673
            container_info_cache=container_info_cache)
674
        print('\tDownloading...')
675
        self.files.append(NamedTemporaryFile())
676
        dnl_f = self.files[-1]
677
        self.client.download_object(trg_fname, dnl_f)
678

    
679
        print('\tCheck if files match...')
680
        for pos in (0, f_size / 2, f_size - 128):
681
            src_f.seek(pos)
682
            dnl_f.seek(pos)
683
            self.assertEqual(src_f.read(64), dnl_f.read(64))
684

    
685
        print('\tDownload KiBs to string and check again...')
686
        for pos in (0, f_size / 2, f_size - 256):
687
            src_f.seek(pos)
688
            tmp_s = self.client.download_to_string(
689
                trg_fname,
690
                range_str='%s-%s' % (pos, (pos + 128)))
691
            self.assertEqual(tmp_s, src_f.read(len(tmp_s)))
692

    
693
        """Upload a boring file"""
694
        trg_fname = 'boringfile_%s' % self.now
695
        src_f = self.create_boring_file(42)
696
        print('\tUploading boring file...')
697
        self.client.upload_object(
698
            trg_fname, src_f,
699
            container_info_cache=container_info_cache)
700
        print('\tDownloading boring file...')
701
        self.files.append(NamedTemporaryFile())
702
        dnl_f = self.files[-1]
703
        self.client.download_object(trg_fname, dnl_f)
704

    
705
        print('\tCheck if files match...')
706
        for i in range(42):
707
            self.assertEqual(sample_block(src_f, i), sample_block(dnl_f, i))
708

    
709
    def test_object_put(self):
710
        """Test object_PUT"""
711
        self._test_object_put()
712

    
713
    def _test_object_put(self):
714
        self.client.container = self.c2
715
        obj = 'another.test'
716

    
717
        self.client.create_object(obj + '.FAKE')
718
        r = self.client.get_object_info(obj + '.FAKE')
719
        self.assertEqual(r['content-type'], 'application/octet-stream')
720

    
721
        """create the object"""
722
        r = self.client.object_put(
723
            obj,
724
            data='a',
725
            content_type='application/octer-stream',
726
            permissions=dict(
727
                read=['accX:groupA', 'u1', 'u2'],
728
                write=['u2', 'u3']),
729
            metadata=dict(key1='val1', key2='val2'),
730
            content_encoding='UTF-8',
731
            content_disposition='attachment; filename="fname.ext"')
732
        self.assertEqual(r.status_code, 201)
733
        etag = r.headers['etag']
734

    
735
        """Check content-disposition"""
736
        r = self.client.get_object_info(obj)
737
        self.assertTrue('content-disposition' in r)
738

    
739
        """Check permissions"""
740
        r = self.client.get_object_sharing(obj)
741
        self.assertTrue('accx:groupa' in r['read'])
742
        self.assertTrue('u1' in r['read'])
743
        self.assertTrue('u2' in r['write'])
744
        self.assertTrue('u3' in r['write'])
745

    
746
        """Check metadata"""
747
        r = self.client.get_object_meta(obj)
748
        self.assertEqual(r['x-object-meta-key1'], 'val1')
749
        self.assertEqual(r['x-object-meta-key2'], 'val2')
750

    
751
        """Check public and if_etag_match"""
752
        r = self.client.object_put(
753
            obj,
754
            if_etag_match=etag,
755
            data='b',
756
            content_type='application/octet-stream',
757
            public=True)
758

    
759
        r = self.client.object_get(obj)
760
        self.assertTrue('x-object-public' in r.headers)
761
        vers2 = int(r.headers['x-object-version'])
762
        etag = r.headers['etag']
763
        self.assertEqual(r.text, 'b')
764

    
765
        """Check if_etag_not_match"""
766
        r = self.client.object_put(
767
            obj,
768
            if_etag_not_match=etag,
769
            data='c',
770
            content_type='application/octet-stream',
771
            success=(201, 412))
772
        self.assertEqual(r.status_code, 412)
773

    
774
        """Check content_type and content_length"""
775
        tmpdir = 'dir' + unicode(self.now)
776
        r = self.client.object_put(
777
            tmpdir,
778
            content_type='application/directory',
779
            content_length=0)
780

    
781
        r = self.client.get_object_info(tmpdir)
782
        self.assertEqual(r['content-type'], 'application/directory')
783

    
784
        """Check copy_from, content_encoding"""
785
        r = self.client.object_put(
786
            '%s/%s' % (tmpdir, obj),
787
            format=None,
788
            copy_from='/%s/%s' % (self.client.container, obj),
789
            content_encoding='application/octet-stream',
790
            source_account=self.client.account,
791
            content_length=0,
792
            success=201)
793
        self.assertEqual(r.status_code, 201)
794

    
795
        """Test copy_object for cross-conctainer copy"""
796
        self.client.copy_object(
797
            src_container=self.c2,
798
            src_object='%s/%s' % (tmpdir, obj),
799
            dst_container=self.c1,
800
            dst_object=obj)
801
        self.client.container = self.c1
802
        r1 = self.client.get_object_info(obj)
803
        self.client.container = self.c2
804
        r2 = self.client.get_object_info('%s/%s' % (tmpdir, obj))
805
        self.assertEqual(r1['x-object-hash'], r2['x-object-hash'])
806

    
807
        """Check cross-container copy_from, content_encoding"""
808
        self.client.container = self.c1
809
        fromstr = '/%s/%s/%s' % (self.c2, tmpdir, obj)
810
        r = self.client.object_put(
811
            obj,
812
            format=None,
813
            copy_from=fromstr,
814
            content_encoding='application/octet-stream',
815
            source_account=self.client.account,
816
            content_length=0,
817
            success=201)
818

    
819
        self.assertEqual(r.status_code, 201)
820
        r = self.client.get_object_info(obj)
821
        self.assertEqual(r['etag'], etag)
822

    
823
        """Check source_account"""
824
        self.client.container = self.c2
825
        fromstr = '/%s/%s' % (self.c1, obj)
826
        r = self.client.object_put(
827
            '%sv2' % obj,
828
            format=None,
829
            move_from=fromstr,
830
            content_encoding='application/octet-stream',
831
            source_account='nonExistendAddress@NeverLand.com',
832
            content_length=0,
833
            success=(201, 403))
834
        self.assertEqual(r.status_code, 403)
835

    
836
        """Check cross-container move_from"""
837
        self.client.container = self.c1
838
        r1 = self.client.get_object_info(obj)
839
        self.client.container = self.c2
840
        self.client.move_object(
841
            src_container=self.c1,
842
            src_object=obj,
843
            dst_container=self.c2,
844
            dst_object=obj + 'v0')
845
        r0 = self.client.get_object_info(obj + 'v0')
846
        self.assertEqual(r1['x-object-hash'], r0['x-object-hash'])
847

    
848
        """Check move_from"""
849
        r = self.client.object_put(
850
            '%sv1' % obj,
851
            format=None,
852
            move_from='/%s/%s' % (self.c2, obj),
853
            source_version=vers2,
854
            content_encoding='application/octet-stream',
855
            content_length=0, success=201)
856

    
857
        """Check manifest"""
858
        mobj = 'manifest.test'
859
        txt = ''
860
        for i in range(10):
861
            txt += '%s' % i
862
            r = self.client.object_put(
863
                '%s/%s' % (mobj, i),
864
                data='%s' % i,
865
                content_length=1,
866
                success=201,
867
                content_type='application/octet-stream',
868
                content_encoding='application/octet-stream')
869

    
870
        r = self.client.object_put(
871
            mobj,
872
            content_length=0,
873
            content_type='application/octet-stream',
874
            manifest='%s/%s' % (self.client.container, mobj))
875

    
876
        r = self.client.object_get(mobj)
877
        self.assertEqual(r.text, txt)
878

    
879
        """Upload a local file with one request"""
880
        newf = self.create_large_file(1024 * 10)
881
        self.client.upload_object('sample.file', newf)
882
        """Check if file has been uploaded"""
883
        r = self.client.get_object_info('sample.file')
884
        self.assertEqual(int(r['content-length']), 10240)
885

    
886
        """Some problems with transfer-encoding?"""
887

    
888
    def test_object_copy(self):
889
        """Test object_COPY"""
890
        self._test_0110_object_copy()
891

    
892
    def _test_0110_object_copy(self):
893
        #  TODO: check with source_account option
894
        self.client.container = self.c2
895
        obj = 'test2'
896

    
897
        data = '{"key1":"val1", "key2":"val2"}'
898
        r = self.client.object_put(
899
            '%sorig' % obj,
900
            content_type='application/octet-stream',
901
            data=data,
902
            metadata=dict(mkey1='mval1', mkey2='mval2'),
903
            permissions=dict(
904
                read=['accX:groupA', 'u1', 'u2'],
905
                write=['u2', 'u3']),
906
            content_disposition='attachment; filename="fname.ext"')
907

    
908
        r = self.client.object_copy(
909
            '%sorig' % obj,
910
            destination='/%s/%s' % (self.client.container, obj),
911
            ignore_content_type=False, content_type='application/json',
912
            metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'},
913
            permissions={'write': ['u5', 'accX:groupB']})
914
        self.assertEqual(r.status_code, 201)
915

    
916
        """Check content-disposition"""
917
        r = self.client.get_object_info(obj)
918
        self.assertTrue('content-disposition' in r)
919

    
920
        """Check Metadata"""
921
        r = self.client.get_object_meta(obj)
922
        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
923
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
924
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
925

    
926
        """Check permissions"""
927
        r = self.client.get_object_sharing(obj)
928
        self.assertFalse('read' in r or 'u2' in r['write'])
929
        self.assertTrue('accx:groupb' in r['write'])
930

    
931
        """Check destination account"""
932
        r = self.client.object_copy(
933
            obj,
934
            destination='/%s/%s' % (self.c1, obj),
935
            content_encoding='utf8',
936
            content_type='application/json',
937
            destination_account='nonExistendAddress@NeverLand.com',
938
            success=(201, 403))
939
        self.assertEqual(r.status_code, 403)
940

    
941
        """Check destination being another container
942
        and also content_type and content encoding"""
943
        r = self.client.object_copy(
944
            obj,
945
            destination='/%s/%s' % (self.c1, obj),
946
            content_encoding='utf8',
947
            content_type='application/json')
948
        self.assertEqual(r.status_code, 201)
949
        self.assertEqual(
950
            r.headers['content-type'],
951
            'application/json; charset=UTF-8')
952

    
953
        """Check ignore_content_type and content_type"""
954
        r = self.client.object_get(obj)
955
        etag = r.headers['etag']
956
        ctype = r.headers['content-type']
957
        self.assertEqual(ctype, 'application/json')
958

    
959
        r = self.client.object_copy(
960
            '%sorig' % obj,
961
            destination='/%s/%s0' % (self.client.container, obj),
962
            ignore_content_type=True,
963
            content_type='application/json')
964
        self.assertEqual(r.status_code, 201)
965
        self.assertNotEqual(r.headers['content-type'], 'application/json')
966

    
967
        """Check if_etag_(not_)match"""
968
        r = self.client.object_copy(
969
            obj,
970
            destination='/%s/%s1' % (self.client.container, obj),
971
            if_etag_match=etag)
972
        self.assertEqual(r.status_code, 201)
973

    
974
        r = self.client.object_copy(
975
            obj,
976
            destination='/%s/%s2' % (self.client.container, obj),
977
            if_etag_not_match='lalala')
978
        self.assertEqual(r.status_code, 201)
979
        vers2 = r.headers['x-object-version']
980

    
981
        """Check source_version, public and format """
982
        r = self.client.object_copy(
983
            '%s2' % obj,
984
            destination='/%s/%s3' % (self.client.container, obj),
985
            source_version=vers2,
986
            format='xml',
987
            public=True)
988
        self.assertEqual(r.status_code, 201)
989
        self.assertTrue(r.headers['content-type'].index('xml') > 0)
990

    
991
        r = self.client.get_object_info(obj + '3')
992
        self.assertTrue('x-object-public' in r)
993

    
994
    def test_object_move(self):
995
        """Test object_MOVE"""
996
        self._test_0120_object_move()
997

    
998
    def _test_0120_object_move(self):
999
        self.client.container = self.c2
1000
        obj = 'test2'
1001

    
1002
        data = '{"key1": "val1", "key2": "val2"}'
1003
        r = self.client.object_put(
1004
            '%sorig' % obj,
1005
            content_type='application/octet-stream',
1006
            data=data,
1007
            metadata=dict(mkey1='mval1', mkey2='mval2'),
1008
            permissions=dict(
1009
                read=['accX:groupA', 'u1', 'u2'],
1010
                write=['u2', 'u3']))
1011

    
1012
        r = self.client.object_move(
1013
            '%sorig' % obj,
1014
            destination='/%s/%s' % (self.client.container, obj),
1015
            ignore_content_type=False,
1016
            content_type='application/json',
1017
            metadata=dict(mkey2='mval2a', mkey3='mval3'),
1018
            permissions=dict(write=['u5', 'accX:groupB']))
1019
        self.assertEqual(r.status_code, 201)
1020

    
1021
        """Check Metadata"""
1022
        r = self.client.get_object_meta(obj)
1023
        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
1024
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
1025
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
1026

    
1027
        """Check permissions"""
1028
        r = self.client.get_object_sharing(obj)
1029
        self.assertFalse('read' in r)
1030
        self.assertTrue('u5' in r['write'])
1031
        self.assertTrue('accx:groupb' in r['write'])
1032

    
1033
        """Check destination account"""
1034
        r = self.client.object_move(
1035
            obj,
1036
            destination='/%s/%s' % (self.c1, obj),
1037
            content_encoding='utf8',
1038
            content_type='application/json',
1039
            destination_account='nonExistendAddress@NeverLand.com',
1040
            success=(201, 403))
1041
        self.assertEqual(r.status_code, 403)
1042

    
1043
        """Check destination being another container and also
1044
        content_type, content_disposition and content encoding"""
1045
        r = self.client.object_move(
1046
            obj,
1047
            destination='/%s/%s' % (self.c1, obj),
1048
            content_encoding='utf8',
1049
            content_type='application/json',
1050
            content_disposition='attachment; filename="fname.ext"')
1051
        self.assertEqual(r.status_code, 201)
1052
        self.assertEqual(
1053
            r.headers['content-type'],
1054
            'application/json; charset=UTF-8')
1055
        self.client.container = self.c1
1056
        r = self.client.get_object_info(obj)
1057
        self.assertTrue('content-disposition' in r)
1058
        self.assertTrue('fname.ext' in r['content-disposition'])
1059
        etag = r['etag']
1060
        ctype = r['content-type']
1061
        self.assertEqual(ctype, 'application/json')
1062

    
1063
        """Check ignore_content_type and content_type"""
1064
        r = self.client.object_move(
1065
            obj,
1066
            destination='/%s/%s' % (self.c2, obj),
1067
            ignore_content_type=True,
1068
            content_type='application/json')
1069
        self.assertEqual(r.status_code, 201)
1070
        self.assertNotEqual(r.headers['content-type'], 'application/json')
1071

    
1072
        """Check if_etag_(not_)match"""
1073
        self.client.container = self.c2
1074
        r = self.client.object_move(
1075
            obj,
1076
            destination='/%s/%s0' % (self.client.container, obj),
1077
            if_etag_match=etag)
1078
        self.assertEqual(r.status_code, 201)
1079

    
1080
        r = self.client.object_move(
1081
            '%s0' % obj,
1082
            destination='/%s/%s1' % (self.client.container, obj),
1083
            if_etag_not_match='lalala')
1084
        self.assertEqual(r.status_code, 201)
1085

    
1086
        """Check public and format """
1087
        r = self.client.object_move(
1088
            '%s1' % obj,
1089
            destination='/%s/%s2' % (self.client.container, obj),
1090
            format='xml',
1091
            public=True)
1092
        self.assertEqual(r.status_code, 201)
1093
        self.assertTrue(r.headers['content-type'].index('xml') > 0)
1094

    
1095
        r = self.client.get_object_info(obj + '2')
1096
        self.assertTrue('x-object-public' in r)
1097

    
1098
    def test_object_post(self):
1099
        """Test object_POST"""
1100
        self._test_0130_object_post()
1101

    
1102
    def _test_0130_object_post(self):
1103
        self.client.container = self.c2
1104
        obj = 'test2'
1105
        """create a filesystem file"""
1106
        self.files.append(NamedTemporaryFile())
1107
        newf = self.files[-1]
1108
        newf.writelines([
1109
            'ello!\n',
1110
            'This is a test line\n',
1111
            'inside a test file\n'])
1112
        """create a file on container"""
1113
        r = self.client.object_put(
1114
            obj,
1115
            content_type='application/octet-stream',
1116
            data='H',
1117
            metadata=dict(mkey1='mval1', mkey2='mval2'),
1118
            permissions=dict(
1119
                read=['accX:groupA', 'u1', 'u2'],
1120
                write=['u2', 'u3']))
1121

    
1122
        """Append livetest update, content_[range|type|length]"""
1123
        newf.seek(0)
1124
        self.client.append_object(obj, newf)
1125
        r = self.client.object_get(obj)
1126
        self.assertTrue(r.text.startswith('Hello!'))
1127

    
1128
        """Overwrite livetest update,
1129
            content_type, content_length, content_range
1130
        """
1131
        newf.seek(0)
1132
        r = self.client.overwrite_object(obj, 0, 10, newf)
1133
        r = self.client.object_get(obj)
1134
        self.assertTrue(r.text.startswith('ello!'))
1135

    
1136
        """Truncate livetest update,
1137
            content_range, content_type, object_bytes and source_object"""
1138
        r = self.client.truncate_object(obj, 5)
1139
        r = self.client.object_get(obj)
1140
        self.assertEqual(r.text, 'ello!')
1141

    
1142
        """Check metadata"""
1143
        self.client.set_object_meta(obj, {'mkey2': 'mval2a', 'mkey3': 'mval3'})
1144
        r = self.client.get_object_meta(obj)
1145
        self.assertEqual(r['x-object-meta-mkey1'], 'mval1')
1146
        self.assertEqual(r['x-object-meta-mkey2'], 'mval2a')
1147
        self.assertEqual(r['x-object-meta-mkey3'], 'mval3')
1148
        self.client.del_object_meta(obj, 'mkey1')
1149
        r = self.client.get_object_meta(obj)
1150
        self.assertFalse('x-object-meta-mkey1' in r)
1151

    
1152
        """Check permissions"""
1153
        self.client.set_object_sharing(
1154
            obj,
1155
            read_permition=['u4', 'u5'],
1156
            write_permition=['u4'])
1157
        r = self.client.get_object_sharing(obj)
1158
        self.assertTrue('read' in r)
1159
        self.assertTrue('u5' in r['read'])
1160
        self.assertTrue('write' in r)
1161
        self.assertTrue('u4' in r['write'])
1162
        self.client.del_object_sharing(obj)
1163
        r = self.client.get_object_sharing(obj)
1164
        self.assertTrue(len(r) == 0)
1165

    
1166
        """Check publish"""
1167
        self.client.publish_object(obj)
1168
        r = self.client.get_object_info(obj)
1169
        self.assertTrue('x-object-public' in r)
1170
        self.client.unpublish_object(obj)
1171
        r = self.client.get_object_info(obj)
1172
        self.assertFalse('x-object-public' in r)
1173

    
1174
        """Check if_etag_(not)match"""
1175
        etag = r['etag']
1176
        """
1177
        r = self.client.object_post(
1178
            obj,
1179
            update=True,
1180
            public=True,
1181
            if_etag_not_match=etag,
1182
            success=(412, 202, 204))
1183
        self.assertEqual(r.status_code, 412)
1184
        """
1185

    
1186
        r = self.client.object_post(
1187
            obj,
1188
            update=True,
1189
            public=True,
1190
            if_etag_match=etag,
1191
            content_encoding='application/json')
1192

    
1193
        r = self.client.get_object_info(obj)
1194
        helloVersion = r['x-object-version']
1195
        self.assertTrue('x-object-public' in r)
1196
        self.assertEqual(r['content-encoding'], 'application/json')
1197

    
1198
        """Check source_version and source_account and content_disposition"""
1199
        r = self.client.object_post(
1200
            obj,
1201
            update=True,
1202
            content_type='application/octet-srteam',
1203
            content_length=5,
1204
            content_range='bytes 1-5/*',
1205
            source_object='/%s/%s' % (self.c2, obj),
1206
            source_account='thisAccountWillNeverExist@adminland.com',
1207
            source_version=helloVersion,
1208
            data='12345',
1209
            success=(403, 202, 204))
1210
        self.assertEqual(r.status_code, 403)
1211

    
1212
        r = self.client.object_post(
1213
            obj,
1214
            update=True,
1215
            content_type='application/octet-srteam',
1216
            content_length=5,
1217
            content_range='bytes 1-5/*',
1218
            source_object='/%s/%s' % (self.c2, obj),
1219
            source_account=self.client.account,
1220
            source_version=helloVersion,
1221
            data='12345',
1222
            content_disposition='attachment; filename="fname.ext"')
1223

    
1224
        r = self.client.object_get(obj)
1225
        self.assertEqual(r.text, 'eello!')
1226
        self.assertTrue('content-disposition' in r.headers)
1227
        self.assertTrue('fname.ext' in r.headers['content-disposition'])
1228

    
1229
        """Check manifest"""
1230
        mobj = 'manifest.test'
1231
        txt = ''
1232
        for i in range(10):
1233
            txt += '%s' % i
1234
            r = self.client.object_put(
1235
                '%s/%s' % (mobj, i),
1236
                data='%s' % i,
1237
                content_length=1,
1238
                success=201,
1239
                content_encoding='application/octet-stream',
1240
                content_type='application/octet-stream')
1241

    
1242
        self.client.create_object_by_manifestation(
1243
            mobj,
1244
            content_type='application/octet-stream')
1245

    
1246
        r = self.client.object_post(
1247
            mobj,
1248
            manifest='%s/%s' % (self.client.container, mobj))
1249

    
1250
        r = self.client.object_get(mobj)
1251
        self.assertEqual(r.text, txt)
1252

    
1253
        """We need to check transfer_encoding """
1254

    
1255
    def test_object_delete(self):
1256
        """Test object_DELETE"""
1257
        self._test_0140_object_delete()
1258

    
1259
    def _test_0140_object_delete(self):
1260
        self.client.container = self.c2
1261
        obj = 'test2'
1262
        """create a file on container"""
1263
        r = self.client.object_put(
1264
            obj,
1265
            content_type='application/octet-stream',
1266
            data='H',
1267
            metadata=dict(mkey1='mval1', mkey2='mval2'),
1268
            permissions=dict(
1269
                read=['accX:groupA', 'u1', 'u2'],
1270
                write=['u2', 'u3']))
1271

    
1272
        """Check with false until"""
1273
        r = self.client.object_delete(obj, until=1000000)
1274

    
1275
        r = self.client.object_get(obj, success=(200, 404))
1276
        self.assertEqual(r.status_code, 200)
1277

    
1278
        """Check normal case"""
1279
        r = self.client.object_delete(obj)
1280
        self.assertEqual(r.status_code, 204)
1281

    
1282
        r = self.client.object_get(obj, success=(200, 404))
1283
        self.assertEqual(r.status_code, 404)
1284

    
1285
    def create_large_file(self, size):
1286
        """Create a large file at fs"""
1287
        self.files.append(NamedTemporaryFile())
1288
        f = self.files[-1]
1289
        Ki = size / 8
1290
        bytelist = [b * Ki for b in range(size / Ki)]
1291

    
1292
        def append2file(step):
1293
            f.seek(step)
1294
            f.write(urandom(Ki))
1295
            f.flush()
1296
        self.do_with_progress_bar(
1297
            append2file,
1298
            ' create rand file %s (%sB): ' % (f.name, size),
1299
            bytelist)
1300
        f.seek(0)
1301
        return f
1302

    
1303
    def create_boring_file(self, num_of_blocks):
1304
        """Create a file with some blocks being the same"""
1305
        self.files.append(NamedTemporaryFile())
1306
        tmpFile = self.files[-1]
1307
        block_size = 4 * 1024 * 1024
1308
        print('\n\tCreate boring file of %s blocks' % num_of_blocks)
1309
        chars = chargen()
1310
        while num_of_blocks:
1311
            fslice = 3 if num_of_blocks > 3 else num_of_blocks
1312
            tmpFile.write(fslice * block_size * chars.next())
1313
            num_of_blocks -= fslice
1314
        print('\t\tDone')
1315
        tmpFile.seek(0)
1316
        return tmpFile