Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.bdev_unittest.py @ 00ef625c

History | View | Annotate | Download (20.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import random
27
import unittest
28

    
29
from ganeti import bdev
30
from ganeti import compat
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import objects
34
from ganeti import utils
35

    
36
import testutils
37

    
38

    
39
class TestBaseDRBD(testutils.GanetiTestCase):
40
  def testGetVersion(self):
41
    data = [
42
      ["version: 8.0.12 (api:76/proto:86-91)"],
43
      ["version: 8.2.7 (api:88/proto:0-100)"],
44
      ["version: 8.3.7.49 (api:188/proto:13-191)"],
45
    ]
46
    result = [
47
      {
48
      "k_major": 8,
49
      "k_minor": 0,
50
      "k_point": 12,
51
      "api": 76,
52
      "proto": 86,
53
      "proto2": "91",
54
      },
55
      {
56
      "k_major": 8,
57
      "k_minor": 2,
58
      "k_point": 7,
59
      "api": 88,
60
      "proto": 0,
61
      "proto2": "100",
62
      },
63
      {
64
      "k_major": 8,
65
      "k_minor": 3,
66
      "k_point": 7,
67
      "api": 188,
68
      "proto": 13,
69
      "proto2": "191",
70
      }
71
    ]
72
    for d,r in zip(data, result):
73
      self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
74

    
75

    
76
class TestDRBD8Runner(testutils.GanetiTestCase):
77
  """Testing case for DRBD8"""
78

    
79
  @staticmethod
80
  def _has_disk(data, dname, mname):
81
    """Check local disk corectness"""
82
    retval = (
83
      "local_dev" in data and
84
      data["local_dev"] == dname and
85
      "meta_dev" in data and
86
      data["meta_dev"] == mname and
87
      "meta_index" in data and
88
      data["meta_index"] == 0
89
      )
90
    return retval
91

    
92
  @staticmethod
93
  def _has_net(data, local, remote):
94
    """Check network connection parameters"""
95
    retval = (
96
      "local_addr" in data and
97
      data["local_addr"] == local and
98
      "remote_addr" in data and
99
      data["remote_addr"] == remote
100
      )
101
    return retval
102

    
103
  def testParserCreation(self):
104
    """Test drbdsetup show parser creation"""
105
    bdev.DRBD8._GetShowParser()
106

    
107
  def testParser80(self):
108
    """Test drbdsetup show parser for disk and network version 8.0"""
109
    data = testutils.ReadTestData("bdev-drbd-8.0.txt")
110
    result = bdev.DRBD8._GetDevInfo(data)
111
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
112
                                   "/dev/xenvg/test.meta"),
113
                    "Wrong local disk info")
114
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
115
                                  ("192.0.2.2", 11000)),
116
                    "Wrong network info (8.0.x)")
117

    
118
  def testParser83(self):
119
    """Test drbdsetup show parser for disk and network version 8.3"""
120
    data = testutils.ReadTestData("bdev-drbd-8.3.txt")
121
    result = bdev.DRBD8._GetDevInfo(data)
122
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
123
                                   "/dev/xenvg/test.meta"),
124
                    "Wrong local disk info")
125
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
126
                                  ("192.0.2.2", 11000)),
127
                    "Wrong network info (8.0.x)")
128

    
129
  def testParserNetIP4(self):
130
    """Test drbdsetup show parser for IPv4 network"""
131
    data = testutils.ReadTestData("bdev-drbd-net-ip4.txt")
132
    result = bdev.DRBD8._GetDevInfo(data)
133
    self.failUnless(("local_dev" not in result and
134
                     "meta_dev" not in result and
135
                     "meta_index" not in result),
136
                    "Should not find local disk info")
137
    self.failUnless(self._has_net(result, ("192.0.2.1", 11002),
138
                                  ("192.0.2.2", 11002)),
139
                    "Wrong network info (IPv4)")
