Revision 773d61ca

b/kamaki/clients/tests/pithos.py
33 33

  
34 34
import time
35 35
import datetime
36
import os
37
import tempfile
36
from os import urandom
37
from tempfile import NamedTemporaryFile
38 38

  
39 39
from kamaki.clients import tests, ClientError
40 40
from kamaki.clients.pithos import PithosClient
......
42 42

  
43 43
class Pithos(tests.Generic):
44 44

  
45
    fnames = []
45
    files = []
46 46

  
47 47
    def setUp(self):
48 48
        self.client = PithosClient(
......
105 105

  
106 106
    def tearDown(self):
107 107
        """Destroy test cases"""
108
        for fname in self.fnames:
109
            try:
110
                os.remove(fname)
111
            except OSError:
112
                pass
108
        for f in self.files:
109
            f.close()
113 110
        self.forceDeleteContainer(self.c1)
114 111
        self.forceDeleteContainer(self.c2)
115 112
        try:
......
449 446
        """put_block uses content_type and content_length to
450 447
        post blocks of data 2 container. All that in upload_object"""
451 448
        """Change a file at fs"""
452
        fname = 'l100M.' + unicode(self.now)
453
        self.create_large_file(1024 * 1024 * 100, fname)
454
        self.fnames.append(fname)
449
        f = self.create_large_file(1024 * 1024 * 100)
455 450
        """Upload it at a directory in container"""
456 451
        self.client.create_directory('dir')
457
        newf = open(fname, 'rb')
458
        self.client.upload_object('/dir/sample.file', newf)
459
        newf.close()
452
        self.client.upload_object('/dir/sample.file', f)
460 453
        """Check if file has been uploaded"""
461 454
        r = self.client.get_object_info('/dir/sample.file')
462 455
        self.assertTrue(int(r['content-length']) > 100000000)
......
614 607
            self.assertNotEqual(sc1, sc2)
615 608

  
616 609
        """Upload an object to download"""
617
        src_file = tempfile.NamedTemporaryFile(delete=False)
618
        dnl_file = tempfile.NamedTemporaryFile(delete=False)
619

  
620
        src_fname = src_file.name
621
        dnl_fname = dnl_file.name
622

  
623
        src_file.close()
624
        dnl_file.close()
625

  
626 610
        trg_fname = 'remotefile_%s' % self.now
627 611
        f_size = 59247824
628
        self.create_large_file(f_size, src_fname)
629
        src_f = open(src_fname, 'rb+')
612
        src_f = self.create_large_file(f_size)
630 613
        print('\tUploading...')
631 614
        self.client.upload_object(trg_fname, src_f)
632
        src_f.close()
633 615
        print('\tDownloading...')
634
        dnl_f = open(dnl_fname, 'wb+')
616
        self.files.append(NamedTemporaryFile())
617
        dnl_f = self.files[-1]
635 618
        self.client.download_object(trg_fname, dnl_f)
636
        dnl_f.close()
637 619

  
638 620
        print('\tCheck if files match...')
639
        src_f = open(src_fname)
640
        dnl_f = open(dnl_fname)
641 621
        for pos in (0, f_size / 2, f_size - 20):
642 622
            src_f.seek(pos)
643 623
            dnl_f.seek(pos)
644 624
            self.assertEqual(src_f.read(10), dnl_f.read(10))
645
        src_f.close()
646
        dnl_f.close()
647

  
648
        os.remove(src_fname)
649
        os.remove(dnl_fname)
650 625

  
651 626
    def test_object_put(self):
652 627
        """Test object_PUT"""
......
799 774
        self.assertEqual(r.text, txt)
800 775

  
801 776
        """Upload a local file with one request"""
802
        fname = 'l10K.' + unicode(self.now)
803
        self.create_large_file(1024 * 10, fname)
804
        self.fnames.append(fname)
805
        newf = open(fname, 'rb')
777
        newf = self.create_large_file(1024 * 10)
806 778
        self.client.upload_object('sample.file', newf)
807
        newf.close()
808 779
        """Check if file has been uploaded"""
809 780
        r = self.client.get_object_info('sample.file')
810 781
        self.assertEqual(int(r['content-length']), 10240)
......
1008 979
        self.client.container = self.c2
1009 980
        obj = 'test2'
1010 981
        """create a filesystem file"""
1011
        self.fnames.append(obj)
1012
        newf = open(self.fnames[-1], 'w')
982
        self.files.append(NamedTemporaryFile())
983
        newf = self.files[-1]
1013 984
        newf.writelines(['ello!\n',
1014 985
            'This is a test line\n',
1015 986
            'inside a test file\n'])
1016
        newf.close()
1017 987
        """create a file on container"""
1018 988
        r = self.client.object_put(obj,
1019 989
            content_type='application/octet-stream',
......
1023 993
                'write': ['u2', 'u3']})
1024 994

  
1025 995
        """Append tests update, content_range, content_type, content_length"""
1026
        newf = open(obj, 'r')
996
        newf.seek(0)
1027 997
        self.client.append_object(obj, newf)
1028 998
        r = self.client.object_get(obj)
1029 999
        self.assertTrue(r.text.startswith('Hello!'))
......
1035 1005
        r = self.client.overwrite_object(obj, 0, 10, newf)
1036 1006
        r = self.client.object_get(obj)
1037 1007
        self.assertTrue(r.text.startswith('ello!'))
1038
        newf.close()
1039 1008

  
1040 1009
        """Truncate tests update,
1041 1010
            content_range, content_type, object_bytes and source_object"""
......
1172 1141
        r = self.client.object_get(obj, success=(200, 404))
1173 1142
        self.assertEqual(r.status_code, 404)
1174 1143

  
1175
    def create_large_file(self, size, name):
1144
    def create_large_file(self, size):
1176 1145
        """Create a large file at fs"""
1177
        Ki = size / 10
1146
        self.files.append(NamedTemporaryFile())
1147
        f = self.files[-1]
1148
        Ki = size / 8
1178 1149
        bytelist = [b * Ki for b in range(size / Ki)]
1179
        with open(name, 'w') as f:
1180
            def append2file(step):
1181
                f.seek(step)
1182
                f.write(os.urandom(Ki))
1183
            self.do_with_progress_bar(
1184
                append2file,
1185
                ' create rand file %s (%sB): ' % (name, size),
1186
                bytelist)
1150

  
1151
        def append2file(step):
1152
            f.seek(step)
1153
            f.write(urandom(Ki))
1154
            f.flush()
1155
        self.do_with_progress_bar(
1156
            append2file,
1157
            ' create rand file %s (%sB): ' % (f.name, size),
1158
            bytelist)
1159
        f.seek(0)
1160
        return f

Also available in: Unified diff