Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.block.bdev_unittest.py @ d01e51a5

History | View | Annotate | Download (23.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import random
27
import unittest
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import objects
33
from ganeti import utils
34
from ganeti.block import bdev
35
from ganeti.block import drbd
36

    
37
import testutils
38

    
39

    
40
class TestDRBD8(testutils.GanetiTestCase):
41
  def testGetVersion(self):
42
    data = [
43
      ["version: 8.0.12 (api:76/proto:86-91)"],
44
      ["version: 8.2.7 (api:88/proto:0-100)"],
45
      ["version: 8.3.7.49 (api:188/proto:13-191)"],
46
    ]
47
    result = [
48
      {
49
      "k_major": 8,
50
      "k_minor": 0,
51
      "k_point": 12,
52
      "api": 76,
53
      "proto": 86,
54
      "proto2": "91",
55
      },
56
      {
57
      "k_major": 8,
58
      "k_minor": 2,
59
      "k_point": 7,
60
      "api": 88,
61
      "proto": 0,
62
      "proto2": "100",
63
      },
64
      {
65
      "k_major": 8,
66
      "k_minor": 3,
67
      "k_point": 7,
68
      "api": 188,
69
      "proto": 13,
70
      "proto2": "191",
71
      }
72
    ]
73
    for d,r in zip(data, result):
74
      info = drbd.DRBD8Info.CreateFromLines(d)
75
      self.assertEqual(info.GetVersion(), r)
76

    
77

    
78
class TestDRBD8Runner(testutils.GanetiTestCase):
79
  """Testing case for drbd.DRBD8"""
80

    
81
  @staticmethod
82
  def _has_disk(data, dname, mname):
83
    """Check local disk corectness"""
84
    retval = (
85
      "local_dev" in data and
86
      data["local_dev"] == dname and
87
      "meta_dev" in data and
88
      data["meta_dev"] == mname and
89
      "meta_index" in data and
90
      data["meta_index"] == 0
91
      )
92
    return retval
93

    
94
  @staticmethod
95
  def _has_net(data, local, remote):
96
    """Check network connection parameters"""
97
    retval = (
98
      "local_addr" in data and
99
      data["local_addr"] == local and
100
      "remote_addr" in data and
101
      data["remote_addr"] == remote
102
      )
103
    return retval
104

    
105
  def testParserCreation(self):
106
    """Test drbdsetup show parser creation"""
107
    drbd.DRBD8ShowInfo._GetShowParser()
108

    
109
  def testParser80(self):
110
    """Test drbdsetup show parser for disk and network version 8.0"""
111
    data = testutils.ReadTestData("bdev-drbd-8.0.txt")
112
    result = drbd.DRBD8ShowInfo.GetDevInfo(data)
113
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
114
                                   "/dev/xenvg/test.meta"),
115
                    "Wrong local disk info")
116
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
117
                                  ("192.0.2.2", 11000)),
118
                    "Wrong network info (8.0.x)")
119

    
120
  def testParser83(self):
121
    """Test drbdsetup show parser for disk and network version 8.3"""
122
    data = testutils.ReadTestData("bdev-drbd-8.3.txt")
123
    result = drbd.DRBD8ShowInfo.GetDevInfo(data)
124
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
125
                                   "/dev/xenvg/test.meta"),
126
                    "Wrong local disk info")
127
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
128
                                  ("192.0.2.2", 11000)),
129
                    "Wrong network info (8.0.x)")
130

    
131
  def testParserNetIP4(self):
132
    """Test drbdsetup show parser for IPv4 network"""
133
    data = testutils.ReadTestData("bdev-drbd-net-ip4.txt")
134
    result = drbd.DRBD8ShowInfo.GetDevInfo(data)
135
    self.failUnless(("local_dev" not in result and
136
                     "meta_dev" not in result and
137
                     "meta_index" not in result),
138
                    "Should not find local disk info")