140

    
141
  def testParserNetIP6(self):
142
    """Test drbdsetup show parser for IPv6 network"""
143
    data = testutils.ReadTestData("bdev-drbd-net-ip6.txt")
144
    result = bdev.DRBD8._GetDevInfo(data)
145
    self.failUnless(("local_dev" not in result and
146
                     "meta_dev" not in result and
147
                     "meta_index" not in result),
148
                    "Should not find local disk info")
149
    self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
150
                                  ("2001:db8:66::1", 11048)),
151
                    "Wrong network info (IPv6)")
152

    
153
  def testParserDisk(self):
154
    """Test drbdsetup show parser for disk"""
155
    data = testutils.ReadTestData("bdev-drbd-disk.txt")
156
    result = bdev.DRBD8._GetDevInfo(data)
157
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
158
                                   "/dev/xenvg/test.meta"),
159
                    "Wrong local disk info")
160
    self.failUnless(("local_addr" not in result and
161
                     "remote_addr" not in result),
162
                    "Should not find network info")
163

    
164
  def testBarriersOptions(self):
165
    """Test class method that generates drbdsetup options for disk barriers"""
166
    # Tests that should fail because of wrong version/options combinations
167
    should_fail = [
168
      (8, 0, 12, "bfd", True),
169
      (8, 0, 12, "fd", False),
170
      (8, 0, 12, "b", True),
171
      (8, 2, 7, "bfd", True),
172
      (8, 2, 7, "b", True)
173
    ]
174

    
175
    for vmaj, vmin, vrel, opts, meta in should_fail:
176
      self.assertRaises(errors.BlockDeviceError,
177
                        bdev.DRBD8._ComputeDiskBarrierArgs,
178
                        vmaj, vmin, vrel, opts, meta)
179

    
180
    # get the valid options from the frozenset(frozenset()) in constants.
181
    valid_options = [list(x)[0] for x in constants.DRBD_VALID_BARRIER_OPT]
182

    
183
    # Versions that do not support anything
184
    for vmaj, vmin, vrel in ((8, 0, 0), (8, 0, 11), (8, 2, 6)):
185
      for opts in valid_options:
186
        self.assertRaises(errors.BlockDeviceError,
187
                          bdev.DRBD8._ComputeDiskBarrierArgs,
188
                          vmaj, vmin, vrel, opts, True)
189

    
190
    # Versions with partial support (testing only options that are supported)
191
    tests = [
192
      (8, 0, 12, "n", False, []),
193
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
194
      (8, 2, 7, "n", False, []),
195
      (8, 2, 7, "fd", False, ["--no-disk-flushes", "--no-disk-drain"]),
196
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
197
      ]
198

    
199
    # Versions that support everything
200
    for vmaj, vmin, vrel in ((8, 3, 0), (8, 3, 12)):
201
      tests.append((vmaj, vmin, vrel, "bfd", True,
202
                    ["--no-disk-barrier", "--no-disk-drain",
203
                     "--no-disk-flushes", "--no-md-flushes"]))
204
      tests.append((vmaj, vmin, vrel, "n", False, []))
205
      tests.append((vmaj, vmin, vrel, "b", True,
206
                    ["--no-disk-barrier", "--no-md-flushes"]))
207
      tests.append((vmaj, vmin, vrel, "fd", False,
208
                    ["--no-disk-flushes", "--no-disk-drain"]))
209
      tests.append((vmaj, vmin, vrel, "n", True, ["--no-md-flushes"]))
210

    
211
    # Test execution
212
    for test in tests:
213
      vmaj, vmin, vrel, disabled_barriers, disable_meta_flush, expected = test
214
      args = \
215
        bdev.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
216
                                           disabled_barriers,
217
                                           disable_meta_flush)
218
      self.failUnless(set(args) == set(expected),
219
                      "For test %s, got wrong results %s" % (test, args))
220

    
221
    # Unsupported or invalid versions
