Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.bdev_unittest.py @ 2e076ede

History | View | Annotate | Download (23 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the bdev module"""
23

    
24

    
25
import os
26
import random
27
import unittest
28

    
29
from ganeti import bdev
30
from ganeti import compat
31
from ganeti import constants
32
from ganeti import errors
33
from ganeti import objects
34
from ganeti import utils
35

    
36
import testutils
37

    
38

    
39
class TestBaseDRBD(testutils.GanetiTestCase):
40
  def testGetVersion(self):
41
    data = [
42
      ["version: 8.0.12 (api:76/proto:86-91)"],
43
      ["version: 8.2.7 (api:88/proto:0-100)"],
44
      ["version: 8.3.7.49 (api:188/proto:13-191)"],
45
    ]
46
    result = [
47
      {
48
      "k_major": 8,
49
      "k_minor": 0,
50
      "k_point": 12,
51
      "api": 76,
52
      "proto": 86,
53
      "proto2": "91",
54
      },
55
      {
56
      "k_major": 8,
57
      "k_minor": 2,
58
      "k_point": 7,
59
      "api": 88,
60
      "proto": 0,
61
      "proto2": "100",
62
      },
63
      {
64
      "k_major": 8,
65
      "k_minor": 3,
66
      "k_point": 7,
67
      "api": 188,
68
      "proto": 13,
69
      "proto2": "191",
70
      }
71
    ]
72
    for d,r in zip(data, result):
73
      self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
74

    
75

    
76
class TestDRBD8Runner(testutils.GanetiTestCase):
77
  """Testing case for DRBD8"""
78

    
79
  @staticmethod
80
  def _has_disk(data, dname, mname):
81
    """Check local disk corectness"""
82
    retval = (
83
      "local_dev" in data and
84
      data["local_dev"] == dname and
85
      "meta_dev" in data and
86
      data["meta_dev"] == mname and
87
      "meta_index" in data and
88
      data["meta_index"] == 0
89
      )
90
    return retval
91

    
92
  @staticmethod
93
  def _has_net(data, local, remote):
94
    """Check network connection parameters"""
95
    retval = (
96
      "local_addr" in data and
97
      data["local_addr"] == local and
98
      "remote_addr" in data and
99
      data["remote_addr"] == remote
100
      )
101
    return retval
102

    
103
  def testParserCreation(self):
104
    """Test drbdsetup show parser creation"""
105
    bdev.DRBD8._GetShowParser()
106

    
107
  def testParser80(self):
108
    """Test drbdsetup show parser for disk and network version 8.0"""
109
    data = testutils.ReadTestData("bdev-drbd-8.0.txt")
110
    result = bdev.DRBD8._GetDevInfo(data)
111
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
112
                                   "/dev/xenvg/test.meta"),
113
                    "Wrong local disk info")
114
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
115
                                  ("192.0.2.2", 11000)),
116
                    "Wrong network info (8.0.x)")
117

    
118
  def testParser83(self):
119
    """Test drbdsetup show parser for disk and network version 8.3"""
120
    data = testutils.ReadTestData("bdev-drbd-8.3.txt")
121
    result = bdev.DRBD8._GetDevInfo(data)
122
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
123
                                   "/dev/xenvg/test.meta"),
124
                    "Wrong local disk info")
125
    self.failUnless(self._has_net(result, ("192.0.2.1", 11000),
126
                                  ("192.0.2.2", 11000)),
127
                    "Wrong network info (8.0.x)")
128

    
129
  def testParserNetIP4(self):
130
    """Test drbdsetup show parser for IPv4 network"""
131
    data = testutils.ReadTestData("bdev-drbd-net-ip4.txt")
132
    result = bdev.DRBD8._GetDevInfo(data)
133
    self.failUnless(("local_dev" not in result and
134
                     "meta_dev" not in result and
135
                     "meta_index" not in result),
136
                    "Should not find local disk info")
137
    self.failUnless(self._has_net(result, ("192.0.2.1", 11002),
138
                                  ("192.0.2.2", 11002)),
139
                    "Wrong network info (IPv4)")
140

    
141
  def testParserNetIP6(self):
142
    """Test drbdsetup show parser for IPv6 network"""
143
    data = testutils.ReadTestData("bdev-drbd-net-ip6.txt")
144
    result = bdev.DRBD8._GetDevInfo(data)
145
    self.failUnless(("local_dev" not in result and
146
                     "meta_dev" not in result and
147
                     "meta_index" not in result),
148
                    "Should not find local disk info")
149
    self.failUnless(self._has_net(result, ("2001:db8:65::1", 11048),
150
                                  ("2001:db8:66::1", 11048)),
151
                    "Wrong network info (IPv6)")
152

    
153
  def testParserDisk(self):
154
    """Test drbdsetup show parser for disk"""
155
    data = testutils.ReadTestData("bdev-drbd-disk.txt")
156
    result = bdev.DRBD8._GetDevInfo(data)
157
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
158
                                   "/dev/xenvg/test.meta"),
159
                    "Wrong local disk info")
160
    self.failUnless(("local_addr" not in result and
161
                     "remote_addr" not in result),
162
                    "Should not find network info")
163

    
164
  def testBarriersOptions(self):
165
    """Test class method that generates drbdsetup options for disk barriers"""
166
    # Tests that should fail because of wrong version/options combinations
167
    should_fail = [
168
      (8, 0, 12, "bfd", True),
169
      (8, 0, 12, "fd", False),
170
      (8, 0, 12, "b", True),
171
      (8, 2, 7, "bfd", True),
172
      (8, 2, 7, "b", True)
173
    ]
174

    
175
    for vmaj, vmin, vrel, opts, meta in should_fail:
176
      self.assertRaises(errors.BlockDeviceError,
177
                        bdev.DRBD8._ComputeDiskBarrierArgs,
178
                        vmaj, vmin, vrel, opts, meta)
179

    
180
    # get the valid options from the frozenset(frozenset()) in constants.
181
    valid_options = [list(x)[0] for x in constants.DRBD_VALID_BARRIER_OPT]
182

    
183
    # Versions that do not support anything
184
    for vmaj, vmin, vrel in ((8, 0, 0), (8, 0, 11), (8, 2, 6)):
185
      for opts in valid_options:
186
        self.assertRaises(errors.BlockDeviceError,
187
                          bdev.DRBD8._ComputeDiskBarrierArgs,
188
                          vmaj, vmin, vrel, opts, True)
189

    
190
    # Versions with partial support (testing only options that are supported)
191
    tests = [
192
      (8, 0, 12, "n", False, []),
193
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
194
      (8, 2, 7, "n", False, []),
195
      (8, 2, 7, "fd", False, ["--no-disk-flushes", "--no-disk-drain"]),
196
      (8, 0, 12, "n", True, ["--no-md-flushes"]),
197
      ]
198

    
199
    # Versions that support everything
200
    for vmaj, vmin, vrel in ((8, 3, 0), (8, 3, 12)):
201
      tests.append((vmaj, vmin, vrel, "bfd", True,
202
                    ["--no-disk-barrier", "--no-disk-drain",
203
                     "--no-disk-flushes", "--no-md-flushes"]))
204
      tests.append((vmaj, vmin, vrel, "n", False, []))
205
      tests.append((vmaj, vmin, vrel, "b", True,
206
                    ["--no-disk-barrier", "--no-md-flushes"]))
207
      tests.append((vmaj, vmin, vrel, "fd", False,
208
                    ["--no-disk-flushes", "--no-disk-drain"]))
209
      tests.append((vmaj, vmin, vrel, "n", True, ["--no-md-flushes"]))
210

    
211
    # Test execution
212
    for test in tests:
213
      vmaj, vmin, vrel, disabled_barriers, disable_meta_flush, expected = test
214
      args = \
215
        bdev.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
216
                                           disabled_barriers,
217
                                           disable_meta_flush)
218
      self.failUnless(set(args) == set(expected),
219
                      "For test %s, got wrong results %s" % (test, args))
220

    
221
    # Unsupported or invalid versions
222
    for vmaj, vmin, vrel in ((0, 7, 25), (9, 0, 0), (7, 0, 0), (8, 4, 0)):
223
      self.assertRaises(errors.BlockDeviceError,
224
                        bdev.DRBD8._ComputeDiskBarrierArgs,
225
                        vmaj, vmin, vrel, "n", True)
226

    
227
    # Invalid options
228
    for option in ("", "c", "whatever", "nbdfc", "nf"):
229
      self.assertRaises(errors.BlockDeviceError,
230
                        bdev.DRBD8._ComputeDiskBarrierArgs,
231
                        8, 3, 11, option, True)
232

    
233

    
234
class TestDRBD8Status(testutils.GanetiTestCase):
235
  """Testing case for DRBD8 /proc status"""
236

    
237
  def setUp(self):
238
    """Read in txt data"""
239
    testutils.GanetiTestCase.setUp(self)
240
    proc_data = testutils.TestDataFilename("proc_drbd8.txt")
241
    proc80e_data = testutils.TestDataFilename("proc_drbd80-emptyline.txt")
242
    proc83_data = testutils.TestDataFilename("proc_drbd83.txt")
243
    proc83_sync_data = testutils.TestDataFilename("proc_drbd83_sync.txt")
244
    proc83_sync_krnl_data = \
245
      testutils.TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
246
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
247
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
248
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
249
    self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
250
    self.proc83_sync_krnl_data = \
251
      bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
252
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
253
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
254
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
255
    self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
256
    self.mass83_sync_krnl_data = \
257
      bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
258

    
259
  def testIOErrors(self):
260
    """Test handling of errors while reading the proc file."""
261
    temp_file = self._CreateTempFile()
262
    os.unlink(temp_file)
263
    self.failUnlessRaises(errors.BlockDeviceError,
264
                          bdev.DRBD8._GetProcData, filename=temp_file)
265

    
266
  def testHelper(self):
267
    """Test reading usermode_helper in /sys."""
268
    sys_drbd_helper = testutils.TestDataFilename("sys_drbd_usermode_helper.txt")
269
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
270
    self.failUnlessEqual(drbd_helper, "/bin/true")
271

    
272
  def testHelperIOErrors(self):
273
    """Test handling of errors while reading usermode_helper in /sys."""
274
    temp_file = self._CreateTempFile()
275
    os.unlink(temp_file)
276
    self.failUnlessRaises(errors.BlockDeviceError,
277
                          bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
278

    
279
  def testMinorNotFound(self):
280
    """Test not-found-minor in /proc"""
281
    self.failUnless(9 not in self.mass_data)
282
    self.failUnless(9 not in self.mass83_data)
283
    self.failUnless(3 not in self.mass80e_data)
284

    
285
  def testLineNotMatch(self):
286
    """Test wrong line passed to DRBD8Status"""
287
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
288

    
289
  def testMinor0(self):
290
    """Test connected, primary device"""
291
    for data in [self.mass_data, self.mass83_data]:
292
      stats = bdev.DRBD8Status(data[0])
293
      self.failUnless(stats.is_in_use)
294
      self.failUnless(stats.is_connected and stats.is_primary and
295
                      stats.peer_secondary and stats.is_disk_uptodate)
296

    
297
  def testMinor1(self):
298
    """Test connected, secondary device"""
299
    for data in [self.mass_data, self.mass83_data]:
300
      stats = bdev.DRBD8Status(data[1])
301
      self.failUnless(stats.is_in_use)
302
      self.failUnless(stats.is_connected and stats.is_secondary and
303
                      stats.peer_primary and stats.is_disk_uptodate)
304

    
305
  def testMinor2(self):
306
    """Test unconfigured device"""
307
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
308
      stats = bdev.DRBD8Status(data[2])
309
      self.failIf(stats.is_in_use)
310

    
311
  def testMinor4(self):
312
    """Test WFconn device"""
313
    for data in [self.mass_data, self.mass83_data]:
314
      stats = bdev.DRBD8Status(data[4])
315
      self.failUnless(stats.is_in_use)
316
      self.failUnless(stats.is_wfconn and stats.is_primary and
317
                      stats.rrole == "Unknown" and
318
                      stats.is_disk_uptodate)
319

    
320
  def testMinor6(self):
321
    """Test diskless device"""
322
    for data in [self.mass_data, self.mass83_data]:
323
      stats = bdev.DRBD8Status(data[6])
324
      self.failUnless(stats.is_in_use)
325
      self.failUnless(stats.is_connected and stats.is_secondary and
326
                      stats.peer_primary and stats.is_diskless)
327

    
328
  def testMinor8(self):
329
    """Test standalone device"""
330
    for data in [self.mass_data, self.mass83_data]:
331
      stats = bdev.DRBD8Status(data[8])
332
      self.failUnless(stats.is_in_use)
333
      self.failUnless(stats.is_standalone and
334
                      stats.rrole == "Unknown" and
335
                      stats.is_disk_uptodate)
336

    
337
  def testDRBD83SyncFine(self):
338
    stats = bdev.DRBD8Status(self.mass83_sync_data[3])
339
    self.failUnless(stats.is_in_resync)
340
    self.failUnless(stats.sync_percent is not None)
341

    
342
  def testDRBD83SyncBroken(self):
343
    stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
344
    self.failUnless(stats.is_in_resync)
345
    self.failUnless(stats.sync_percent is not None)
346

    
347

    
348
class TestRADOSBlockDevice(testutils.GanetiTestCase):
349
  def setUp(self):
350
    """Set up input data"""
351
    testutils.GanetiTestCase.setUp(self)
352

    
353
    self.plain_output_old_ok = \
354
      testutils.ReadTestData("bdev-rbd/plain_output_old_ok.txt")
355
    self.plain_output_old_no_matches = \
356
      testutils.ReadTestData("bdev-rbd/plain_output_old_no_matches.txt")
357
    self.plain_output_old_extra_matches = \
358
      testutils.ReadTestData("bdev-rbd/plain_output_old_extra_matches.txt")
359
    self.plain_output_old_empty = \
360
      testutils.ReadTestData("bdev-rbd/plain_output_old_empty.txt")
361
    self.plain_output_new_ok = \
362
      testutils.ReadTestData("bdev-rbd/plain_output_new_ok.txt")
363
    self.plain_output_new_no_matches = \
364
      testutils.ReadTestData("bdev-rbd/plain_output_new_no_matches.txt")
365
    self.plain_output_new_extra_matches = \
366
      testutils.ReadTestData("bdev-rbd/plain_output_new_extra_matches.txt")
367
    # This file is completely empty, and as such it's not shipped.
368
    self.plain_output_new_empty = ""
369
    self.json_output_ok = testutils.ReadTestData("bdev-rbd/json_output_ok.txt")
370
    self.json_output_no_matches = \
371
      testutils.ReadTestData("bdev-rbd/json_output_no_matches.txt")
372
    self.json_output_extra_matches = \
373
      testutils.ReadTestData("bdev-rbd/json_output_extra_matches.txt")
374
    self.json_output_empty = \
375
      testutils.ReadTestData("bdev-rbd/json_output_empty.txt")
376
    self.output_invalid = testutils.ReadTestData("bdev-rbd/output_invalid.txt")
377

    
378
    self.volume_name = "d7ab910a-4933-4ffe-88d0-faf2ce31390a.rbd.disk0"
379

    
380
  def test_ParseRbdShowmappedJson(self):
381
    parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedJson
382

    
383
    self.assertEqual(parse_function(self.json_output_ok, self.volume_name),
384
                     "/dev/rbd3")
385
    self.assertEqual(parse_function(self.json_output_empty, self.volume_name),
386
                     None)
387
    self.assertEqual(parse_function(self.json_output_no_matches,
388
                     self.volume_name), None)
389
    self.assertRaises(errors.BlockDeviceError, parse_function,
390
                      self.json_output_extra_matches, self.volume_name)
391
    self.assertRaises(errors.BlockDeviceError, parse_function,
392
                      self.output_invalid, self.volume_name)
393

    
394
  def test_ParseRbdShowmappedPlain(self):
395
    parse_function = bdev.RADOSBlockDevice._ParseRbdShowmappedPlain
396

    
397
    self.assertEqual(parse_function(self.plain_output_new_ok,
398
                     self.volume_name), "/dev/rbd3")
399
    self.assertEqual(parse_function(self.plain_output_old_ok,
400
                     self.volume_name), "/dev/rbd3")
401
    self.assertEqual(parse_function(self.plain_output_new_empty,
402
                     self.volume_name), None)
403
    self.assertEqual(parse_function(self.plain_output_old_empty,
404
                     self.volume_name), None)
405
    self.assertEqual(parse_function(self.plain_output_new_no_matches,
406
                     self.volume_name), None)
407
    self.assertEqual(parse_function(self.plain_output_old_no_matches,
408
                     self.volume_name), None)
409
    self.assertRaises(errors.BlockDeviceError, parse_function,
410
                      self.plain_output_new_extra_matches, self.volume_name)
411
    self.assertRaises(errors.BlockDeviceError, parse_function,
412
                      self.plain_output_old_extra_matches, self.volume_name)
413
    self.assertRaises(errors.BlockDeviceError, parse_function,
414
                      self.output_invalid, self.volume_name)
415

    
416
class TestComputeWrongFileStoragePathsInternal(unittest.TestCase):
417
  def testPaths(self):
418
    paths = bdev._GetForbiddenFileStoragePaths()
419

    
420
    for path in ["/bin", "/usr/local/sbin", "/lib64", "/etc", "/sys"]:
421
      self.assertTrue(path in paths)
422

    
423
    self.assertEqual(set(map(os.path.normpath, paths)), paths)
424

    
425
  def test(self):
426
    vfsp = bdev._ComputeWrongFileStoragePaths
427
    self.assertEqual(vfsp([]), [])
428
    self.assertEqual(vfsp(["/tmp"]), [])
429
    self.assertEqual(vfsp(["/bin/ls"]), ["/bin/ls"])
430
    self.assertEqual(vfsp(["/bin"]), ["/bin"])
431
    self.assertEqual(vfsp(["/usr/sbin/vim", "/srv/file-storage"]),
432
                     ["/usr/sbin/vim"])
433

    
434

    
435
class TestComputeWrongFileStoragePaths(testutils.GanetiTestCase):
436
  def test(self):
437
    tmpfile = self._CreateTempFile()
438

    
439
    utils.WriteFile(tmpfile, data="""
440
      /tmp
441
      x/y///z/relative
442
      # This is a test file
443
      /srv/storage
444
      /bin
445
      /usr/local/lib32/
446
      relative/path
447
      """)
448

    
449
    self.assertEqual(bdev.ComputeWrongFileStoragePaths(_filename=tmpfile), [
450
      "/bin",
451
      "/usr/local/lib32",
452
      "relative/path",
453
      "x/y/z/relative",
454
      ])
455

    
456

    
457
class TestCheckFileStoragePathInternal(unittest.TestCase):
458
  def testNonAbsolute(self):
459
    for i in ["", "tmp", "foo/bar/baz"]:
460
      self.assertRaises(errors.FileStoragePathError,
461
                        bdev._CheckFileStoragePath, i, ["/tmp"])
462

    
463
    self.assertRaises(errors.FileStoragePathError,
464
                      bdev._CheckFileStoragePath, "/tmp", ["tmp", "xyz"])
465

    
466
  def testNoAllowed(self):
467
    self.assertRaises(errors.FileStoragePathError,
468
                      bdev._CheckFileStoragePath, "/tmp", [])
469

    
470
  def testNoAdditionalPathComponent(self):
471
    self.assertRaises(errors.FileStoragePathError,
472
                      bdev._CheckFileStoragePath, "/tmp/foo", ["/tmp/foo"])
473

    
474
  def testAllowed(self):
475
    bdev._CheckFileStoragePath("/tmp/foo/a", ["/tmp/foo"])
476
    bdev._CheckFileStoragePath("/tmp/foo/a/x", ["/tmp/foo"])
477

    
478

    
479
class TestCheckFileStoragePath(testutils.GanetiTestCase):
480
  def testNonExistantFile(self):
481
    filename = "/tmp/this/file/does/not/exist"
482
    assert not os.path.exists(filename)
483
    self.assertRaises(errors.FileStoragePathError,
484
                      bdev.CheckFileStoragePath, "/bin/", _filename=filename)
485
    self.assertRaises(errors.FileStoragePathError,
486
                      bdev.CheckFileStoragePath, "/srv/file-storage",
487
                      _filename=filename)
488

    
489
  def testAllowedPath(self):
490
    tmpfile = self._CreateTempFile()
491

    
492
    utils.WriteFile(tmpfile, data="""
493
      /srv/storage
494
      """)
495

    
496
    bdev.CheckFileStoragePath("/srv/storage/inst1", _filename=tmpfile)
497

    
498
    # No additional path component
499
    self.assertRaises(errors.FileStoragePathError,
500
                      bdev.CheckFileStoragePath, "/srv/storage",
501
                      _filename=tmpfile)
502

    
503
    # Forbidden path
504
    self.assertRaises(errors.FileStoragePathError,
505
                      bdev.CheckFileStoragePath, "/usr/lib64/xyz",
506
                      _filename=tmpfile)
507

    
508

    
509
class TestLoadAllowedFileStoragePaths(testutils.GanetiTestCase):
510
  def testDevNull(self):
511
    self.assertEqual(bdev._LoadAllowedFileStoragePaths("/dev/null"), [])
512

    
513
  def testNonExistantFile(self):
514
    filename = "/tmp/this/file/does/not/exist"
515
    assert not os.path.exists(filename)
516
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(filename), [])
517

    
518
  def test(self):
519
    tmpfile = self._CreateTempFile()
520

    
521
    utils.WriteFile(tmpfile, data="""
522
      # This is a test file
523
      /tmp
524
      /srv/storage
525
      relative/path
526
      """)
527

    
528
    self.assertEqual(bdev._LoadAllowedFileStoragePaths(tmpfile), [
529
      "/tmp",
530
      "/srv/storage",
531
      "relative/path",
532
      ])
533

    
534

    
535
class TestExclusiveStoragePvs(unittest.TestCase):
536
  """Test cases for functions dealing with LVM PV and exclusive storage"""
537
  # Allowance for rounding
538
  _EPS = 1e-4
539
  _MARGIN = constants.PART_MARGIN + constants.PART_RESERVED + _EPS
540

    
541
  @staticmethod
542
  def _GenerateRandomPvInfo(rnd, name, vg):
543
    # Granularity is .01 MiB
544
    size = rnd.randint(1024 * 100, 10 * 1024 * 1024 * 100)
545
    if rnd.choice([False, True]):
546
      free = float(rnd.randint(0, size)) / 100.0
547
    else:
548
      free = float(size) / 100.0
549
    size = float(size) / 100.0
550
    attr = "a-"
551
    return objects.LvmPvInfo(name=name, vg_name=vg, size=size, free=free,
552
                             attributes=attr)
553

    
554
  def testGetStdPvSize(self):
555
    """Test cases for bdev.LogicalVolume._GetStdPvSize()"""
556
    rnd = random.Random(9517)
557
    for _ in range(0, 50):
558
      # Identical volumes
559
      pvi = self._GenerateRandomPvInfo(rnd, "disk", "myvg")
560
      onesize = bdev.LogicalVolume._GetStdPvSize([pvi])
561
      self.assertTrue(onesize <= pvi.size)
562
      self.assertTrue(onesize > pvi.size * (1 - self._MARGIN))
563
      for length in range(2, 10):
564
        n_size = bdev.LogicalVolume._GetStdPvSize([pvi] * length)
565
        self.assertEqual(onesize, n_size)
566

    
567
      # Mixed volumes
568
      for length in range(1, 10):
569
        pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")
570
                  for _ in range(0, length)]
571
        std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
572
        self.assertTrue(compat.all(std_size <= pvi.size for pvi in pvlist))
573
        self.assertTrue(compat.any(std_size > pvi.size * (1 - self._MARGIN)
574
                                   for pvi in pvlist))
575
        pvlist.append(pvlist[0])
576
        p1_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
577
        self.assertEqual(std_size, p1_size)
578

    
579
  def testComputeNumPvs(self):
580
    """Test cases for bdev.LogicalVolume._ComputeNumPvs()"""
581
    rnd = random.Random(8067)
582
    for _ in range(0, 1000):
583
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk", "myvg")]
584
      lv_size = float(rnd.randint(10 * 100, 1024 * 1024 * 100)) / 100.0
585
      num_pv = bdev.LogicalVolume._ComputeNumPvs(lv_size, pvlist)
586
      std_size = bdev.LogicalVolume._GetStdPvSize(pvlist)
587
      self.assertTrue(num_pv >= 1)
588
      self.assertTrue(num_pv * std_size >= lv_size)
589
      self.assertTrue((num_pv - 1) * std_size < lv_size * (1 + self._EPS))
590

    
591
  def testGetEmptyPvNames(self):
592
    """Test cases for bdev.LogicalVolume._GetEmptyPvNames()"""
593
    rnd = random.Random(21126)
594
    for _ in range(0, 100):
595
      num_pvs = rnd.randint(1, 20)
596
      pvlist = [self._GenerateRandomPvInfo(rnd, "disk%d" % n, "myvg")
597
                for n in range(0, num_pvs)]
598
      for num_req in range(1, num_pvs + 2):
599
        epvs = bdev.LogicalVolume._GetEmptyPvNames(pvlist, num_req)
600
        epvs_set = compat.UniqueFrozenset(epvs)
601
        if len(epvs) > 1:
602
          self.assertEqual(len(epvs), len(epvs_set))
603
        for pvi in pvlist:
604
          if pvi.name in epvs_set:
605
            self.assertEqual(pvi.size, pvi.free)
606
          else:
607
            # There should be no remaining empty PV when less than the
608
            # requeste number of PVs has been returned
609
            self.assertTrue(len(epvs) == num_req or pvi.free != pvi.size)
610

    
611

    
612
if __name__ == "__main__":
613
  testutils.GanetiTestProgram()