Revision 89ff748d test/py/ganeti.block.bdev_unittest.py

b/test/py/ganeti.block.bdev_unittest.py
32 32
from ganeti import objects
33 33
from ganeti import utils
34 34
from ganeti.block import bdev
35
from ganeti.block import drbd
35 36

  
36 37
import testutils
37 38

  
......
70 71
      }
71 72
    ]
72 73
    for d,r in zip(data, result):
73
      self.assertEqual(bdev.BaseDRBD._GetVersion(d), r)
74
      self.assertEqual(drbd.BaseDRBD._GetVersion(d), r)
74 75

  
75 76

  
76 77
class TestDRBD8Runner(testutils.GanetiTestCase):
77
  """Testing case for DRBD8"""
78
  """Testing case for drbd.DRBD8"""
78 79

  
79 80
  @staticmethod
80 81
  def _has_disk(data, dname, mname):
......
102 103

  
103 104
  def testParserCreation(self):
104 105
    """Test drbdsetup show parser creation"""
105
    bdev.DRBD8._GetShowParser()
106
    drbd.DRBD8._GetShowParser()
106 107

  
107 108
  def testParser80(self):
108 109
    """Test drbdsetup show parser for disk and network version 8.0"""
109 110
    data = testutils.ReadTestData("bdev-drbd-8.0.txt")
110
    result = bdev.DRBD8._GetDevInfo(data)
111
    result = drbd.DRBD8._GetDevInfo(data)
111 112
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
112 113
                                   "/dev/xenvg/test.meta"),
113 114
                    "Wrong local disk info")
......
118 119
  def testParser83(self):
119 120
    """Test drbdsetup show parser for disk and network version 8.3"""
120 121
    data = testutils.ReadTestData("bdev-drbd-8.3.txt")
121
    result = bdev.DRBD8._GetDevInfo(data)
122
    result = drbd.DRBD8._GetDevInfo(data)
122 123
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
123 124
                                   "/dev/xenvg/test.meta"),
124 125
                    "Wrong local disk info")
......
129 130
  def testParserNetIP4(self):
130 131
    """Test drbdsetup show parser for IPv4 network"""
131 132
    data = testutils.ReadTestData("bdev-drbd-net-ip4.txt")
132
    result = bdev.DRBD8._GetDevInfo(data)
133
    result = drbd.DRBD8._GetDevInfo(data)