222
    for vmaj, vmin, vrel in ((0, 7, 25), (9, 0, 0), (7, 0, 0), (8, 4, 0)):
223
      self.assertRaises(errors.BlockDeviceError,
224
                        bdev.DRBD8._ComputeDiskBarrierArgs,
225
                        vmaj, vmin, vrel, "n", True)
226

    
227
    # Invalid options
228
    for option in ("", "c", "whatever", "nbdfc", "nf"):
229
      self.assertRaises(errors.BlockDeviceError,
230
                        bdev.DRBD8._ComputeDiskBarrierArgs,
231
                        8, 3, 11, option, True)
232

    
233

    
234
class TestDRBD8Status(testutils.GanetiTestCase):
235
  """Testing case for DRBD8 /proc status"""
236

    
237
  def setUp(self):
238
    """Read in txt data"""
239
    testutils.GanetiTestCase.setUp(self)
240
    proc_data = testutils.TestDataFilename("proc_drbd8.txt")
241
    proc80e_data = testutils.TestDataFilename("proc_drbd80-emptyline.txt")
242
    proc83_data = testutils.TestDataFilename("proc_drbd83.txt")
243
    proc83_sync_data = testutils.TestDataFilename("proc_drbd83_sync.txt")
244
    proc83_sync_krnl_data = \
245
      testutils.TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
246
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
247
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
248
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
249
    self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
250
    self.proc83_sync_krnl_data = \
251
      bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
252
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
253
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
254
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
255
    self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
256
    self.mass83_sync_krnl_data = \
257
      bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
258

    
259
  def testIOErrors(self):
260
    """Test handling of errors while reading the proc file."""
261
    temp_file = self._CreateTempFile()
262
    os.unlink(temp_file)
263
    self.failUnlessRaises(errors.BlockDeviceError,
264
                          bdev.DRBD8._GetProcData, filename=temp_file)
265

    
266
  def testHelper(self):
267
    """Test reading usermode_helper in /sys."""
268
    sys_drbd_helper = testutils.TestDataFilename("sys_drbd_usermode_helper.txt")
269
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
270
    self.failUnlessEqual(drbd_helper, "/bin/true")
271

    
272
  def testHelperIOErrors(self):
273
    """Test handling of errors while reading usermode_helper in /sys."""
274
    temp_file = self._CreateTempFile()
275
    os.unlink(temp_file)
