Rename spindle_usage to spindle_use on Ganeti side
[ganeti-local] / test / ganeti.cmdlib_unittest.py
index e69faf3..1f3eb5c 100755 (executable)
@@ -382,9 +382,11 @@ class TestClusterVerifyFiles(unittest.TestCase):
 
 
 class _FakeLU:
-  def __init__(self):
+  def __init__(self, cfg=NotImplemented, proc=NotImplemented):
     self.warning_log = []
     self.info_log = []
+    self.cfg = cfg
+    self.proc = proc
 
   def LogWarning(self, text, *args):
     self.warning_log.append((text, args))
@@ -447,5 +449,779 @@ class TestLoadNodeEvacResult(unittest.TestCase):
     self.assertFalse(lu.warning_log)
 
 
+class TestUpdateAndVerifySubDict(unittest.TestCase):
+  def setUp(self):
+    self.type_check = {
+        "a": constants.VTYPE_INT,
+        "b": constants.VTYPE_STRING,
+        "c": constants.VTYPE_BOOL,
+        "d": constants.VTYPE_STRING,
+        }
+
+  def test(self):
+    old_test = {
+      "foo": {
+        "d": "blubb",
+        "a": 321,
+        },
+      "baz": {
+        "a": 678,
+        "b": "678",
+        "c": True,
+        },
+      }
+    test = {
+      "foo": {
+        "a": 123,
+        "b": "123",
+        "c": True,
+        },
+      "bar": {
+        "a": 321,
+        "b": "321",
+        "c": False,
+        },
+      }
+
+    mv = {
+      "foo": {
+        "a": 123,
+        "b": "123",
+        "c": True,
+        "d": "blubb"
+        },
+      "bar": {
+        "a": 321,
+        "b": "321",
+        "c": False,
+        },
+      "baz": {
+        "a": 678,
+        "b": "678",
+        "c": True,
+        },
+      }
+
+    verified = cmdlib._UpdateAndVerifySubDict(old_test, test, self.type_check)
+    self.assertEqual(verified, mv)
+
+  def testWrong(self):
+    test = {
+      "foo": {
+        "a": "blubb",
+        "b": "123",
+        "c": True,
+        },
+      "bar": {
+        "a": 321,
+        "b": "321",
+        "c": False,
+        },
+      }
+
+    self.assertRaises(errors.TypeEnforcementError,
+                      cmdlib._UpdateAndVerifySubDict, {}, test, self.type_check)
+
+
+class TestHvStateHelper(unittest.TestCase):
+  def testWithoutOpData(self):
+    self.assertEqual(cmdlib._MergeAndVerifyHvState(None, NotImplemented), None)
+
+  def testWithoutOldData(self):
+    new = {
+      constants.HT_XEN_PVM: {
+        constants.HVST_MEMORY_TOTAL: 4096,
+        },
+      }
+    self.assertEqual(cmdlib._MergeAndVerifyHvState(new, None), new)
+
+  def testWithWrongHv(self):
+    new = {
+      "i-dont-exist": {
+        constants.HVST_MEMORY_TOTAL: 4096,
+        },
+      }
+    self.assertRaises(errors.OpPrereqError, cmdlib._MergeAndVerifyHvState, new,
+                      None)
+
+class TestDiskStateHelper(unittest.TestCase):
+  def testWithoutOpData(self):
+    self.assertEqual(cmdlib._MergeAndVerifyDiskState(None, NotImplemented),
+                     None)
+
+  def testWithoutOldData(self):
+    new = {
+      constants.LD_LV: {
+        "xenvg": {
+          constants.DS_DISK_RESERVED: 1024,
+          },
+        },
+      }
+    self.assertEqual(cmdlib._MergeAndVerifyDiskState(new, None), new)
+
+  def testWithWrongStorageType(self):
+    new = {
+      "i-dont-exist": {
+        "xenvg": {
+          constants.DS_DISK_RESERVED: 1024,
+          },
+        },
+      }
+    self.assertRaises(errors.OpPrereqError, cmdlib._MergeAndVerifyDiskState,
+                      new, None)
+
+
+class TestComputeMinMaxSpec(unittest.TestCase):
+  def setUp(self):
+    self.ipolicy = {
+      constants.ISPECS_MAX: {
+        constants.ISPEC_MEM_SIZE: 512,
+        constants.ISPEC_DISK_SIZE: 1024,
+        },
+      constants.ISPECS_MIN: {
+        constants.ISPEC_MEM_SIZE: 128,
+        constants.ISPEC_DISK_COUNT: 1,
+        },
+      }
+
+  def testNoneValue(self):
+    self.assertTrue(cmdlib._ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE,
+                                              self.ipolicy, None) is None)
+
+  def testAutoValue(self):
+    self.assertTrue(cmdlib._ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE,
+                                              self.ipolicy,
+                                              constants.VALUE_AUTO) is None)
+
+  def testNotDefined(self):
+    self.assertTrue(cmdlib._ComputeMinMaxSpec(constants.ISPEC_NIC_COUNT,
+                                              self.ipolicy, 3) is None)
+
+  def testNoMinDefined(self):
+    self.assertTrue(cmdlib._ComputeMinMaxSpec(constants.ISPEC_DISK_SIZE,
+                                              self.ipolicy, 128) is None)
+
+  def testNoMaxDefined(self):
+    self.assertTrue(cmdlib._ComputeMinMaxSpec(constants.ISPEC_DISK_COUNT,
+                                                self.ipolicy, 16) is None)
+
+  def testOutOfRange(self):
+    for (name, val) in ((constants.ISPEC_MEM_SIZE, 64),
+                        (constants.ISPEC_MEM_SIZE, 768),
+                        (constants.ISPEC_DISK_SIZE, 4096),
+                        (constants.ISPEC_DISK_COUNT, 0)):
+      min_v = self.ipolicy[constants.ISPECS_MIN].get(name, val)
+      max_v = self.ipolicy[constants.ISPECS_MAX].get(name, val)
+      self.assertEqual(cmdlib._ComputeMinMaxSpec(name, self.ipolicy, val),
+                       "%s value %s is not in range [%s, %s]" %
+                       (name, val,min_v, max_v))
+
+  def test(self):
+    for (name, val) in ((constants.ISPEC_MEM_SIZE, 256),
+                        (constants.ISPEC_MEM_SIZE, 128),
+                        (constants.ISPEC_MEM_SIZE, 512),
+                        (constants.ISPEC_DISK_SIZE, 1024),
+                        (constants.ISPEC_DISK_SIZE, 0),
+                        (constants.ISPEC_DISK_COUNT, 1),
+                        (constants.ISPEC_DISK_COUNT, 5)):
+      self.assertTrue(cmdlib._ComputeMinMaxSpec(name, self.ipolicy, val)
+                      is None)
+
+
+def _ValidateComputeMinMaxSpec(name, *_):
+  assert name in constants.ISPECS_PARAMETERS
+  return None
+
+
+class _SpecWrapper:
+  def __init__(self, spec):
+    self.spec = spec
+
+  def ComputeMinMaxSpec(self, *args):
+    return self.spec.pop(0)
+
+
+class TestComputeIPolicySpecViolation(unittest.TestCase):
+  def test(self):
+    compute_fn = _ValidateComputeMinMaxSpec
+    ret = cmdlib._ComputeIPolicySpecViolation(NotImplemented, 1024, 1, 1, 1,
+                                              [1024], 1, _compute_fn=compute_fn)
+    self.assertEqual(ret, [])
+
+  def testInvalidArguments(self):
+    self.assertRaises(AssertionError, cmdlib._ComputeIPolicySpecViolation,
+                      NotImplemented, 1024, 1, 1, 1, [], 1)
+
+  def testInvalidSpec(self):
+    spec = _SpecWrapper([None, False, "foo", None, "bar", None])
+    compute_fn = spec.ComputeMinMaxSpec
+    ret = cmdlib._ComputeIPolicySpecViolation(NotImplemented, 1024, 1, 1, 1,
+                                              [1024], 1, _compute_fn=compute_fn)
+    self.assertEqual(ret, ["foo", "bar"])
+    self.assertFalse(spec.spec)
+
+
+class _StubComputeIPolicySpecViolation:
+  def __init__(self, mem_size, cpu_count, disk_count, nic_count, disk_sizes,
+               spindle_use):
+    self.mem_size = mem_size
+    self.cpu_count = cpu_count
+    self.disk_count = disk_count
+    self.nic_count = nic_count
+    self.disk_sizes = disk_sizes
+    self.spindle_use = spindle_use
+
+  def __call__(self, _, mem_size, cpu_count, disk_count, nic_count, disk_sizes,
+               spindle_use):
+    assert self.mem_size == mem_size
+    assert self.cpu_count == cpu_count
+    assert self.disk_count == disk_count
+    assert self.nic_count == nic_count
+    assert self.disk_sizes == disk_sizes
+    assert self.spindle_use == spindle_use
+
+    return []
+
+
+class TestComputeIPolicyInstanceViolation(unittest.TestCase):
+  def test(self):
+    beparams = {
+      constants.BE_MAXMEM: 2048,
+      constants.BE_VCPUS: 2,
+      constants.BE_SPINDLE_USE: 4,
+      }
+    disks = [objects.Disk(size=512)]
+    instance = objects.Instance(beparams=beparams, disks=disks, nics=[])
+    stub = _StubComputeIPolicySpecViolation(2048, 2, 1, 0, [512], 4)
+    ret = cmdlib._ComputeIPolicyInstanceViolation(NotImplemented, instance,
+                                                  _compute_fn=stub)
+    self.assertEqual(ret, [])
+
+
+class TestComputeIPolicyInstanceSpecViolation(unittest.TestCase):
+  def test(self):
+    ispec = {
+      constants.ISPEC_MEM_SIZE: 2048,
+      constants.ISPEC_CPU_COUNT: 2,
+      constants.ISPEC_DISK_COUNT: 1,
+      constants.ISPEC_DISK_SIZE: [512],
+      constants.ISPEC_NIC_COUNT: 0,
+      constants.ISPEC_SPINDLE_USE: 1,
+      }
+    stub = _StubComputeIPolicySpecViolation(2048, 2, 1, 0, [512], 1)
+    ret = cmdlib._ComputeIPolicyInstanceSpecViolation(NotImplemented, ispec,
+                                                      _compute_fn=stub)
+    self.assertEqual(ret, [])
+
+
+class _CallRecorder:
+  def __init__(self, return_value=None):
+    self.called = False
+    self.return_value = return_value
+
+  def __call__(self, *args):
+    self.called = True
+    return self.return_value
+
+
+class TestComputeIPolicyNodeViolation(unittest.TestCase):
+  def setUp(self):
+    self.recorder = _CallRecorder(return_value=[])
+
+  def testSameGroup(self):
+    ret = cmdlib._ComputeIPolicyNodeViolation(NotImplemented, NotImplemented,
+                                              "foo", "foo",
+                                              _compute_fn=self.recorder)
+    self.assertFalse(self.recorder.called)
+    self.assertEqual(ret, [])
+
+  def testDifferentGroup(self):
+    ret = cmdlib._ComputeIPolicyNodeViolation(NotImplemented, NotImplemented,
+                                              "foo", "bar",
+                                              _compute_fn=self.recorder)
+    self.assertTrue(self.recorder.called)
+    self.assertEqual(ret, [])
+
+
+class _FakeConfigForTargetNodeIPolicy:
+  def __init__(self, node_info=NotImplemented):
+    self._node_info = node_info
+
+  def GetNodeInfo(self, _):
+    return self._node_info
+
+
+class TestCheckTargetNodeIPolicy(unittest.TestCase):
+  def setUp(self):
+    self.instance = objects.Instance(primary_node="blubb")
+    self.target_node = objects.Node(group="bar")
+    node_info = objects.Node(group="foo")
+    fake_cfg = _FakeConfigForTargetNodeIPolicy(node_info=node_info)
+    self.lu = _FakeLU(cfg=fake_cfg)
+
+  def testNoViolation(self):
+    compute_recoder = _CallRecorder(return_value=[])
+    cmdlib._CheckTargetNodeIPolicy(self.lu, NotImplemented, self.instance,
+                                   self.target_node,
+                                   _compute_fn=compute_recoder)
+    self.assertTrue(compute_recoder.called)
+    self.assertEqual(self.lu.warning_log, [])
+
+  def testNoIgnore(self):
+    compute_recoder = _CallRecorder(return_value=["mem_size not in range"])
+    self.assertRaises(errors.OpPrereqError, cmdlib._CheckTargetNodeIPolicy,
+                      self.lu, NotImplemented, self.instance, self.target_node,
+                      _compute_fn=compute_recoder)
+    self.assertTrue(compute_recoder.called)
+    self.assertEqual(self.lu.warning_log, [])
+
+  def testIgnoreViolation(self):
+    compute_recoder = _CallRecorder(return_value=["mem_size not in range"])
+    cmdlib._CheckTargetNodeIPolicy(self.lu, NotImplemented, self.instance,
+                                   self.target_node, ignore=True,
+                                   _compute_fn=compute_recoder)
+    self.assertTrue(compute_recoder.called)
+    msg = ("Instance does not meet target node group's (bar) instance policy:"
+           " mem_size not in range")
+    self.assertEqual(self.lu.warning_log, [(msg, ())])
+
+
+class TestApplyContainerMods(unittest.TestCase):
+  def testEmptyContainer(self):
+    container = []
+    chgdesc = []
+    cmdlib.ApplyContainerMods("test", container, chgdesc, [], None, None, None)
+    self.assertEqual(container, [])
+    self.assertEqual(chgdesc, [])
+
+  def testAdd(self):
+    container = []
+    chgdesc = []
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_ADD, -1, "Hello"),
+      (constants.DDM_ADD, -1, "World"),
+      (constants.DDM_ADD, 0, "Start"),
+      (constants.DDM_ADD, -1, "End"),
+      ], None)
+    cmdlib.ApplyContainerMods("test", container, chgdesc, mods,
+                              None, None, None)
+    self.assertEqual(container, ["Start", "Hello", "World", "End"])
+    self.assertEqual(chgdesc, [])
+
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_ADD, 0, "zero"),
+      (constants.DDM_ADD, 3, "Added"),
+      (constants.DDM_ADD, 5, "four"),
+      (constants.DDM_ADD, 7, "xyz"),
+      ], None)
+    cmdlib.ApplyContainerMods("test", container, chgdesc, mods,
+                              None, None, None)
+    self.assertEqual(container,
+                     ["zero", "Start", "Hello", "Added", "World", "four",
+                      "End", "xyz"])
+    self.assertEqual(chgdesc, [])
+
+    for idx in [-2, len(container) + 1]:
+      mods = cmdlib.PrepareContainerMods([
+        (constants.DDM_ADD, idx, "error"),
+        ], None)
+      self.assertRaises(IndexError, cmdlib.ApplyContainerMods,
+                        "test", container, None, mods, None, None, None)
+
+  def testRemoveError(self):
+    for idx in [0, 1, 2, 100, -1, -4]:
+      mods = cmdlib.PrepareContainerMods([
+        (constants.DDM_REMOVE, idx, None),
+        ], None)
+      self.assertRaises(IndexError, cmdlib.ApplyContainerMods,
+                        "test", [], None, mods, None, None, None)
+
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_REMOVE, 0, object()),
+      ], None)
+    self.assertRaises(AssertionError, cmdlib.ApplyContainerMods,
+                      "test", [""], None, mods, None, None, None)
+
+  def testAddError(self):
+    for idx in range(-100, -1) + [100]:
+      mods = cmdlib.PrepareContainerMods([
+        (constants.DDM_ADD, idx, None),
+        ], None)
+      self.assertRaises(IndexError, cmdlib.ApplyContainerMods,
+                        "test", [], None, mods, None, None, None)
+
+  def testRemove(self):
+    container = ["item 1", "item 2"]
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_ADD, -1, "aaa"),
+      (constants.DDM_REMOVE, -1, None),
+      (constants.DDM_ADD, -1, "bbb"),
+      ], None)
+    chgdesc = []
+    cmdlib.ApplyContainerMods("test", container, chgdesc, mods,
+                              None, None, None)
+    self.assertEqual(container, ["item 1", "item 2", "bbb"])
+    self.assertEqual(chgdesc, [
+      ("test/2", "remove"),
+      ])
+
+  def testModify(self):
+    container = ["item 1", "item 2"]
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_MODIFY, -1, "a"),
+      (constants.DDM_MODIFY, 0, "b"),
+      (constants.DDM_MODIFY, 1, "c"),
+      ], None)
+    chgdesc = []
+    cmdlib.ApplyContainerMods("test", container, chgdesc, mods,
+                              None, None, None)
+    self.assertEqual(container, ["item 1", "item 2"])
+    self.assertEqual(chgdesc, [])
+
+    for idx in [-2, len(container) + 1]:
+      mods = cmdlib.PrepareContainerMods([
+        (constants.DDM_MODIFY, idx, "error"),
+        ], None)
+      self.assertRaises(IndexError, cmdlib.ApplyContainerMods,
+                        "test", container, None, mods, None, None, None)
+
+  class _PrivateData:
+    def __init__(self):
+      self.data = None
+
+  @staticmethod
+  def _CreateTestFn(idx, params, private):
+    private.data = ("add", idx, params)
+    return ((100 * idx, params), [
+      ("test/%s" % idx, hex(idx)),
+      ])
+
+  @staticmethod
+  def _ModifyTestFn(idx, item, params, private):
+    private.data = ("modify", idx, params)
+    return [
+      ("test/%s" % idx, "modify %s" % params),
+      ]
+
+  @staticmethod
+  def _RemoveTestFn(idx, item, private):
+    private.data = ("remove", idx, item)
+
+  def testAddWithCreateFunction(self):
+    container = []
+    chgdesc = []
+    mods = cmdlib.PrepareContainerMods([
+      (constants.DDM_ADD, -1, "Hello"),
+      (constants.DDM_ADD, -1, "World"),
+      (constants.DDM_ADD, 0, "Start"),
+      (constants.DDM_ADD, -1, "End"),
+      (constants.DDM_REMOVE, 2, None),
+      (constants.DDM_MODIFY, -1, "foobar"),
+      (constants.DDM_REMOVE, 2, None),
+      (constants.DDM_ADD, 1, "More"),
+      ], self._PrivateData)
+    cmdlib.ApplyContainerMods("test", container, chgdesc, mods,
+      self._CreateTestFn, self._ModifyTestFn, self._RemoveTestFn)
+    self.assertEqual(container, [
+      (000, "Start"),
+      (100, "More"),
+      (000, "Hello"),
+      ])
+    self.assertEqual(chgdesc, [
+      ("test/0", "0x0"),
+      ("test/1", "0x1"),
+      ("test/0", "0x0"),
+      ("test/3", "0x3"),
+      ("test/2", "remove"),
+      ("test/2", "modify foobar"),
+      ("test/2", "remove"),
+      ("test/1", "0x1")
+      ])
+    self.assertTrue(compat.all(op == private.data[0]
+                               for (op, _, _, private) in mods))
+    self.assertEqual([private.data for (op, _, _, private) in mods], [
+      ("add", 0, "Hello"),
+      ("add", 1, "World"),
+      ("add", 0, "Start"),
+      ("add", 3, "End"),
+      ("remove", 2, (100, "World")),
+      ("modify", 2, "foobar"),
+      ("remove", 2, (300, "End")),
+      ("add", 1, "More"),
+      ])
+
+
+class _FakeConfigForGenDiskTemplate:
+  def __init__(self):
+    self._unique_id = itertools.count()
+    self._drbd_minor = itertools.count(20)
+    self._port = itertools.count(constants.FIRST_DRBD_PORT)
+    self._secret = itertools.count()
+
+  def GetVGName(self):
+    return "testvg"
+
+  def GenerateUniqueID(self, ec_id):
+    return "ec%s-uq%s" % (ec_id, self._unique_id.next())
+
+  def AllocateDRBDMinor(self, nodes, instance):
+    return [self._drbd_minor.next()
+            for _ in nodes]
+
+  def AllocatePort(self):
+    return self._port.next()
+
+  def GenerateDRBDSecret(self, ec_id):
+    return "ec%s-secret%s" % (ec_id, self._secret.next())
+
+
+class _FakeProcForGenDiskTemplate:
+  def GetECId(self):
+    return 0
+
+
+class TestGenerateDiskTemplate(unittest.TestCase):
+  def setUp(self):
+    nodegroup = objects.NodeGroup(name="ng")
+    nodegroup.UpgradeConfig()
+
+    cfg = _FakeConfigForGenDiskTemplate()
+    proc = _FakeProcForGenDiskTemplate()
+
+    self.lu = _FakeLU(cfg=cfg, proc=proc)
+    self.nodegroup = nodegroup
+
+  def testWrongDiskTemplate(self):
+    gdt = cmdlib._GenerateDiskTemplate
+    disk_template = "##unknown##"
+
+    assert disk_template not in constants.DISK_TEMPLATES
+
+    self.assertRaises(errors.ProgrammerError, gdt, self.lu, disk_template,
+                      "inst26831.example.com", "node30113.example.com", [], [],
+                      NotImplemented, NotImplemented, 0, self.lu.LogInfo,
+                      self.nodegroup.diskparams)
+
+  def testDiskless(self):
+    gdt = cmdlib._GenerateDiskTemplate
+
+    result = gdt(self.lu, constants.DT_DISKLESS, "inst27734.example.com",
+                 "node30113.example.com", [], [],
+                 NotImplemented, NotImplemented, 0, self.lu.LogInfo,
+                 self.nodegroup.diskparams)
+    self.assertEqual(result, [])
+
+  def _TestTrivialDisk(self, template, disk_info, base_index, exp_dev_type,
+                       file_storage_dir=NotImplemented,
+                       file_driver=NotImplemented,
+                       req_file_storage=NotImplemented,
+                       req_shr_file_storage=NotImplemented):
+    gdt = cmdlib._GenerateDiskTemplate
+
+    map(lambda params: utils.ForceDictType(params,
+                                           constants.IDISK_PARAMS_TYPES),
+        disk_info)
+
+    # Check if non-empty list of secondaries is rejected
+    self.assertRaises(errors.ProgrammerError, gdt, self.lu,
+                      template, "inst25088.example.com",
+                      "node185.example.com", ["node323.example.com"], [],
+                      NotImplemented, NotImplemented, base_index,
+                      self.lu.LogInfo, self.nodegroup.diskparams,
+                      _req_file_storage=req_file_storage,
+                      _req_shr_file_storage=req_shr_file_storage)
+
+    result = gdt(self.lu, template, "inst21662.example.com",
+                 "node21741.example.com", [],
+                 disk_info, file_storage_dir, file_driver, base_index,
+                 self.lu.LogInfo, self.nodegroup.diskparams,
+                 _req_file_storage=req_file_storage,
+                 _req_shr_file_storage=req_shr_file_storage)
+
+    for (idx, disk) in enumerate(result):
+      self.assertTrue(isinstance(disk, objects.Disk))
+      self.assertEqual(disk.dev_type, exp_dev_type)
+      self.assertEqual(disk.size, disk_info[idx][constants.IDISK_SIZE])
+      self.assertEqual(disk.mode, disk_info[idx][constants.IDISK_MODE])
+      self.assertTrue(disk.children is None)
+
+    self._CheckIvNames(result, base_index, base_index + len(disk_info))
+    cmdlib._UpdateIvNames(base_index, result)
+    self._CheckIvNames(result, base_index, base_index + len(disk_info))
+
+    return result
+
+  def _CheckIvNames(self, disks, base_index, end_index):
+    self.assertEqual(map(operator.attrgetter("iv_name"), disks),
+                     ["disk/%s" % i for i in range(base_index, end_index)])
+
+  def testPlain(self):
+    disk_info = [{
+      constants.IDISK_SIZE: 1024,
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      }, {
+      constants.IDISK_SIZE: 4096,
+      constants.IDISK_VG: "othervg",
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      }]
+
+    result = self._TestTrivialDisk(constants.DT_PLAIN, disk_info, 3,
+                                   constants.LD_LV)
+
+    self.assertEqual(map(operator.attrgetter("logical_id"), result), [
+      ("testvg", "ec0-uq0.disk3"),
+      ("othervg", "ec0-uq1.disk4"),
+      ])
+
+  @staticmethod
+  def _AllowFileStorage():
+    pass
+
+  @staticmethod
+  def _ForbidFileStorage():
+    raise errors.OpPrereqError("Disallowed in test")
+
+  def testFile(self):
+    self.assertRaises(errors.OpPrereqError, self._TestTrivialDisk,
+                      constants.DT_FILE, [], 0, NotImplemented,
+                      req_file_storage=self._ForbidFileStorage)
+    self.assertRaises(errors.OpPrereqError, self._TestTrivialDisk,
+                      constants.DT_SHARED_FILE, [], 0, NotImplemented,
+                      req_shr_file_storage=self._ForbidFileStorage)
+
+    for disk_template in [constants.DT_FILE, constants.DT_SHARED_FILE]:
+      disk_info = [{
+        constants.IDISK_SIZE: 80 * 1024,
+        constants.IDISK_MODE: constants.DISK_RDONLY,
+        }, {
+        constants.IDISK_SIZE: 4096,
+        constants.IDISK_MODE: constants.DISK_RDWR,
+        }, {
+        constants.IDISK_SIZE: 6 * 1024,
+        constants.IDISK_MODE: constants.DISK_RDWR,
+        }]
+
+      result = self._TestTrivialDisk(disk_template, disk_info, 2,
+        constants.LD_FILE, file_storage_dir="/tmp",
+        file_driver=constants.FD_BLKTAP,
+        req_file_storage=self._AllowFileStorage,
+        req_shr_file_storage=self._AllowFileStorage)
+
+      self.assertEqual(map(operator.attrgetter("logical_id"), result), [
+        (constants.FD_BLKTAP, "/tmp/disk2"),
+        (constants.FD_BLKTAP, "/tmp/disk3"),
+        (constants.FD_BLKTAP, "/tmp/disk4"),
+        ])
+
+  def testBlock(self):
+    disk_info = [{
+      constants.IDISK_SIZE: 8 * 1024,
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      constants.IDISK_ADOPT: "/tmp/some/block/dev",
+      }]
+
+    result = self._TestTrivialDisk(constants.DT_BLOCK, disk_info, 10,
+                                   constants.LD_BLOCKDEV)
+
+    self.assertEqual(map(operator.attrgetter("logical_id"), result), [
+      (constants.BLOCKDEV_DRIVER_MANUAL, "/tmp/some/block/dev"),
+      ])
+
+  def testRbd(self):
+    disk_info = [{
+      constants.IDISK_SIZE: 8 * 1024,
+      constants.IDISK_MODE: constants.DISK_RDONLY,
+      }, {
+      constants.IDISK_SIZE: 100 * 1024,
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      }]
+
+    result = self._TestTrivialDisk(constants.DT_RBD, disk_info, 0,
+                                   constants.LD_RBD)
+
+    self.assertEqual(map(operator.attrgetter("logical_id"), result), [
+      ("rbd", "ec0-uq0.rbd.disk0"),
+      ("rbd", "ec0-uq1.rbd.disk1"),
+      ])
+
+  def testDrbd8(self):
+    gdt = cmdlib._GenerateDiskTemplate
+    drbd8_defaults = constants.DISK_LD_DEFAULTS[constants.LD_DRBD8]
+    drbd8_default_metavg = drbd8_defaults[constants.LDP_DEFAULT_METAVG]
+
+    disk_info = [{
+      constants.IDISK_SIZE: 1024,
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      }, {
+      constants.IDISK_SIZE: 100 * 1024,
+      constants.IDISK_MODE: constants.DISK_RDONLY,
+      constants.IDISK_METAVG: "metavg",
+      }, {
+      constants.IDISK_SIZE: 4096,
+      constants.IDISK_MODE: constants.DISK_RDWR,
+      constants.IDISK_VG: "vgxyz",
+      },
+      ]
+
+    exp_logical_ids = [[
+      (self.lu.cfg.GetVGName(), "ec0-uq0.disk0_data"),
+      (drbd8_default_metavg, "ec0-uq0.disk0_meta"),
+      ], [
+      (self.lu.cfg.GetVGName(), "ec0-uq1.disk1_data"),
+      ("metavg", "ec0-uq1.disk1_meta"),
+      ], [
+      ("vgxyz", "ec0-uq2.disk2_data"),
+      (drbd8_default_metavg, "ec0-uq2.disk2_meta"),
+      ]]
+
+    assert len(exp_logical_ids) == len(disk_info)
+
+    map(lambda params: utils.ForceDictType(params,
+                                           constants.IDISK_PARAMS_TYPES),
+        disk_info)
+
+    # Check if empty list of secondaries is rejected
+    self.assertRaises(errors.ProgrammerError, gdt, self.lu, constants.DT_DRBD8,
+                      "inst827.example.com", "node1334.example.com", [],
+                      disk_info, NotImplemented, NotImplemented, 0,
+                      self.lu.LogInfo, self.nodegroup.diskparams)
+
+    result = gdt(self.lu, constants.DT_DRBD8, "inst827.example.com",
+                 "node1334.example.com", ["node12272.example.com"],
+                 disk_info, NotImplemented, NotImplemented, 0, self.lu.LogInfo,
+                 self.nodegroup.diskparams)
+
+    for (idx, disk) in enumerate(result):
+      self.assertTrue(isinstance(disk, objects.Disk))
+      self.assertEqual(disk.dev_type, constants.LD_DRBD8)
+      self.assertEqual(disk.size, disk_info[idx][constants.IDISK_SIZE])
+      self.assertEqual(disk.mode, disk_info[idx][constants.IDISK_MODE])
+
+      for child in disk.children:
+        self.assertTrue(isinstance(disk, objects.Disk))
+        self.assertEqual(child.dev_type, constants.LD_LV)
+        self.assertTrue(child.children is None)
+
+      self.assertEqual(map(operator.attrgetter("logical_id"), disk.children),
+                       exp_logical_ids[idx])
+
+      self.assertEqual(len(disk.children), 2)
+      self.assertEqual(disk.children[0].size, disk.size)
+      self.assertEqual(disk.children[1].size, cmdlib.DRBD_META_SIZE)
+
+    self._CheckIvNames(result, 0, len(disk_info))
+    cmdlib._UpdateIvNames(0, result)
+    self._CheckIvNames(result, 0, len(disk_info))
+
+    self.assertEqual(map(operator.attrgetter("logical_id"), result), [
+      ("node1334.example.com", "node12272.example.com",
+       constants.FIRST_DRBD_PORT, 20, 21, "ec0-secret0"),
+      ("node1334.example.com", "node12272.example.com",
+       constants.FIRST_DRBD_PORT + 1, 22, 23, "ec0-secret1"),
+      ("node1334.example.com", "node12272.example.com",
+       constants.FIRST_DRBD_PORT + 2, 24, 25, "ec0-secret2"),
+      ])
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()