139
    self.failUnless(self._has_net(result, ("192.0.2.1", 11002),
140
                                  ("192.0.2.2", 11002)),
141
                    "Wrong network info (IPv4)")
142

    
143
  def testParserNetIP6(self):
144
    """Test drbdsetup show parser for IPv6 network"""
145
    data = testutils.ReadTestData("bdev-drbd-net-ip6.txt")
146
    result = drbd.DRBD8ShowInfo.GetDevInfo(data)
147
    self.failUnless(("local_dev" not in result and
148
                     "meta_dev" not in result and
149
                     "meta_index" not in result),
150
                    "Should not find local disk info")
151
    self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
152
                                  ("2001:db8:66::1", 11048)),
153
                    "Wrong network info (IPv6)")
154

    
155
  def testParserDisk(self):
156
    """Test drbdsetup show parser for disk"""
157
    data = testutils.ReadTestData("bdev-drbd-disk.txt")
158
    result = drbd.DRBD8ShowInfo.GetDevInfo(data)
159
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
160
                                   "/dev/xenvg/test.meta"),
161
                    "Wrong local disk info")
162
    self.failUnless(("local_addr" not in result and
163
                     "remote_addr" not in result),
164
                    "Should not find network info")
165

    
166
  def testBarriersOptions(self):
167
    """Test class method that generates drbdsetup options for disk barriers"""
168
    # Tests that should fail because of wrong version/options combinations
169
    should_fail = [
170
      (8, 0, 12, "bfd", True),
171
      (8, 0, 12, "fd", False),
172
      (8, 0, 12, "b", True),
173
      (8, 2, 7, "bfd", True),
174
      (8, 2, 7, "b", True)
175
    ]
176

    
177
    for vmaj, vmin, vrel, opts, meta in should_fail:
178
      self.assertRaises(errors.BlockDeviceError,
179
                        drbd.DRBD8._ComputeDiskBarrierArgs,
180
                        vmaj, vmin, vrel, opts, meta)
181

    
182
    # get the valid options from the frozenset(frozenset()) in constants.
183
    valid_options = [list(x)[0] for x in constants.DRBD_VALID_BARRIER_OPT]
184

    
185
    # Versions that do not support anything
186
    for vmaj, vmin, vrel in ((8, 0, 0), (8, 0, 11), (8, 2, 6)):
187
      for opts in valid_options:
188
        self.assertRaises(errors.BlockDeviceError,
189
                          drbd.DRBD8._ComputeDiskBarrierArgs,
190
                          vmaj, vmin, vrel, opts, True)
191

    
192
    # Versions with partial support (testing only options that are supported)
193
    tests = [
194
      (8, 0, 12, "n", False, []),
195
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
196
      (8, 2, 7, "n", False, []),
197
      (8, 2, 7, "fd", False, ["--no-disk-flushes", "--no-disk-drain"]),
198
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
199
      ]
200

    
201
    # Versions that support everything
202
    for vmaj, vmin, vrel in ((8, 3, 0), (8, 3, 12)):
203
      tests.append((vmaj, vmin, vrel, "bfd", True,
204
                    ["--no-disk-barrier", "--no-disk-drain",
205
                     "--no-disk-flushes", "--no-md-flushes"]))
206
      tests.append((vmaj, vmin, vrel, "n", False, []))
207
      tests.append((vmaj, vmin, vrel, "b", True,
208
                    ["--no-disk-barrier", "--no-md-flushes"]))
209
      tests.append((vmaj, vmin, vrel, "fd", False,
210
                    ["--no-disk-flushes", "--no-disk-drain"]))
211
      tests.append((vmaj, vmin, vrel, "n", True, ["--no-md-flushes"]))
212

    
213
    # Test execution
214
    for test in tests:
215
      vmaj, vmin, vrel, disabled_barriers, disable_meta_flush, expected = test
216
      args = \
217
        drbd.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
218
                                           disabled_barriers,
219
                                           disable_meta_flush)
