Revision 00ef625c

b/test/py/ganeti.bdev_unittest.py
106 106

  
107 107
  def testParser80(self):
108 108
    """Test drbdsetup show parser for disk and network version 8.0"""
109
    data = self._ReadTestData("bdev-drbd-8.0.txt")
109
    data = testutils.ReadTestData("bdev-drbd-8.0.txt")
110 110
    result = bdev.DRBD8._GetDevInfo(data)
111 111
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
112 112
                                   "/dev/xenvg/test.meta"),
......
117 117

  
118 118
  def testParser83(self):
119 119
    """Test drbdsetup show parser for disk and network version 8.3"""
120
    data = self._ReadTestData("bdev-drbd-8.3.txt")
120
    data = testutils.ReadTestData("bdev-drbd-8.3.txt")
121 121
    result = bdev.DRBD8._GetDevInfo(data)
122 122
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
123 123
                                   "/dev/xenvg/test.meta"),
......
128 128

  
129 129
  def testParserNetIP4(self):
130 130
    """Test drbdsetup show parser for IPv4 network"""
131
    data = self._ReadTestData("bdev-drbd-net-ip4.txt")
131
    data = testutils.ReadTestData("bdev-drbd-net-ip4.txt")
132 132
    result = bdev.DRBD8._GetDevInfo(data)
