self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestNodePowercycle(unittest.TestCase):  # POST on R_2_nodes_name_powercycle is expected to submit one OpNodePowercycle job
+ def test(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+ handler = _CreateHandler(rlib2.R_2_nodes_name_powercycle, ["node20744"], {
+ "force": ["1"],  # presumably the query arguments of the request -- confirm against _CreateHandler
+ }, None, clfactory)
+ job_id = handler.POST()
+ # The factory must hand out one client; asking for a second one must raise IndexError
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+ # The submitted job must contain exactly one opcode (the tuple unpacking enforces this)
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpNodePowercycle))
+ self.assertEqual(op.node_name, "node20744")
+ self.assertTrue(op.force)
+ # No further job may be pending on the client
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+
+
class TestGroupAssignNodes(unittest.TestCase):
def test(self):
clfactory = _FakeClientFactory(_FakeClient)
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestInstanceRecreateDisks(unittest.TestCase):  # POST on R_2_instances_name_recreate_disks is expected to submit one OpInstanceRecreateDisks job
+ def test(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+ handler = _CreateHandler(rlib2.R_2_instances_name_recreate_disks,
+ ["inst22357"], {}, {}, clfactory)
+ job_id = handler.POST()
+ # The factory must hand out one client; asking for a second one must raise IndexError
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+ # The submitted job must contain exactly one opcode (the tuple unpacking enforces this)
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpInstanceRecreateDisks))
+ self.assertEqual(op.instance_name, "inst22357")
+ self.assertFalse(hasattr(op, "dry_run"))  # nothing in the request asked for these, so the opcode must not carry them
+ self.assertFalse(hasattr(op, "force"))
+ # No further job may be pending on the client
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+
+
class TestInstanceFailover(unittest.TestCase):
def test(self):
clfactory = _FakeClientFactory(_FakeClient)
None,
{},
{ constants.BE_VCPUS: 2, },
- { constants.BE_MEMORY: 123, },
+ { constants.BE_MAXMEM: 200, },
+ { constants.BE_MEMORY: 256, },
{ constants.BE_VCPUS: 2,
- constants.BE_MEMORY: 1024,
- constants.BE_AUTO_BALANCE: True, }
+ constants.BE_MAXMEM: 1024,
+ constants.BE_MINMEM: 1024,
+ constants.BE_AUTO_BALANCE: True,
+ constants.BE_ALWAYS_FAILOVER: True, }
]
hvparam_variants = [
for osparams in [{}, { "some": "value", "other": "Hello World", }]:
for hvparams in [{}, { constants.HV_KERNEL_PATH: "/some/kernel", }]:
- for beparams in [{}, { constants.BE_MEMORY: 128, }]:
+ for beparams in [{}, { constants.BE_MAXMEM: 128, }]:
for force in [False, True]:
for nics in [[], [(0, { constants.INIC_IP: "192.0.2.1", })]]:
for disks in test_disks:
self.assertEqual(ops[1].os_type, "linux1")
self.assertFalse(ops[1].osparams)
+ def testErrors(self):  # self.Parse("foo", "not a dictionary") is expected to raise http.HttpBadRequest
+ self.assertRaises(http.HttpBadRequest, self.Parse,
+ "foo", "not a dictionary")  # NOTE(review): self.Parse is defined outside this hunk; the second argument is presumably the request body -- confirm
+
class TestGroupRename(unittest.TestCase):
def test(self):
self.assertFalse(hasattr(op, "disks"))
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+ def testNoDisks(self):  # POST on the replace-disks resource is expected to raise HttpBadRequest when "disks" is absent, None or empty
+ clfactory = _FakeClientFactory(_FakeClient)
+ # Case 1: the last dict argument (presumably the request body -- confirm) has no "disks" key at all
+ handler = _CreateHandler(rlib2.R_2_instances_name_replace_disks,
+ ["inst20661"], {}, {}, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+ # Case 2: "disks" is present but set to None, an empty string or an empty dict
+ for disks in [None, "", {}]:
+ handler = _CreateHandler(rlib2.R_2_instances_name_replace_disks,
+ ["inst20661"], {}, {
+ "disks": disks,
+ }, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+
def testWrong(self):
clfactory = _FakeClientFactory(_FakeClient)
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestSimpleResources(unittest.TestCase):  # GET checks for R_2_features, R_root, R_2 and R_version
+ def setUp(self):
+ self.clfactory = _FakeClientFactory(_FakeClient)
+ # Runs after every test: the factory must have no client left to hand out (GetNextClient raises IndexError)
+ def tearDown(self):
+ self.assertRaises(IndexError, self.clfactory.GetNextClient)
+ # GET on R_2_features must return exactly the members of rlib2.ALL_FEATURES (compared as a set)
+ def testFeatures(self):
+ handler = _CreateHandler(rlib2.R_2_features, [], {}, None, self.clfactory)
+ self.assertEqual(set(handler.GET()), rlib2.ALL_FEATURES)
+ # GET on R_root and on R_2 must return None
+ def testEmpty(self):
+ for cls in [rlib2.R_root, rlib2.R_2]:
+ handler = _CreateHandler(cls, [], {}, None, self.clfactory)
+ self.assertTrue(handler.GET() is None)
+ # GET on R_version must return constants.RAPI_VERSION
+ def testVersion(self):
+ handler = _CreateHandler(rlib2.R_version, [], {}, None, self.clfactory)
+ self.assertEqual(handler.GET(), constants.RAPI_VERSION)
+
+
+class TestClusterInfo(unittest.TestCase):  # GET on R_2_info is expected to return the client's QueryClusterInfo() result
+ class _ClusterInfoClient:  # stub client that records the object it returned from QueryClusterInfo
+ def __init__(self):
+ self.cluster_info = None
+ # Returns a fresh unique object() and remembers it in self.cluster_info
+ def QueryClusterInfo(self):
+ assert self.cluster_info is None  # fails if QueryClusterInfo is called a second time on the same client
+ self.cluster_info = object()
+ return self.cluster_info
+ # The handler result must equal the object recorded by the single client the factory hands out
+ def test(self):
+ clfactory = _FakeClientFactory(self._ClusterInfoClient)
+ handler = _CreateHandler(rlib2.R_2_info, [], {}, None, clfactory)
+ result = handler.GET()
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+ self.assertEqual(result, cl.cluster_info)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()