220
      self.failUnless(set(args) == set(expected),
221
                      "For test %s, got wrong results %s" % (test, args))
222

    
223
    # Unsupported or invalid versions
224
    for vmaj, vmin, vrel in ((0, 7, 25), (9, 0, 0), (7, 0, 0), (8, 4, 0)):
225
      self.assertRaises(errors.BlockDeviceError,
226
                        drbd.DRBD8._ComputeDiskBarrierArgs,
227
                        vmaj, vmin, vrel, "n", True)
228

    
229
    # Invalid options
230
    for option in ("", "c", "whatever", "nbdfc", "nf"):
231
      self.assertRaises(errors.BlockDeviceError,
232
                        drbd.DRBD8._ComputeDiskBarrierArgs,
233
                        8, 3, 11, option, True)
234

    
235

    
236
class TestDRBD8Status(testutils.GanetiTestCase):
237
  """Testing case for DRBD8 /proc status"""
238

    
239
  def setUp(self):
240
    """Read in txt data"""
241
    testutils.GanetiTestCase.setUp(self)
242
    proc_data = testutils.TestDataFilename("proc_drbd8.txt")
243
    proc80e_data = testutils.TestDataFilename("proc_drbd80-emptyline.txt")
244
    proc83_data = testutils.TestDataFilename("proc_drbd83.txt")
245
    proc83_sync_data = testutils.TestDataFilename("proc_drbd83_sync.txt")
246
    proc83_sync_krnl_data = \
247
      testutils.TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
248
    proc84_data = testutils.TestDataFilename("proc_drbd84.txt")
249
    proc84_sync_data = testutils.TestDataFilename("proc_drbd84_sync.txt")
250

    
251
    self.proc80ev_data = \
252
      testutils.TestDataFilename("proc_drbd80-emptyversion.txt")
253

    
254
    self.drbd_info = drbd.DRBD8Info.CreateFromFile(filename=proc_data)
255
    self.drbd_info80e = drbd.DRBD8Info.CreateFromFile(filename=proc80e_data)
256
    self.drbd_info83 = drbd.DRBD8Info.CreateFromFile(filename=proc83_data)
257
    self.drbd_info83_sync = \
258
      drbd.DRBD8Info.CreateFromFile(filename=proc83_sync_data)
259
    self.drbd_info83_sync_krnl = \
260
      drbd.DRBD8Info.CreateFromFile(filename=proc83_sync_krnl_data)
261
    self.drbd_info84 = drbd.DRBD8Info.CreateFromFile(filename=proc84_data)
262
    self.drbd_info84_sync = \
263
      drbd.DRBD8Info.CreateFromFile(filename=proc84_sync_data)
264

    
265
  def testIOErrors(self):
266
    """Test handling of errors while reading the proc file."""
267
    temp_file = self._CreateTempFile()
268
    os.unlink(temp_file)
269
    self.failUnlessRaises(errors.BlockDeviceError,
270
                          drbd.DRBD8Info.CreateFromFile, filename=temp_file)
271

    
272
  def testHelper(self):
273
    """Test reading usermode_helper in /sys."""
274
    sys_drbd_helper = testutils.TestDataFilename("sys_drbd_usermode_helper.txt")
275
    drbd_helper = drbd.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
276
    self.failUnlessEqual(drbd_helper, "/bin/true")
277

    
278
  def testHelperIOErrors(self):
279
    """Test handling of errors while reading usermode_helper in /sys."""
280
    temp_file = self._CreateTempFile()
281
    os.unlink(temp_file)
282
    self.failUnlessRaises(errors.BlockDeviceError,
283
                          drbd.DRBD8.GetUsermodeHelper, filename=temp_file)
284

    
285
  def testMinorNotFound(self):
286
    """Test not-found-minor in /proc"""
287
    self.failUnless(not self.drbd_info.HasMinorStatus(9))
288
    self.failUnless(not self.drbd_info83.HasMinorStatus(9))