133 133
    self.failUnless(("local_dev" not in result and
134 134
                     "meta_dev" not in result and
......
140 140

  
141 141
  def testParserNetIP6(self):
142 142
    """Test drbdsetup show parser for IPv6 network"""
143
    data = self._ReadTestData("bdev-drbd-net-ip6.txt")
143
    data = testutils.ReadTestData("bdev-drbd-net-ip6.txt")
144 144
    result = bdev.DRBD8._GetDevInfo(data)
145 145
    self.failUnless(("local_dev" not in result and
146 146
                     "meta_dev" not in result and
......
152 152

  
153 153
  def testParserDisk(self):
154 154
    """Test drbdsetup show parser for disk"""
155
    data = self._ReadTestData("bdev-drbd-disk.txt")
155
    data = testutils.ReadTestData("bdev-drbd-disk.txt")
156 156
    result = bdev.DRBD8._GetDevInfo(data)
157 157
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
158 158
                                   "/dev/xenvg/test.meta"),
......
237 237
  def setUp(self):
238 238
    """Read in txt data"""
239 239
    testutils.GanetiTestCase.setUp(self)
240
    proc_data = self._TestDataFilename("proc_drbd8.txt")
241
    proc80e_data = self._TestDataFilename("proc_drbd80-emptyline.txt")
242
    proc83_data = self._TestDataFilename("proc_drbd83.txt")
243
    proc83_sync_data = self._TestDataFilename("proc_drbd83_sync.txt")
240
    proc_data = testutils.TestDataFilename("proc_drbd8.txt")
241
    proc80e_data = testutils.TestDataFilename("proc_drbd80-emptyline.txt")
242
    proc83_data = testutils.TestDataFilename("proc_drbd83.txt")
243
    proc83_sync_data = testutils.TestDataFilename("proc_drbd83_sync.txt")
244 244
    proc83_sync_krnl_data = \
245
      self._TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
245
      testutils.TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
246 246
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
247 247
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
248 248
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
......
265 265

  
266 266
  def testHelper(self):
267 267
    """Test reading usermode_helper in /sys."""
268
    sys_drbd_helper = self._TestDataFilename("sys_drbd_usermode_helper.txt")
268
    sys_drbd_helper = testutils.TestDataFilename("sys_drbd_usermode_helper.txt")
269 269
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
270 270
    self.failUnlessEqual(drbd_helper, "/bin/true")
271 271

  
b/test/py/ganeti.cmdlib_unittest.py
61 61
    shutil.rmtree(self.tmpdir)
62 62

  
63 63
  def testVerifyCertificate(self):
64
    cmdlib._VerifyCertificate(self._TestDataFilename("cert1.pem"))
64
    cmdlib._VerifyCertificate(testutils.TestDataFilename("cert1.pem"))
65 65

  
66 66
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
67 67

  
......
69 69
    self.assertEqual(errcode, cmdlib.LUClusterVerifyConfig.ETYPE_ERROR)
70 70

  
71 71
    # Try to load non-certificate file
72
    invalid_cert = self._TestDataFilename("bdev-net.txt")
72
    invalid_cert = testutils.TestDataFilename("bdev-net.txt")
73 73
    (errcode, msg) = cmdlib._VerifyCertificate(invalid_cert)
74 74
    self.assertEqual(errcode, cmdlib.LUClusterVerifyConfig.ETYPE_ERROR)
75 75

  
b/test/py/ganeti.hypervisor.hv_kvm_unittest.py
248 248
class TestVersionChecking(testutils.GanetiTestCase):
249 249
  def testParseVersion(self):
250 250
    parse = hv_kvm.KVMHypervisor._ParseKVMVersion
251
    help_112 = utils.ReadFile(self._TestDataFilename("kvm_1.1.2_help.txt"))
252
    help_10 = utils.ReadFile(self._TestDataFilename("kvm_1.0_help.txt"))
253
    help_01590 = utils.ReadFile(self._TestDataFilename("kvm_0.15.90_help.txt"))
254
    help_0125 = utils.ReadFile(self._TestDataFilename("kvm_0.12.5_help.txt"))
255
    help_091 = utils.ReadFile(self._TestDataFilename("kvm_0.9.1_help.txt"))
251
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
252
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
253
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
254
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
255
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
256 256
    self.assertEqual(parse(help_112), ("1.1.2", 1, 1, 2))
257 257
    self.assertEqual(parse(help_10), ("1.0", 1, 0, 0))
258 258
    self.assertEqual(parse(help_01590), ("0.15.90", 0, 15, 90))
......
300 300

  
301 301
    """
302 302
    boot_re = hv_kvm.KVMHypervisor._BOOT_RE
303
    help_112 = utils.ReadFile(self._TestDataFilename("kvm_1.1.2_help.txt"))
304
    help_10 = utils.ReadFile(self._TestDataFilename("kvm_1.0_help.txt"))
305
    help_01590 = utils.ReadFile(self._TestDataFilename("kvm_0.15.90_help.txt"))
306
    help_0125 = utils.ReadFile(self._TestDataFilename("kvm_0.12.5_help.txt"))
307
    help_091 = utils.ReadFile(self._TestDataFilename("kvm_0.9.1_help.txt"))
308
    help_091_fake = utils.ReadFile(
309
      self._TestDataFilename("kvm_0.9.1_help_boot_test.txt"))
303
    help_112 = testutils.ReadTestData("kvm_1.1.2_help.txt")
304
    help_10 = testutils.ReadTestData("kvm_1.0_help.txt")
305
    help_01590 = testutils.ReadTestData("kvm_0.15.90_help.txt")
306
    help_0125 = testutils.ReadTestData("kvm_0.12.5_help.txt")
307
    help_091 = testutils.ReadTestData("kvm_0.9.1_help.txt")
308
    help_091_fake = testutils.ReadTestData("kvm_0.9.1_help_boot_test.txt")
310 309

  
311 310
    self.assertTrue(boot_re.search(help_091))
312 311
    self.assertTrue(boot_re.search(help_0125))
b/test/py/ganeti.netutils_unittest.py
502 502
    # IPv4-only, fake loopback interface
503 503
    tests = ["ip-addr-show-lo-ipv4.txt", "ip-addr-show-lo-oneline-ipv4.txt"]
504 504
    for test_file in tests:
505
      data = self._ReadTestData(test_file)
505
      data = testutils.ReadTestData(test_file)
506 506
      addr = netutils._GetIpAddressesFromIpOutput(data)
507 507
      self.failUnless(len(addr[4]) == 1 and addr[4][0] == "127.0.0.1" and not
508 508
                      addr[6])
......
510 510
    # IPv6-only, fake loopback interface
511 511
    tests = ["ip-addr-show-lo-ipv6.txt", "ip-addr-show-lo-ipv6.txt"]
512 512
    for test_file in tests:
513
      data = self._ReadTestData(test_file)
513
      data = testutils.ReadTestData(test_file)
514 514
      addr = netutils._GetIpAddressesFromIpOutput(data)
515 515
      self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and not addr[4])
516 516

  
517 517
    # IPv4 and IPv6, fake loopback interface
518 518
    tests = ["ip-addr-show-lo.txt", "ip-addr-show-lo-oneline.txt"]
519 519
    for test_file in tests:
520
      data = self._ReadTestData(test_file)
520
      data = testutils.ReadTestData(test_file)
521 521
      addr = netutils._GetIpAddressesFromIpOutput(data)
522 522
      self.failUnless(len(addr[6]) == 1 and addr[6][0] == "::1" and
523 523
                      len(addr[4]) == 1 and addr[4][0] == "127.0.0.1")
524 524

  
525 525
    # IPv4 and IPv6, dummy interface
526
    data = self._ReadTestData("ip-addr-show-dummy0.txt")
526
    data = testutils.ReadTestData("ip-addr-show-dummy0.txt")
527 527
    addr = netutils._GetIpAddressesFromIpOutput(data)
528 528
    self.failUnless(len(addr[6]) == 1 and
529 529
                    addr[6][0] == "2001:db8:85a3::8a2e:370:7334" and
b/test/py/ganeti.storage_unittest.py
48 48

  
49 49
  def testOldVersion(self):
50 50
    lvmvg = storage.LvmVgStorage()
51
    stdout = self._ReadTestData("vgreduce-removemissing-2.02.02.txt")
52
    vgs_fail = self._ReadTestData("vgs-missing-pvs-2.02.02.txt")
51
    stdout = testutils.ReadTestData("vgreduce-removemissing-2.02.02.txt")
52
    vgs_fail = testutils.ReadTestData("vgs-missing-pvs-2.02.02.txt")
53 53
    self.run_history = [
54 54
      ([self.VGREDUCE_CMD, "--removemissing", self.VGNAME],
55 55
       utils.RunResult(0, None, stdout, "", "", None, None)),
......
71 71

  
72 72
  def testNewVersion(self):
73 73
    lvmvg = storage.LvmVgStorage()
74
    stdout1 = self._ReadTestData("vgreduce-removemissing-2.02.66-fail.txt")
75
    stdout2 = self._ReadTestData("vgreduce-removemissing-2.02.66-ok.txt")
76
    vgs_fail = self._ReadTestData("vgs-missing-pvs-2.02.66.txt")
74
    stdout1 = testutils.ReadTestData("vgreduce-removemissing-2.02.66-fail.txt")
75
    stdout2 = testutils.ReadTestData("vgreduce-removemissing-2.02.66-ok.txt")
76
    vgs_fail = testutils.ReadTestData("vgs-missing-pvs-2.02.66.txt")
77 77
    # first: require --fail, check that it's used
78 78
    self.run_history = [
79 79
      ([self.VGREDUCE_CMD, "--removemissing", self.VGNAME],
b/test/py/ganeti.tools.node_daemon_setup_unittest.py
79 79
      }, _verify_fn=lambda _: None)
80 80

  
81 81
  def testNoPrivateKey(self):
82
    cert_filename = self._TestDataFilename("cert1.pem")
82
    cert_filename = testutils.TestDataFilename("cert1.pem")
83 83
    cert_pem = utils.ReadFile(cert_filename)
84 84

  
85 85
    self.assertRaises(errors.X509CertError,
......
97 97
    assert cert.get_subject()
98 98

  
99 99
  def testSuccessfulCheck(self):
100
    cert_filename = self._TestDataFilename("cert2.pem")
100
    cert_filename = testutils.TestDataFilename("cert2.pem")
101 101
    cert_pem = utils.ReadFile(cert_filename)
102 102
    result = \
103 103
      node_daemon_setup._VerifyCertificate(cert_pem, _check_fn=self._Check)
......
109 109
    self.assertTrue(key)
110 110

  
111 111
  def testMismatchingKey(self):
112
    cert1_path = self._TestDataFilename("cert1.pem")
113
    cert2_path = self._TestDataFilename("cert2.pem")
112
    cert1_path = testutils.TestDataFilename("cert1.pem")
113
    cert2_path = testutils.TestDataFilename("cert2.pem")
114 114

  
115 115
    # Extract certificate
116 116
    cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
b/test/py/ganeti.tools.prepare_node_join_unittest.py
73 73
    prepare_node_join.VerifyCertificate({}, _verify_fn=NotImplemented)
74 74

  
75 75
  def testGivenPrivateKey(self):
76
    cert_filename = self._TestDataFilename("cert2.pem")
76
    cert_filename = testutils.TestDataFilename("cert2.pem")
77 77
    cert_pem = utils.ReadFile(cert_filename)
78 78

  
79 79
    self.assertRaises(_JoinError, prepare_node_join._VerifyCertificate,
......
90 90
    assert cert.get_subject()
91 91

  
92 92
  def testSuccessfulCheck(self):
93
    cert_filename = self._TestDataFilename("cert1.pem")
93
    cert_filename = testutils.TestDataFilename("cert1.pem")
94 94
    cert_pem = utils.ReadFile(cert_filename)
95 95
    prepare_node_join._VerifyCertificate(cert_pem, _check_fn=self._Check)
96 96

  
b/test/py/ganeti.utils.io_unittest.py
41 41

  
42 42
class TestReadFile(testutils.GanetiTestCase):
43 43
  def testReadAll(self):
44
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
44
    data = utils.ReadFile(testutils.TestDataFilename("cert1.pem"))
45 45
    self.assertEqual(len(data), 814)
46 46

  
47 47
    h = compat.md5_hash()
......
49 49
    self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
50 50

  
51 51
  def testReadSize(self):
52
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
52
    data = utils.ReadFile(testutils.TestDataFilename("cert1.pem"),
53 53
                          size=100)
54 54
    self.assertEqual(len(data), 100)
55 55

  
......
60 60
  def testCallback(self):
61 61
    def _Cb(fh):
62 62
      self.assertEqual(fh.tell(), 0)
63
    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
63
    data = utils.ReadFile(testutils.TestDataFilename("cert1.pem"), preread=_Cb)
64 64
    self.assertEqual(len(data), 814)
65 65

  
66 66
  def testError(self):
......
73 73
    testutils.GanetiTestCase.setUp(self)
74 74

  
75 75
  def testDefault(self):
76
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"))
76
    data = utils.ReadOneLineFile(testutils.TestDataFilename("cert1.pem"))
77 77
    self.assertEqual(len(data), 27)
78 78
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
79 79

  
80 80
  def testNotStrict(self):
81
    data = utils.ReadOneLineFile(self._TestDataFilename("cert1.pem"),
81
    data = utils.ReadOneLineFile(testutils.TestDataFilename("cert1.pem"),
82 82
                                 strict=False)
83 83
    self.assertEqual(len(data), 27)
84 84
    self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
85 85

  
86 86
  def testStrictFailure(self):
87 87
    self.assertRaises(errors.GenericError, utils.ReadOneLineFile,
88
                      self._TestDataFilename("cert1.pem"), strict=True)
88
                      testutils.TestDataFilename("cert1.pem"), strict=True)
89 89

  
90 90
  def testLongLine(self):
91 91
    dummydata = (1024 * "Hello World! ")
b/test/py/ganeti.utils.process_unittest.py
379 379
      temp.close()
380 380

  
381 381
  def testNoInputRead(self):
382
    testfile = self._TestDataFilename("cert1.pem")
382
    testfile = testutils.TestDataFilename("cert1.pem")
383 383

  
384 384
    result = utils.RunCmd(["cat"], timeout=10.0)
385 385
    self.assertFalse(result.failed)
......
387 387
    self.assertEqual(result.stdout, "")
388 388

  
389 389
  def testInputFileHandle(self):
390
    testfile = self._TestDataFilename("cert1.pem")
390
    testfile = testutils.TestDataFilename("cert1.pem")
391 391

  
392 392
    result = utils.RunCmd(["cat"], input_fd=open(testfile, "r"))
393 393
    self.assertFalse(result.failed)
......
395 395
    self.assertEqual(result.stderr, "")
396 396

  
397 397
  def testInputNumericFileDescriptor(self):
398
    testfile = self._TestDataFilename("cert2.pem")
398
    testfile = testutils.TestDataFilename("cert2.pem")
399 399

  
400 400
    fh = open(testfile, "r")
401 401
    try:
......
408 408
    self.assertEqual(result.stderr, "")
409 409

  
410 410
  def testInputWithCloseFds(self):
411
    testfile = self._TestDataFilename("cert1.pem")
411
    testfile = testutils.TestDataFilename("cert1.pem")
412 412

  
413 413
    temp = open(self.fname, "r+")
414 414
    try:
b/test/py/ganeti.utils.x509_unittest.py
82 82

  
83 83
  def _LoadCert(self, name):
84 84
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85
                                           self._ReadTestData(name))
85
                                           testutils.ReadTestData(name))
86 86

  
87 87
  def test(self):
88 88
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
......
157 157
    shutil.rmtree(self.tmpdir)
158 158

  
159 159
  def testVerifyCertificate(self):
160
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
160
    cert_pem = testutils.ReadTestData("cert1.pem")
161 161
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162 162
                                           cert_pem)
163 163

  
......
297 297
    shutil.rmtree(self.tmpdir)
298 298

  
299 299
  def testMismatchingKey(self):
300
    other_cert = self._TestDataFilename("cert1.pem")
301
    node_cert = self._TestDataFilename("cert2.pem")
300
    other_cert = testutils.TestDataFilename("cert1.pem")
301
    node_cert = testutils.TestDataFilename("cert2.pem")
302 302

  
303 303
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
304 304
                                           utils.ReadFile(other_cert))
......
312 312
      self.fail("Exception was not raised")
313 313

  
314 314
  def testMatchingKey(self):
315
    cert_filename = self._TestDataFilename("cert2.pem")
315
    cert_filename = testutils.TestDataFilename("cert2.pem")
316 316

  
317 317
    # Extract certificate
318 318
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
......
323 323
    utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
324 324

  
325 325
  def testMissingFile(self):
326
    cert_path = self._TestDataFilename("cert1.pem")
326
    cert_path = testutils.TestDataFilename("cert1.pem")
327 327
    nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
328 328

  
329 329
    utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
......
338 338
                      NotImplemented, _noded_cert_file=tmpfile)
339 339

  
340 340
  def testNoPrivateKey(self):
341
    cert = self._TestDataFilename("cert1.pem")
341
    cert = testutils.TestDataFilename("cert1.pem")
342 342
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
343 343
                      NotImplemented, _noded_cert_file=cert)
344 344

  
345 345
  def testMismatchInNodeCert(self):
346
    cert1_path = self._TestDataFilename("cert1.pem")
347
    cert2_path = self._TestDataFilename("cert2.pem")
346
    cert1_path = testutils.TestDataFilename("cert1.pem")
347
    cert2_path = testutils.TestDataFilename("cert2.pem")
348 348
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
349 349

  
350 350
    # Extract certificate
b/test/py/testutils.py
36 36
  return os.environ.get("TOP_SRCDIR", ".")
37 37

  
38 38

  
39
def TestDataFilename(name):
40
  """Returns the filename of a given test data file.
41

  
42
  @type name: str
43
  @param name: the 'base' of the file name, as present in
44
      the test/data directory
45
  @rtype: str
46
  @return: the full path to the filename, such that it can
47
      be used in 'make distcheck' rules
48

  
49
  """
50
  return "%s/test/data/%s" % (GetSourceDir(), name)
51

  
52

  
53
def ReadTestData(name):
54
  """Returns the content of a test data file.
55

  
56
  This is just a very simple wrapper over utils.ReadFile with the
57
  proper test file name.
58

  
59
  """
60
  return utils.ReadFile(TestDataFilename(name))
61

  
62

  
39 63
def _SetupLogging(verbose):
40 64
  """Setupup logging infrastructure.
41 65

  
......
172 196
                            UnifyValueType(second),
173 197
                            msg=msg)
174 198

  
175
  @staticmethod
176
  def _TestDataFilename(name):
177
    """Returns the filename of a given test data file.
178

  
179
    @type name: str
180
    @param name: the 'base' of the file name, as present in
181
        the test/data directory
182
    @rtype: str
183
    @return: the full path to the filename, such that it can
184
        be used in 'make distcheck' rules
185

  
186
    """
187
    return "%s/test/data/%s" % (GetSourceDir(), name)
188

  
189
  @classmethod
190
  def _ReadTestData(cls, name):
191
    """Returns the contents of a test data file.
192

  
193
    This is just a very simple wrapper over utils.ReadFile with the
194
    proper test file name.
195

  
196
    """
197
    return utils.ReadFile(cls._TestDataFilename(name))
198

  
199 199
  def _CreateTempFile(self):
200 200
    """Creates a temporary file and adds it to the internal cleanup list.
201 201

  

Also available in: Unified diff