276
    self.failUnlessRaises(errors.BlockDeviceError,
277
                          bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
278

    
279
  def testMinorNotFound(self):
280
    """Test not-found-minor in /proc"""
281
    self.failUnless(9 not in self.mass_data)
282
    self.failUnless(9 not in self.mass83_data)
283
    self.failUnless(3 not in self.mass80e_data)
284

    
285
  def testLineNotMatch(self):
286
    """Test wrong line passed to DRBD8Status"""
287
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
288

    
289
  def testMinor0(self):
290
    """Test connected, primary device"""
291
    for data in [self.mass_data, self.mass83_data]:
292
      stats = bdev.DRBD8Status(data[0])
293
      self.failUnless(stats.is_in_use)
294
      self.failUnless(stats.is_connected and stats.is_primary and
295
                      stats.peer_secondary and stats.is_disk_uptodate)
296

    
297
  def testMinor1(self):
298
    """Test connected, secondary device"""
299
    for data in [self.mass_data, self.mass83_data]:
300
      stats = bdev.DRBD8Status(data[1])
301
      self.failUnless(stats.is_in_use)
302
      self.failUnless(stats.is_connected and stats.is_secondary and
303
                      stats.peer_primary and stats.is_disk_uptodate)
304

    
305
  def testMinor2(self):
306
    """Test unconfigured device"""
307
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
308
      stats = bdev.DRBD8Status(data[2])
309
      self.failIf(stats.is_in_use)
310

    
311
  def testMinor4(self):
312
    """Test WFconn device"""
313
    for data in [self.mass_data, self.mass83_data]:
314
      stats = bdev.DRBD8Status(data[4])
315
      self.failUnless(stats.is_in_use)
316
      self.failUnless(stats.is_wfconn and stats.is_primary and
317
                      stats.rrole == "Unknown" and
318
                      stats.is_disk_uptodate)
319

    
320
  def testMinor6(self):
321
    """Test diskless device"""
322
    for data in [self.mass_data, self.mass83_data]:
323
      stats = bdev.DRBD8Status(data[6])
324
      self.failUnless(stats.is_in_use)
325
      self.failUnless(stats.is_connected and stats.is_secondary and
326
                      stats.peer_primary and stats.is_diskless)
327

    
328
  def testMinor8(self):
329
    """Test standalone device"""
330
    for data in [self.mass_data, self.mass83_data]:
331
      stats = bdev.DRBD8Status(data[8])
332
      self.failUnless(stats.is_in_use)
333
      self.failUnless(stats.is_standalone and
334
                      stats.rrole == "Unknown" and
335
                      stats.is_disk_uptodate)
336

    
337
  def testDRBD83SyncFine(self):
338
    stats = bdev.DRBD8Status(self.mass83_sync_data[3])
339
    self.failUnless(stats.is_in_resync)
340
    self.failUnless(stats.sync_percent is not None)
341

    
342
  def testDRBD83SyncBroken(self):
343
    stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
344
    self.failUnless(stats.is_in_resync)
345
    self.failUnless(stats.sync_percent is not None)
346

    
347

    
348
class TestRADOSBlockDevice(testutils.GanetiTestCase):
349
  def test_ParseRbdShowmappedOutput(self):
350
    volume_name = "abc9778-8e8ace5b.rbd.disk0"
351
    output_ok = \
352
      ("0\trbd\te69f28e5-9817.rbd.disk0\t-\t/dev/rbd0\n"
353
       "1\t/dev/rbd0\tabc9778-8e8ace5b.rbd.disk0\t-\t/dev/rbd16\n"
354
       "line\twith\tfewer\tfields\n"
355
       "")
356
    output_empty = ""
357
    output_no_matches = \
358
      ("0\trbd\te69f28e5-9817.rbd.disk0\t-\t/dev/rbd0\n"
359
       "1\trbd\tabcdef01-9817.rbd.disk0\t-\t/dev/rbd10\n"
360
       "2\trbd\tcdef0123-9817.rbd.disk0\t-\t/dev/rbd12\n"
361
       "something\twith\tfewer\tfields"
362
       "")
363
    output_extra_matches = \
364
      ("0\t/dev/rbd0\tabc9778-8e8ace5b.rbd.disk0\t-\t/dev/rbd11\n"
365
       "1\trbd\te69f28e5-9817.rbd.disk0\t-\t/dev/rbd0\n"
366
       "2\t/dev/rbd0\tabc9778-8e8ace5b.rbd.disk0\t-\t/dev/rbd16\n"
367
       "something\twith\tfewer\tfields"
368
       "")
369

    
370
    parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedOutput
371
    self.assertEqual(parse_function(output_ok, volume_name), "/dev/rbd16")
372
    self.assertRaises(errors.BlockDeviceError, parse_function,
373
                      output_empty, volume_name)
374
    self.assertEqual(parse_function(output_no_matches, volume_name), None)
375
    self.assertRaises(errors.BlockDeviceError, parse_function,
376
                      output_extra_matches, volume_name)
377

    
378

    
379
class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
380
  def testPaths(self):
381
    paths = bdev._GetForbiddenFileStoragePaths()
382

    
383
    for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
384
      self.assertTrue(path in paths)
385

    
386
    self.assertEqual(set(map(os.path.normpath, paths)), paths)
387

    
388
  def test(self):
389
    vfsp = bdev._ComputeWrongFileStoragePaths
390
    self.assertEqual(vfsp([]), [])
391
    self.assertEqual(vfsp(["/tmp"]), [])
392
    self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
393
    self.assertEqual(vfsp(["/bin"]), ["/bin"])
394
    self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
395
                     ["/usr/sbin/vim"])