289
    self.failUnless(not self.drbd_info80e.HasMinorStatus(3))
290

    
291
  def testLineNotMatch(self):
292
    """Test wrong line passed to drbd.DRBD8Status"""
293
    self.assertRaises(errors.BlockDeviceError, drbd.DRBD8Status, "foo")
294

    
295
  def testMinor0(self):
296
    """Test connected, primary device"""
297
    for info in [self.drbd_info, self.drbd_info83, self.drbd_info84]:
298
      stats = info.GetMinorStatus(0)
299
      self.failUnless(stats.is_in_use)
300
      self.failUnless(stats.is_connected and stats.is_primary and
301
                      stats.peer_secondary and stats.is_disk_uptodate)
302

    
303
  def testMinor1(self):
304
    """Test connected, secondary device"""
305
    for info in [self.drbd_info, self.drbd_info83, self.drbd_info84]:
306
      stats = info.GetMinorStatus(1)
307
      self.failUnless(stats.is_in_use)
308
      self.failUnless(stats.is_connected and stats.is_secondary and
309
                      stats.peer_primary and stats.is_disk_uptodate)
310

    
311
  def testMinor2(self):
312
    """Test unconfigured device"""
313
    for info in [self.drbd_info, self.drbd_info83,
314
                 self.drbd_info80e, self.drbd_info84]:
315
      stats = info.GetMinorStatus(2)
316
      self.failIf(stats.is_in_use)
317

    
318
  def testMinor4(self):
319
    """Test WFconn device"""
320
    for info in [self.drbd_info, self.drbd_info83, self.drbd_info84]:
321
      stats = info.GetMinorStatus(4)
322
      self.failUnless(stats.is_in_use)
323
      self.failUnless(stats.is_wfconn and stats.is_primary and
324
                      stats.rrole == "Unknown" and
325
                      stats.is_disk_uptodate)
326

    
327
  def testMinor6(self):
328
    """Test diskless device"""
329
    for info in [self.drbd_info, self.drbd_info83, self.drbd_info84]:
330
      stats = info.GetMinorStatus(6)
331
      self.failUnless(stats.is_in_use)
332
      self.failUnless(stats.is_connected and stats.is_secondary and
333
                      stats.peer_primary and stats.is_diskless)
334

    
335
  def testMinor8(self):
336
    """Test standalone device"""
337
    for info in [self.drbd_info, self.drbd_info83, self.drbd_info84]:
338
      stats = info.GetMinorStatus(8)
339
      self.failUnless(stats.is_in_use)
340
      self.failUnless(stats.is_standalone and
341
                      stats.rrole == "Unknown" and
342
                      stats.is_disk_uptodate)
343

    
344
  def testDRBD83SyncFine(self):
345
    stats = self.drbd_info83_sync.GetMinorStatus(3)
346
    self.failUnless(stats.is_in_resync)
347
    self.failUnless(stats.sync_percent is not None)
348

    
349
  def testDRBD83SyncBroken(self):
350
    stats = self.drbd_info83_sync_krnl.GetMinorStatus(3)
351
    self.failUnless(stats.is_in_resync)
352
    self.failUnless(stats.sync_percent is not None)
353

    
354
  def testDRBD84Sync(self):
355
    stats = self.drbd_info84_sync.GetMinorStatus(5)
356
    self.failUnless(stats.is_in_resync)
357
    self.failUnless(stats.sync_percent is not None)
358

    
359
  def testDRBDEmptyVersion(self):
360
    self.assertRaises(errors.BlockDeviceError,
361
                      drbd.DRBD8Info.CreateFromFile,
362
                      filename=self.proc80ev_data)
363

    
364

    
365
class TestRADOSBlockDevice(testutils.GanetiTestCase):
366
  def setUp(self):
367
    """Set up input data"""
368
    testutils.GanetiTestCase.setUp(self)
369

    
370
    self.plain_output_old_ok = \
371
      testutils.ReadTestData("bdev-rbd/plain_output_old_ok.txt")
