self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestNodePowercycle(unittest.TestCase):
+ def test(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+ handler = _CreateHandler(rlib2.R_2_nodes_name_powercycle, ["node20744"], {
+ "force": ["1"],
+ }, None, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpNodePowercycle))
+ self.assertEqual(op.node_name, "node20744")
+ self.assertTrue(op.force)
+
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+
+
class TestGroupAssignNodes(unittest.TestCase):
def test(self):
clfactory = _FakeClientFactory(_FakeClient)
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestInstanceRecreateDisks(unittest.TestCase):
+ def test(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+ handler = _CreateHandler(rlib2.R_2_instances_name_recreate_disks,
+ ["inst22357"], {}, {}, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpInstanceRecreateDisks))
+ self.assertEqual(op.instance_name, "inst22357")
+ self.assertFalse(hasattr(op, "dry_run"))
+ self.assertFalse(hasattr(op, "force"))
+
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+
+
class TestInstanceFailover(unittest.TestCase):
def test(self):
clfactory = _FakeClientFactory(_FakeClient)
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
-class TestParseInstanceCreateRequestVersion1(testutils.GanetiTestCase):
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
+class TestInstanceCreation(testutils.GanetiTestCase):
+ def test(self):
+ clfactory = _FakeClientFactory(_FakeClient)
- self.Parse = rlib2._ParseInstanceCreateRequestVersion1
+ name = "inst863.example.com"
- def test(self):
disk_variants = [
# No disks
[],
None,
{},
{ constants.BE_VCPUS: 2, },
- { constants.BE_MEMORY: 123, },
+ { constants.BE_MAXMEM: 200, },
+ { constants.BE_MEMORY: 256, },
{ constants.BE_VCPUS: 2,
- constants.BE_MEMORY: 1024,
- constants.BE_AUTO_BALANCE: True, }
+ constants.BE_MAXMEM: 1024,
+ constants.BE_MINMEM: 1024,
+ constants.BE_AUTO_BALANCE: True,
+ constants.BE_ALWAYS_FAILOVER: True, }
]
hvparam_variants = [
for disks in disk_variants:
for beparams in beparam_variants:
for hvparams in hvparam_variants:
- data = {
- "name": "inst1.example.com",
- "hypervisor": constants.HT_FAKE,
- "disks": disks,
- "nics": nics,
- "mode": mode,
- "disk_template": disk_template,
- "os": "debootstrap",
- }
-
- if beparams is not None:
- data["beparams"] = beparams
-
- if hvparams is not None:
- data["hvparams"] = hvparams
-
for dry_run in [False, True]:
- op = self.Parse(data, dry_run)
- self.assert_(isinstance(op, opcodes.OpInstanceCreate))
+ queryargs = {
+ "dry-run": str(int(dry_run)),
+ }
+
+ data = {
+ rlib2._REQ_DATA_VERSION: 1,
+ "name": name,
+ "hypervisor": constants.HT_FAKE,
+ "disks": disks,
+ "nics": nics,
+ "mode": mode,
+ "disk_template": disk_template,
+ "os": "debootstrap",
+ }
+
+ if beparams is not None:
+ data["beparams"] = beparams
+
+ if hvparams is not None:
+ data["hvparams"] = hvparams
+
+ handler = _CreateHandler(rlib2.R_2_instances, [],
+ queryargs, data, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+
+ self.assertTrue(isinstance(op, opcodes.OpInstanceCreate))
+ self.assertEqual(op.instance_name, name)
self.assertEqual(op.mode, mode)
self.assertEqual(op.disk_template, disk_template)
self.assertEqual(op.dry_run, dry_run)
self.assertEqualValues(op.hvparams, hvparams)
def testLegacyName(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+
name = "inst29128.example.com"
data = {
+ rlib2._REQ_DATA_VERSION: 1,
"name": name,
"disks": [],
"nics": [],
"mode": constants.INSTANCE_CREATE,
"disk_template": constants.DT_PLAIN,
}
- op = self.Parse(data, False)
- self.assert_(isinstance(op, opcodes.OpInstanceCreate))
+
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpInstanceCreate))
self.assertEqual(op.instance_name, name)
self.assertFalse(hasattr(op, "name"))
+ self.assertFalse(op.dry_run)
+
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
# Define both
- data = {
- "name": name,
- "instance_name": "other.example.com",
- "disks": [],
- "nics": [],
- "mode": constants.INSTANCE_CREATE,
- "disk_template": constants.DT_PLAIN,
- }
- self.assertRaises(http.HttpBadRequest, self.Parse, data, False)
+ data["instance_name"] = "other.example.com"
+ assert "name" in data and "instance_name" in data
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+ self.assertRaises(IndexError, clfactory.GetNextClient)
def testLegacyOs(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+
name = "inst4673.example.com"
os = "linux29206"
data = {
+ rlib2._REQ_DATA_VERSION: 1,
"name": name,
"os_type": os,
"disks": [],
"mode": constants.INSTANCE_CREATE,
"disk_template": constants.DT_PLAIN,
}
- op = self.Parse(data, False)
- self.assert_(isinstance(op, opcodes.OpInstanceCreate))
+
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpInstanceCreate))
self.assertEqual(op.instance_name, name)
self.assertEqual(op.os_type, os)
self.assertFalse(hasattr(op, "os"))
+ self.assertFalse(op.dry_run)
+
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
# Define both
- data = {
- "instance_name": name,
- "os": os,
- "os_type": "linux9584",
- "disks": [],
- "nics": [],
- "mode": constants.INSTANCE_CREATE,
- "disk_template": constants.DT_PLAIN,
- }
- self.assertRaises(http.HttpBadRequest, self.Parse, data, False)
+ data["os"] = "linux9584"
+ assert "os" in data and "os_type" in data
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
def testErrors(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+
# Test all required fields
reqfields = {
+ rlib2._REQ_DATA_VERSION: 1,
"name": "inst1.example.com",
"disks": [],
"nics": [],
}
for name in reqfields.keys():
- self.assertRaises(http.HttpBadRequest, self.Parse,
- dict(i for i in reqfields.iteritems() if i[0] != name),
- False)
+ data = dict(i for i in reqfields.iteritems() if i[0] != name)
+
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+ self.assertRaises(IndexError, clfactory.GetNextClient)
# Invalid disks and nics
for field in ["disks", "nics"]:
for invvalue in invalid_values:
data = reqfields.copy()
data[field] = invvalue
- self.assertRaises(http.HttpBadRequest, self.Parse, data, False)
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ def testVersion(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+
+ # No version field
+ data = {
+ "name": "inst1.example.com",
+ "disks": [],
+ "nics": [],
+ "mode": constants.INSTANCE_CREATE,
+ "disk_template": constants.DT_PLAIN,
+ }
+
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+
+ # Old and incorrect versions
+ for version in [0, -1, 10483, "Hello World"]:
+ data[rlib2._REQ_DATA_VERSION] = version
+
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ # Correct version
+ data[rlib2._REQ_DATA_VERSION] = 1
+ handler = _CreateHandler(rlib2.R_2_instances, [], {}, data, clfactory)
+ job_id = handler.POST()
+
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+
+ (exp_job_id, (op, )) = cl.GetNextSubmittedJob()
+ self.assertEqual(job_id, exp_job_id)
+ self.assertTrue(isinstance(op, opcodes.OpInstanceCreate))
+ self.assertRaises(IndexError, cl.GetNextSubmittedJob)
class TestBackupExport(unittest.TestCase):
for osparams in [{}, { "some": "value", "other": "Hello World", }]:
for hvparams in [{}, { constants.HV_KERNEL_PATH: "/some/kernel", }]:
- for beparams in [{}, { constants.BE_MEMORY: 128, }]:
+ for beparams in [{}, { constants.BE_MAXMEM: 128, }]:
for force in [False, True]:
for nics in [[], [(0, { constants.INIC_IP: "192.0.2.1", })]]:
for disks in test_disks:
self.assertEqual(ops[1].os_type, "linux1")
self.assertFalse(ops[1].osparams)
+ def testErrors(self):
+ self.assertRaises(http.HttpBadRequest, self.Parse,
+ "foo", "not a dictionary")
+
class TestGroupRename(unittest.TestCase):
def test(self):
self.assertFalse(hasattr(op, "disks"))
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+ def testNoDisks(self):
+ clfactory = _FakeClientFactory(_FakeClient)
+
+ handler = _CreateHandler(rlib2.R_2_instances_name_replace_disks,
+ ["inst20661"], {}, {}, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+
+ for disks in [None, "", {}]:
+ handler = _CreateHandler(rlib2.R_2_instances_name_replace_disks,
+ ["inst20661"], {}, {
+ "disks": disks,
+ }, clfactory)
+ self.assertRaises(http.HttpBadRequest, handler.POST)
+
def testWrong(self):
clfactory = _FakeClientFactory(_FakeClient)
self.assertRaises(IndexError, cl.GetNextSubmittedJob)
+class TestSimpleResources(unittest.TestCase):
+ def setUp(self):
+ self.clfactory = _FakeClientFactory(_FakeClient)
+
+ def tearDown(self):
+ self.assertRaises(IndexError, self.clfactory.GetNextClient)
+
+ def testFeatures(self):
+ handler = _CreateHandler(rlib2.R_2_features, [], {}, None, self.clfactory)
+ self.assertEqual(set(handler.GET()), rlib2.ALL_FEATURES)
+
+ def testEmpty(self):
+ for cls in [rlib2.R_root, rlib2.R_2]:
+ handler = _CreateHandler(cls, [], {}, None, self.clfactory)
+ self.assertTrue(handler.GET() is None)
+
+ def testVersion(self):
+ handler = _CreateHandler(rlib2.R_version, [], {}, None, self.clfactory)
+ self.assertEqual(handler.GET(), constants.RAPI_VERSION)
+
+
+class TestClusterInfo(unittest.TestCase):
+ class _ClusterInfoClient:
+ def __init__(self):
+ self.cluster_info = None
+
+ def QueryClusterInfo(self):
+ assert self.cluster_info is None
+ self.cluster_info = object()
+ return self.cluster_info
+
+ def test(self):
+ clfactory = _FakeClientFactory(self._ClusterInfoClient)
+ handler = _CreateHandler(rlib2.R_2_info, [], {}, None, clfactory)
+ result = handler.GET()
+ cl = clfactory.GetNextClient()
+ self.assertRaises(IndexError, clfactory.GetNextClient)
+ self.assertEqual(result, cl.cluster_info)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()