396

    
397

    
398
class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
399
  def test(self):
400
    tmpfile = self._CreateTempFile()
401

    
402
    utils.WriteFile(tmpfile, data="""
403
      /tmp
404
      x/y///z/relative
405
      # This is a test file
406
      /srv/storage
407
      /bin
408
      /usr/local/lib32/
409
      relative/path
410
      """)
411

    
412
    self.assertEqual(bdev.ComputeWrongFileStoragePaths(_filename=tmpfile), [
413
      "/bin",
414
      "/usr/local/lib32",
415
      "relative/path",
416
      "x/y/z/relative",
417
      ])
418

    
419

    
420
class TestCheckFileStoragePathInternal(unittest.TestCase):
421
  def testNonAbsolute(self):
422
    for i in ["", "tmp", "foo/bar/baz"]:
423
      self.assertRaises(errors.FileStoragePathError,
424
                        bdev._CheckFileStoragePath, i, ["/tmp"])
425

    
426
    self.assertRaises(errors.FileStoragePathError,
427
                      bdev._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
428

    
429
  def testNoAllowed(self):
430
    self.assertRaises(errors.FileStoragePathError,
431
                      bdev._CheckFileStoragePath, "/tmp", [])
432

    
433
  def testNoAdditionalPathComponent(self):
434
    self.assertRaises(errors.FileStoragePathError,
435
                      bdev._CheckFileStoragePath, "/tmp/foo", ["/tmp/foo"])
436

    
437
  def testAllowed(self):
438
    bdev._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
439
    bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
440

    
441

    
442
class TestCheckFileStoragePath(testutils.GanetiTestCase):
443
  def testNonExistantFile(self):
444
    filename = "/tmp/this/file/does/not/exist"
445
    assert not os.path.exists(filename)
446
    self.assertRaises(errors.FileStoragePathError,
447
                      bdev.CheckFileStoragePath, "/bin/", _filename=filename)
448
    self.assertRaises(errors.FileStoragePathError,
449
                      bdev.CheckFileStoragePath, "/srv/file-storage",
450
                      _filename=filename)
451

    
452
  def testAllowedPath(self):
453
    tmpfile = self._CreateTempFile()
454

    
455
    utils.WriteFile(tmpfile, data="""
456
      /srv/storage
457
      """)
458

    
459
    bdev.CheckFileStoragePath("/srv/storage/inst1", _filename=tmpfile)
460

    
461
    # No additional path component
462
    self.assertRaises(errors.FileStoragePathError,
463
                      bdev.CheckFileStoragePath, "/srv/storage",
464
                      _filename=tmpfile)
465

    
466
    # Forbidden path
467
    self.assertRaises(errors.FileStoragePathError,
468
                      bdev.CheckFileStoragePath, "/usr/lib64/xyz",
469
                      _filename=tmpfile)
470

    
471

    
472
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
473
  def testDevNull(self):
474
    self.assertEqual(bdev._LoadAllowedFileStoragePaths("/dev/null"), [])
475

    
476
  def testNonExistantFile(self):
477
    filename = "/tmp/this/file/does/not/exist"
478
    assert not os.path.exists(filename)
479
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(filename), [])
480

    
481
  def test(self):
482
    tmpfile = self._CreateTempFile()
483

    
484
    utils.WriteFile(tmpfile, data="""
485
      # This is a test file
486
      /tmp
487
      /srv/storage
488
      relative/path
489
      """)
490

    
491
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(tmpfile), [
492
      "/tmp",
493
      "/srv/storage",
494
      "relative/path",
495
      ])