372
    self.plain_output_old_no_matches = \
373
      testutils.ReadTestData("bdev-rbd/plain_output_old_no_matches.txt")
374
    self.plain_output_old_extra_matches = \
375
      testutils.ReadTestData("bdev-rbd/plain_output_old_extra_matches.txt")
376
    self.plain_output_old_empty = \
377
      testutils.ReadTestData("bdev-rbd/plain_output_old_empty.txt")
378
    self.plain_output_new_ok = \
379
      testutils.ReadTestData("bdev-rbd/plain_output_new_ok.txt")
380
    self.plain_output_new_no_matches = \
381
      testutils.ReadTestData("bdev-rbd/plain_output_new_no_matches.txt")
382
    self.plain_output_new_extra_matches = \
383
      testutils.ReadTestData("bdev-rbd/plain_output_new_extra_matches.txt")
384
    # This file is completely empty, and as such it's not shipped.
385
    self.plain_output_new_empty = ""
386
    self.json_output_ok = testutils.ReadTestData("bdev-rbd/json_output_ok.txt")
387
    self.json_output_no_matches = \
388
      testutils.ReadTestData("bdev-rbd/json_output_no_matches.txt")
389
    self.json_output_extra_matches = \
390
      testutils.ReadTestData("bdev-rbd/json_output_extra_matches.txt")
391
    self.json_output_empty = \
392
      testutils.ReadTestData("bdev-rbd/json_output_empty.txt")
393
    self.output_invalid = testutils.ReadTestData("bdev-rbd/output_invalid.txt")
394

    
395
    self.volume_name = "d7ab910a-4933-4ffe-88d0-faf2ce31390a.rbd.disk0"
396

    
397
  def test_ParseRbdShowmappedJson(self):
398
    parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedJson
399

    
400
    self.assertEqual(parse_function(self.json_output_ok, self.volume_name),
401
                     "/dev/rbd3")
402
    self.assertEqual(parse_function(self.json_output_empty, self.volume_name),
403
                     None)
404
    self.assertEqual(parse_function(self.json_output_no_matches,
405
                     self.volume_name), None)
406
    self.assertRaises(errors.BlockDeviceError, parse_function,
407
                      self.json_output_extra_matches, self.volume_name)
408
    self.assertRaises(errors.BlockDeviceError, parse_function,
409
                      self.output_invalid, self.volume_name)
410

    
411
  def test_ParseRbdShowmappedPlain(self):
412
    parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedPlain
413

    
414
    self.assertEqual(parse_function(self.plain_output_new_ok,
415
                     self.volume_name), "/dev/rbd3")
416
    self.assertEqual(parse_function(self.plain_output_old_ok,
417
                     self.volume_name), "/dev/rbd3")
418
    self.assertEqual(parse_function(self.plain_output_new_empty,
419
                     self.volume_name), None)
420
    self.assertEqual(parse_function(self.plain_output_old_empty,
421
                     self.volume_name), None)
422
    self.assertEqual(parse_function(self.plain_output_new_no_matches,
423
                     self.volume_name), None)
424
    self.assertEqual(parse_function(self.plain_output_old_no_matches,
425
                     self.volume_name), None)
426
    self.assertRaises(errors.BlockDeviceError, parse_function,
427
                      self.plain_output_new_extra_matches, self.volume_name)
428
    self.assertRaises(errors.BlockDeviceError, parse_function,
429
                      self.plain_output_old_extra_matches, self.volume_name)
430
    self.assertRaises(errors.BlockDeviceError, parse_function,
431
                      self.output_invalid, self.volume_name)
432

    
433
class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
434
  def testPaths(self):
435
    paths = bdev._GetForbiddenFileStoragePaths()
436

    
437
    for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
438
      self.assertTrue(path in paths)
439

    
440
    self.assertEqual(set(map(os.path.normpath, paths)), paths)
441

    
442
  def test(self):