133 134
    self.failUnless(("local_dev" not in result and
134 135
                     "meta_dev" not in result and
135 136
                     "meta_index" not in result),
......
141 142
  def testParserNetIP6(self):
142 143
    """Test drbdsetup show parser for IPv6 network"""
143 144
    data = testutils.ReadTestData("bdev-drbd-net-ip6.txt")
144
    result = bdev.DRBD8._GetDevInfo(data)
145
    result = drbd.DRBD8._GetDevInfo(data)
145 146
    self.failUnless(("local_dev" not in result and
146 147
                     "meta_dev" not in result and
147 148
                     "meta_index" not in result),
......
153 154
  def testParserDisk(self):
154 155
    """Test drbdsetup show parser for disk"""
155 156
    data = testutils.ReadTestData("bdev-drbd-disk.txt")
156
    result = bdev.DRBD8._GetDevInfo(data)
157
    result = drbd.DRBD8._GetDevInfo(data)
157 158
    self.failUnless(self._has_disk(result, "/dev/xenvg/test.data",
158 159
                                   "/dev/xenvg/test.meta"),
159 160
                    "Wrong local disk info")
......
174 175

  
175 176
    for vmaj, vmin, vrel, opts, meta in should_fail:
176 177
      self.assertRaises(errors.BlockDeviceError,
177
                        bdev.DRBD8._ComputeDiskBarrierArgs,
178
                        drbd.DRBD8._ComputeDiskBarrierArgs,
178 179
                        vmaj, vmin, vrel, opts, meta)
179 180

  
180 181
    # get the valid options from the frozenset(frozenset()) in constants.
......
184 185
    for vmaj, vmin, vrel in ((8, 0, 0), (8, 0, 11), (8, 2, 6)):
185 186
      for opts in valid_options:
186 187
        self.assertRaises(errors.BlockDeviceError,
187
                          bdev.DRBD8._ComputeDiskBarrierArgs,
188
                          drbd.DRBD8._ComputeDiskBarrierArgs,
188 189
                          vmaj, vmin, vrel, opts, True)
189 190

  
190 191
    # Versions with partial support (testing only options that are supported)
......
212 213
    for test in tests:
213 214
      vmaj, vmin, vrel, disabled_barriers, disable_meta_flush, expected = test
214 215
      args = \
215
        bdev.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
216
        drbd.DRBD8._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
216 217
                                           disabled_barriers,
217 218
                                           disable_meta_flush)
218 219
      self.failUnless(set(args) == set(expected),
......
221 222
    # Unsupported or invalid versions
222 223
    for vmaj, vmin, vrel in ((0, 7, 25), (9, 0, 0), (7, 0, 0), (8, 4, 0)):
223 224
      self.assertRaises(errors.BlockDeviceError,
224
                        bdev.DRBD8._ComputeDiskBarrierArgs,
225
                        drbd.DRBD8._ComputeDiskBarrierArgs,
225 226
                        vmaj, vmin, vrel, "n", True)
226 227

  
227 228
    # Invalid options
228 229
    for option in ("", "c", "whatever", "nbdfc", "nf"):
229 230
      self.assertRaises(errors.BlockDeviceError,
230
                        bdev.DRBD8._ComputeDiskBarrierArgs,
231
                        drbd.DRBD8._ComputeDiskBarrierArgs,
231 232
                        8, 3, 11, option, True)
232 233

  
233 234

  
......
243 244
    proc83_sync_data = testutils.TestDataFilename("proc_drbd83_sync.txt")
244 245
    proc83_sync_krnl_data = \
245 246
      testutils.TestDataFilename("proc_drbd83_sync_krnl2.6.39.txt")
246
    self.proc_data = bdev.DRBD8._GetProcData(filename=proc_data)
247
    self.proc80e_data = bdev.DRBD8._GetProcData(filename=proc80e_data)
248
    self.proc83_data = bdev.DRBD8._GetProcData(filename=proc83_data)
249
    self.proc83_sync_data = bdev.DRBD8._GetProcData(filename=proc83_sync_data)
247
    self.proc_data = drbd.DRBD8._GetProcData(filename=proc_data)
248
    self.proc80e_data = drbd.DRBD8._GetProcData(filename=proc80e_data)
249
    self.proc83_data = drbd.DRBD8._GetProcData(filename=proc83_data)
250
    self.proc83_sync_data = drbd.DRBD8._GetProcData(filename=proc83_sync_data)
250 251
    self.proc83_sync_krnl_data = \
251
      bdev.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
252
    self.mass_data = bdev.DRBD8._MassageProcData(self.proc_data)
253
    self.mass80e_data = bdev.DRBD8._MassageProcData(self.proc80e_data)
254
    self.mass83_data = bdev.DRBD8._MassageProcData(self.proc83_data)
255
    self.mass83_sync_data = bdev.DRBD8._MassageProcData(self.proc83_sync_data)
252
      drbd.DRBD8._GetProcData(filename=proc83_sync_krnl_data)
253
    self.mass_data = drbd.DRBD8._MassageProcData(self.proc_data)
254
    self.mass80e_data = drbd.DRBD8._MassageProcData(self.proc80e_data)
255
    self.mass83_data = drbd.DRBD8._MassageProcData(self.proc83_data)
256
    self.mass83_sync_data = drbd.DRBD8._MassageProcData(self.proc83_sync_data)
256 257
    self.mass83_sync_krnl_data = \
257
      bdev.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
258
      drbd.DRBD8._MassageProcData(self.proc83_sync_krnl_data)
258 259

  
259 260
  def testIOErrors(self):
260 261
    """Test handling of errors while reading the proc file."""
261 262
    temp_file = self._CreateTempFile()
262 263
    os.unlink(temp_file)
263 264
    self.failUnlessRaises(errors.BlockDeviceError,
264
                          bdev.DRBD8._GetProcData, filename=temp_file)
265
                          drbd.DRBD8._GetProcData, filename=temp_file)
265 266

  
266 267
  def testHelper(self):
267 268
    """Test reading usermode_helper in /sys."""
268 269
    sys_drbd_helper = testutils.TestDataFilename("sys_drbd_usermode_helper.txt")
269
    drbd_helper = bdev.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
270
    drbd_helper = drbd.DRBD8.GetUsermodeHelper(filename=sys_drbd_helper)
270 271
    self.failUnlessEqual(drbd_helper, "/bin/true")
271 272

  
272 273
  def testHelperIOErrors(self):
......
274 275
    temp_file = self._CreateTempFile()
275 276
    os.unlink(temp_file)
276 277
    self.failUnlessRaises(errors.BlockDeviceError,
277
                          bdev.DRBD8.GetUsermodeHelper, filename=temp_file)
278
                          drbd.DRBD8.GetUsermodeHelper, filename=temp_file)