496

    
497

    
498
class TestExclusiveStoragePvs(unittest.TestCase):
499
  """Test cases for functions dealing with LVM PV and exclusive storage"""
500
  # Allowance for rounding
501
  _EPS = 1e-4
502
  _MARGIN = constants.PART_MARGIN + constants.PART_RESERVED + _EPS
503

    
504
  @staticmethod
505
  def _GenerateRandomPvInfo(rnd, name, vg):
506
    # Granularity is .01 MiB
507
    size = rnd.randint(1024 * 100, 10 * 1024 * 1024 * 100)
508
    if rnd.choice([False, True]):
509
      free = float(rnd.randint(0, size)) / 100.0
510
    else:
511
      free = float(size) / 100.0
512
    size = float(size) / 100.0
513
    attr = "a-"
514
    return objects.LvmPvInfo(name=name, vg_name=vg, size=size, free=free,
515
                             attributes=attr)
516

    
517
  def testGetStdPvSize(self):
518
    """Test cases for bdev.LogicalVolume._GetStdPvSize()"""
519
    rnd = random.Random(9517)
520
    for _ in range(0, 50):
521
      # Identical volumes
522
      pvi = self._GenerateRandomPvInfo(rnd, "disk", "myvg")
523
      onesize = bdev.LogicalVolume._GetStdPvSize([pvi])
524
      self.assertTrue(onesize <= pvi.size)
525
      self.assertTrue(onesize > pvi.size * (1 - self._MARGIN))
526
      for length in range(2, 10):
527
        n_size = bdev.LogicalVolume._GetStdPvSize([pvi] * length)
528
        self.assertEqual(onesize, n_size)
529

    
530
      # Mixed volumes
531
      for length in range(1, 10):
532
        pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")
533
                  for _ in range(0, length)]
534
        std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
535
        self.assertTrue(compat.all(std_size <= pvi.size for pvi in pvlist))
536
        self.assertTrue(compat.any(std_size > pvi.size * (1 - self._MARGIN)
537
                                   for pvi in pvlist))
538
        pvlist.append(pvlist[0])
539
        p1_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
540
        self.assertEqual(std_size, p1_size)
541

    
542
  def testComputeNumPvs(self):
543
    """Test cases for bdev.LogicalVolume._ComputeNumPvs()"""
544
    rnd = random.Random(8067)
545
    for _ in range(0, 1000):
546
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")]
547
      lv_size = float(rnd.randint(10 * 100, 1024 * 1024 * 100)) / 100.0
548
      num_pv = bdev.LogicalVolume._ComputeNumPvs(lv_size, pvlist)
549
      std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
550
      self.assertTrue(num_pv >= 1)
551
      self.assertTrue(num_pv * std_size >= lv_size)
552
      self.assertTrue((num_pv - 1) * std_size < lv_size * (1 + self._EPS))
553

    
554
  def testGetEmptyPvNames(self):
555
    """Test cases for bdev.LogicalVolume._GetEmptyPvNames()"""
556
    rnd = random.Random(21126)
557
    for _ in range(0, 100):
558
      num_pvs = rnd.randint(1, 20)
559
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk%d" % n, "myvg")
560
                for n in range(0, num_pvs)]
561
      for num_req in range(1, num_pvs + 2):
562
        epvs = bdev.LogicalVolume._GetEmptyPvNames(pvlist, num_req)
563
        epvs_set = compat.UniqueFrozenset(epvs)
564
        if len(epvs) > 1:
565
          self.assertEqual(len(epvs), len(epvs_set))
566
        for pvi in pvlist:
567
          if pvi.name in epvs_set:
568
            self.assertEqual(pvi.size, pvi.free)
569
          else:
570
            # There should be no remaining empty PV when less than the
571
            # requeste number of PVs has been returned
572
            self.assertTrue(len(epvs) == num_req or pvi.free != pvi.size)
573

    
574

    
575
if __name__ == "__main__":
576
  testutils.GanetiTestProgram()