443
    vfsp = bdev._ComputeWrongFileStoragePaths
444
    self.assertEqual(vfsp([]), [])
445
    self.assertEqual(vfsp(["/tmp"]), [])
446
    self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
447
    self.assertEqual(vfsp(["/bin"]), ["/bin"])
448
    self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
449
                     ["/usr/sbin/vim"])
450

    
451

    
452
class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
453
  def test(self):
454
    tmpfile = self._CreateTempFile()
455

    
456
    utils.WriteFile(tmpfile, data="""
457
      /tmp
458
      x/y///z/relative
459
      # This is a test file
460
      /srv/storage
461
      /bin
462
      /usr/local/lib32/
463
      relative/path
464
      """)
465

    
466
    self.assertEqual(bdev.ComputeWrongFileStoragePaths(_filename=tmpfile), [
467
      "/bin",
468
      "/usr/local/lib32",
469
      "relative/path",
470
      "x/y/z/relative",
471
      ])
472

    
473

    
474
class TestCheckFileStoragePathInternal(unittest.TestCase):
475
  def testNonAbsolute(self):
476
    for i in ["", "tmp", "foo/bar/baz"]:
477
      self.assertRaises(errors.FileStoragePathError,
478
                        bdev._CheckFileStoragePath, i, ["/tmp"])
479

    
480
    self.assertRaises(errors.FileStoragePathError,
481
                      bdev._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
482

    
483
  def testNoAllowed(self):
484
    self.assertRaises(errors.FileStoragePathError,
485
                      bdev._CheckFileStoragePath, "/tmp", [])
486

    
487
  def testNoAdditionalPathComponent(self):
488
    self.assertRaises(errors.FileStoragePathError,
489
                      bdev._CheckFileStoragePath, "/tmp/foo", ["/tmp/foo"])
490

    
491
  def testAllowed(self):
492
    bdev._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
493
    bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
494

    
495

    
496
class TestCheckFileStoragePath(testutils.GanetiTestCase):
497
  def testNonExistantFile(self):
498
    filename = "/tmp/this/file/does/not/exist"
499
    assert not os.path.exists(filename)
500
    self.assertRaises(errors.FileStoragePathError,
501
                      bdev.CheckFileStoragePath, "/bin/", _filename=filename)
502
    self.assertRaises(errors.FileStoragePathError,
503
                      bdev.CheckFileStoragePath, "/srv/file-storage",
504
                      _filename=filename)
505

    
506
  def testAllowedPath(self):
507
    tmpfile = self._CreateTempFile()
508

    
509
    utils.WriteFile(tmpfile, data="""
510
      /srv/storage
511
      """)
512

    
513
    bdev.CheckFileStoragePath("/srv/storage/inst1", _filename=tmpfile)
514

    
515
    # No additional path component
516
    self.assertRaises(errors.FileStoragePathError,
517
                      bdev.CheckFileStoragePath, "/srv/storage",
518
                      _filename=tmpfile)
519

    
520
    # Forbidden path
521
    self.assertRaises(errors.FileStoragePathError,
522
                      bdev.CheckFileStoragePath, "/usr/lib64/xyz",
523
                      _filename=tmpfile)
524

    
525

    
526
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
527
  def testDevNull(self):
528
    self.assertEqual(bdev._LoadAllowedFileStoragePaths("/dev/null"), [])
529

    
530
  def testNonExistantFile(self):
531
    filename = "/tmp/this/file/does/not/exist"
532
    assert not os.path.exists(filename)
533
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(filename), [])
534

    
535
  def test(self):
536
    tmpfile = self._CreateTempFile()
537

    
538
    utils.WriteFile(tmpfile, data="""
539
      # This is a test file
540
      /tmp
541
      /srv/storage
542
      relative/path
543
      """)
544

    
545
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(tmpfile), [
546
      "/tmp",
547
      "/srv/storage",
548
      "relative/path",
549
      ])