278 279

  
279 280
  def testMinorNotFound(self):
280 281
    """Test not-found-minor in /proc"""
......
283 284
    self.failUnless(3 not in self.mass80e_data)
284 285

  
285 286
  def testLineNotMatch(self):
286
    """Test wrong line passed to DRBD8Status"""
287
    self.assertRaises(errors.BlockDeviceError, bdev.DRBD8Status, "foo")
287
    """Test wrong line passed to drbd.DRBD8Status"""
288
    self.assertRaises(errors.BlockDeviceError, drbd.DRBD8Status, "foo")
288 289

  
289 290
  def testMinor0(self):
290 291
    """Test connected, primary device"""
291 292
    for data in [self.mass_data, self.mass83_data]:
292
      stats = bdev.DRBD8Status(data[0])
293
      stats = drbd.DRBD8Status(data[0])
293 294
      self.failUnless(stats.is_in_use)
294 295
      self.failUnless(stats.is_connected and stats.is_primary and
295 296
                      stats.peer_secondary and stats.is_disk_uptodate)
......
297 298
  def testMinor1(self):
298 299
    """Test connected, secondary device"""
299 300
    for data in [self.mass_data, self.mass83_data]:
300
      stats = bdev.DRBD8Status(data[1])
301
      stats = drbd.DRBD8Status(data[1])
301 302
      self.failUnless(stats.is_in_use)
302 303
      self.failUnless(stats.is_connected and stats.is_secondary and
303 304
                      stats.peer_primary and stats.is_disk_uptodate)
......
305 306
  def testMinor2(self):
306 307
    """Test unconfigured device"""
307 308
    for data in [self.mass_data, self.mass83_data, self.mass80e_data]:
308
      stats = bdev.DRBD8Status(data[2])
309
      stats = drbd.DRBD8Status(data[2])
309 310
      self.failIf(stats.is_in_use)
310 311

  
311 312
  def testMinor4(self):
312 313
    """Test WFconn device"""
313 314
    for data in [self.mass_data, self.mass83_data]:
314
      stats = bdev.DRBD8Status(data[4])
315
      stats = drbd.DRBD8Status(data[4])
315 316
      self.failUnless(stats.is_in_use)
316 317
      self.failUnless(stats.is_wfconn and stats.is_primary and
317 318
                      stats.rrole == "Unknown" and
......
320 321
  def testMinor6(self):
321 322
    """Test diskless device"""
322 323
    for data in [self.mass_data, self.mass83_data]:
323
      stats = bdev.DRBD8Status(data[6])
324
      stats = drbd.DRBD8Status(data[6])
324 325
      self.failUnless(stats.is_in_use)
325 326
      self.failUnless(stats.is_connected and stats.is_secondary and
326 327
                      stats.peer_primary and stats.is_diskless)
......
328 329
  def testMinor8(self):
329 330
    """Test standalone device"""
330 331
    for data in [self.mass_data, self.mass83_data]:
331
      stats = bdev.DRBD8Status(data[8])
332
      stats = drbd.DRBD8Status(data[8])
332 333
      self.failUnless(stats.is_in_use)
333 334
      self.failUnless(stats.is_standalone and
334 335
                      stats.rrole == "Unknown" and
335 336
                      stats.is_disk_uptodate)
336 337

  
337 338
  def testDRBD83SyncFine(self):
338
    stats = bdev.DRBD8Status(self.mass83_sync_data[3])
339
    stats = drbd.DRBD8Status(self.mass83_sync_data[3])
339 340
    self.failUnless(stats.is_in_resync)
340 341
    self.failUnless(stats.sync_percent is not None)
341 342

  
342 343
  def testDRBD83SyncBroken(self):
343
    stats = bdev.DRBD8Status(self.mass83_sync_krnl_data[3])
344
    stats = drbd.DRBD8Status(self.mass83_sync_krnl_data[3])
344 345
    self.failUnless(stats.is_in_resync)
345 346
    self.failUnless(stats.sync_percent is not None)
346 347

  

Also available in: Unified diff