550

    
551

    
552
class TestExclusiveStoragePvs(unittest.TestCase):
553
  """Test cases for functions dealing with LVM PV and exclusive storage"""
554
  # Allowance for rounding
555
  _EPS = 1e-4
556
  _MARGIN = constants.PART_MARGIN + constants.PART_RESERVED + _EPS
557

    
558
  @staticmethod
559
  def _GenerateRandomPvInfo(rnd, name, vg):
560
    # Granularity is .01 MiB
561
    size = rnd.randint(1024 * 100, 10 * 1024 * 1024 * 100)
562
    if rnd.choice([False, True]):
563
      free = float(rnd.randint(0, size)) / 100.0
564
    else:
565
      free = float(size) / 100.0
566
    size = float(size) / 100.0
567
    attr = "a-"
568
    return objects.LvmPvInfo(name=name, vg_name=vg, size=size, free=free,
569
                             attributes=attr)
570

    
571
  def testGetStdPvSize(self):
572
    """Test cases for bdev.LogicalVolume._GetStdPvSize()"""
573
    rnd = random.Random(9517)
574
    for _ in range(0, 50):
575
      # Identical volumes
576
      pvi = self._GenerateRandomPvInfo(rnd, "disk", "myvg")
577
      onesize = bdev.LogicalVolume._GetStdPvSize([pvi])
578
      self.assertTrue(onesize <= pvi.size)
579
      self.assertTrue(onesize > pvi.size * (1 - self._MARGIN))
580
      for length in range(2, 10):
581
        n_size = bdev.LogicalVolume._GetStdPvSize([pvi] * length)
582
        self.assertEqual(onesize, n_size)
583

    
584
      # Mixed volumes
585
      for length in range(1, 10):
586
        pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")
587
                  for _ in range(0, length)]
588
        std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
589
        self.assertTrue(compat.all(std_size <= pvi.size for pvi in pvlist))
590
        self.assertTrue(compat.any(std_size > pvi.size * (1 - self._MARGIN)
591
                                   for pvi in pvlist))
592
        pvlist.append(pvlist[0])
593
        p1_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
594
        self.assertEqual(std_size, p1_size)
595

    
596
  def testComputeNumPvs(self):
597
    """Test cases for bdev.LogicalVolume._ComputeNumPvs()"""
598
    rnd = random.Random(8067)
599
    for _ in range(0, 1000):
600
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")]
601
      lv_size = float(rnd.randint(10 * 100, 1024 * 1024 * 100)) / 100.0
602
      num_pv = bdev.LogicalVolume._ComputeNumPvs(lv_size, pvlist)
603
      std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
604
      self.assertTrue(num_pv >= 1)
605
      self.assertTrue(num_pv * std_size >= lv_size)
606
      self.assertTrue((num_pv - 1) * std_size < lv_size * (1 + self._EPS))
607

    
608
  def testGetEmptyPvNames(self):
609
    """Test cases for bdev.LogicalVolume._GetEmptyPvNames()"""
610
    rnd = random.Random(21126)
611
    for _ in range(0, 100):
612
      num_pvs = rnd.randint(1, 20)
613
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk%d" % n, "myvg")
614
                for n in range(0, num_pvs)]
615
      for num_req in range(1, num_pvs + 2):
616
        epvs = bdev.LogicalVolume._GetEmptyPvNames(pvlist, num_req)
617
        epvs_set = compat.UniqueFrozenset(epvs)
618
        if len(epvs) > 1:
619
          self.assertEqual(len(epvs), len(epvs_set))
620
        for pvi in pvlist:
621
          if pvi.name in epvs_set:
622
            self.assertEqual(pvi.size, pvi.free)
623
          else:
624
            # There should be no remaining empty PV when less than the
625
            # requeste number of PVs has been returned
626
            self.assertTrue(len(epvs) == num_req or pvi.free != pvi.size)
627

    
628

    
629
if __name__ == "__main__":
630
  testutils.GanetiTestProgram()