4 # Copyright (C) 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the cli module"""
25 from cStringIO import StringIO
30 from ganeti import constants
31 from ganeti import cli
32 from ganeti import errors
33 from ganeti import utils
34 from ganeti.errors import OpPrereqError, ParameterError
# Test case for cli.ParseTimespec: every valid spec must parse to its expected
# value, and every invalid spec must raise OpPrereqError.
# NOTE(review): the embedded line numbers jump (41 -> 48 -> 52 and 56 -> 63),
# so both test_data definitions are not visible in this listing -- confirm
# their contents against the complete file.
37 class TestParseTimespec(unittest.TestCase):
38 """Testing case for ParseTimespec"""
40 def testValidTimes(self):
41 """Test valid timespecs"""
# Only surviving test_data entry: '1w' is expected to yield 60 * 60 * 24 * 7
# (presumably one week expressed in seconds -- TODO confirm the unit).
48 ('1w', 60 * 60 * 24 * 7),
# Each (spec, expected) pair is checked for exact equality.
52 for value, expected_result in test_data:
53 self.failUnlessEqual(cli.ParseTimespec(value), expected_result)
55 def testInvalidTime(self):
56 """Test invalid timespecs"""
# Each entry of the (omitted) invalid test_data list must be rejected with
# OpPrereqError.
63 for value in test_data:
64 self.failUnlessRaises(OpPrereqError, cli.ParseTimespec, value)
67 class TestSplitKeyVal(unittest.TestCase):
68 """Testing case for cli._SplitKeyVal"""
69 DATA = "a=b,c,no_d,-e"
70 RESULT = {"a": "b", "c": True, "d": False, "e": None}
72 def testSplitKeyVal(self):
74 self.failUnlessEqual(cli._SplitKeyVal("option", self.DATA), self.RESULT)
76 def testDuplicateParam(self):
77 """Test duplicate parameters"""
78 for data in ("a=1,a=2", "a,no_a"):
79 self.failUnlessRaises(ParameterError, cli._SplitKeyVal,
82 def testEmptyData(self):
83 """Test how we handle splitting an empty string"""
84 self.failUnlessEqual(cli._SplitKeyVal("option", ""), {})
86 class TestIdentKeyVal(unittest.TestCase):
87 """Testing case for cli.check_ident_key_val"""
89 def testIdentKeyVal(self):
90 """Test identkeyval"""
92 return cli.check_ident_key_val("option", "opt", value)
94 self.assertEqual(cikv("foo:bar"), ("foo", {"bar": True}))
95 self.assertEqual(cikv("foo:bar=baz"), ("foo", {"bar": "baz"}))
96 self.assertEqual(cikv("bar:b=c,c=a"), ("bar", {"b": "c", "c": "a"}))
97 self.assertEqual(cikv("no_bar"), ("bar", False))
98 self.assertRaises(ParameterError, cikv, "no_bar:foo")
99 self.assertRaises(ParameterError, cikv, "no_bar:foo=baz")
100 self.assertEqual(cikv("-foo"), ("foo", None))
101 self.assertRaises(ParameterError, cikv, "-foo:a=c")
# Test case for cli._ToStream, which is called here with a stream, a text and
# optional extra arguments.
# NOTE(review): embedded lines 106-114, 119, 122 and 125 are missing from this
# listing, so the first test's header, its input list and the creation of
# "buf" are not visible -- confirm against the complete file.
104 class TestToStream(unittest.TestCase):
105 """Test the ToStream functions"""
# Without extra arguments the stream is expected to hold the text unchanged,
# followed by a single newline.
115 cli._ToStream(buf, data)
116 self.failUnlessEqual(buf.getvalue(), data+'\n')
# With extra arguments the expected output is the text %-formatted with them.
# Each expectation contains only the latest message, so "buf" is presumably
# recreated on the omitted lines before every call -- TODO confirm.
118 def testParams(self):
120 cli._ToStream(buf, "foo %s", 1)
121 self.failUnlessEqual(buf.getvalue(), "foo 1\n")
# A tuple passed as the single extra argument is expected to be formatted as
# one value, not expanded.
123 cli._ToStream(buf, "foo %s", (15,16))
124 self.failUnlessEqual(buf.getvalue(), "foo (15, 16)\n")
126 cli._ToStream(buf, "foo %s %s", "a", "b")
127 self.failUnlessEqual(buf.getvalue(), "foo a b\n")
# Test case for cli.GenerateTable; every test goes through the _test helper.
# NOTE(review): the embedded line numbers show many omitted lines (DATA1, most
# "exp" lists, local "data"/"headers" values and at least one method header),
# so several names used below are not defined in this listing -- confirm
# against the complete file.
130 class TestGenerateTable(unittest.TestCase):
# Maps field names "f0".."f4" to header titles "Field0".."Field4".
131 HEADERS = dict([("f%s" % i, "Field%s" % i) for i in range(5)])
133 FIELDS1 = ["f1", "f2"]
# Helper: call cli.GenerateTable with the given arguments and assert that the
# returned table equals "expected". The "units" parameter is presumably
# forwarded on omitted line 144 -- TODO confirm.
140 def _test(self, headers, fields, separator, data,
141 numfields, unitfields, units, expected):
142 table = cli.GenerateTable(headers, fields, separator, data,
143 numfields=numfields, unitfields=unitfields,
145 self.assertEqual(table, expected)
# NOTE(review): the header of the test owning the next call (embedded lines
# 146-153) is not visible.
154 self._test(self.HEADERS, self.FIELDS1, None, self.DATA1,
155 None, None, "m", exp)
# With no fields and two empty rows, three empty strings are expected when
# headers are given and two when headers is None.
157 def testNoFields(self):
158 self._test(self.HEADERS, [], None, [[], []],
159 None, None, "m", ["", "", ""])
160 self._test(None, [], None, [[], []],
161 None, None, "m", ["", ""])
# Runs the same table with each single- and multi-character separator; the
# surviving expected line shows the separator between the header titles.
163 def testSeparator(self):
164 for sep in ["#", ":", ",", "^", "!", "%", "|", "###", "%%", "!!!", "||"]:
166 "Field1%sField2" % sep,
171 self._test(self.HEADERS, self.FIELDS1, sep, self.DATA1,
172 None, None, "m", exp)
# Passes None as headers.
174 def testNoHeader(self):
180 self._test(None, self.FIELDS1, None, self.DATA1,
181 None, None, "m", exp)
# Requests a field named "UNKNOWN" with a local "headers" value defined on
# omitted lines.
183 def testUnknownField(self):
193 self._test(headers, ["f1", "UNKNOWN"], None, self.DATA1,
194 None, None, "m", exp)
# Passes "f2" and "f3" as numfields.
196 def testNumfields(self):
197 fields = ["f1", "f2", "f3"]
204 "Field1 Field2 Field3",
209 self._test(self.HEADERS, fields, None, data,
210 ["f2", "f3"], None, "m", exp)
# Passes "f3" as a unit field with units "h", once without a separator and
# once with ":" as the separator.
212 def testUnitfields(self):
214 "Field1 Field2 Field3",
221 "Field1:Field2:Field3",
227 for sep, expected in [(None, expnosep), (":", expsep)]:
228 fields = ["f1", "f2", "f3"]
234 self._test(self.HEADERS, fields, sep, data,
235 ["f2", "f3"], ["f3"], "h", expected)
237 def testUnusual(self):
247 self._test(self.HEADERS, ["f1", "f2"], None, data,
248 None, None, "m", exp)
# Scripted stand-in for both job-polling callback interfaces. Tests queue
# results with AddWfjcResult / AddQueryJobsResult; the mock hands them out in
# FIFO order (pop(0)) and asserts that it is called with the expected
# arguments and in the expected order.
# NOTE(review): embedded lines 253-256, 285, 287, 289-290 and 300 are missing
# from this listing, so the initialisation of tc/job_id/_wfjcr/_jobstatus, an
# apparent "else:" branch and the return statements are not visible --
# confirm against the complete file.
251 class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase):
252 def __init__(self, tc, job_id):
257 self._expect_notchanged = False
258 self._expect_log = []
# Asserts that every scripted result and every pending expectation has been
# consumed.
260 def CheckEmpty(self):
261 self.tc.assertFalse(self._wfjcr)
262 self.tc.assertFalse(self._jobstatus)
263 self.tc.assertFalse(self._expect_notchanged)
264 self.tc.assertFalse(self._expect_log)
# Queues one scripted result; WaitForJobChangeOnce unpacks it as
# (expected prev_job_info, expected prev_log_serial, result).
266 def AddWfjcResult(self, *args):
267 self._wfjcr.append(args)
# Queues one scripted QueryJobs result (one value per requested field).
269 def AddQueryJobsResult(self, *args):
270 self._jobstatus.append(args)
# Checks the arguments against the next scripted entry. No "not changed"
# notification or log message may still be pending from the previous call.
272 def WaitForJobChangeOnce(self, job_id, fields,
273 prev_job_info, prev_log_serial):
274 self.tc.assertEqual(job_id, self.job_id)
275 self.tc.assertEqualValues(fields, ["status"])
276 self.tc.assertFalse(self._expect_notchanged)
277 self.tc.assertFalse(self._expect_log)
279 (exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0)
280 self.tc.assertEqualValues(prev_job_info, exp_prev_job_info)
281 self.tc.assertEqual(prev_log_serial, exp_prev_log_serial)
# A JOB_NOTCHANGED result arms the expectation that ReportNotChanged is called
# next; otherwise (presumably in an omitted "else:" branch) the log messages
# of the result are queued for ReportLogMessage.
283 if result == constants.JOB_NOTCHANGED:
284 self._expect_notchanged = True
286 (_, logmsgs) = result
288 self._expect_log.extend(logmsgs)
# Checks the requested job IDs and fields, then pops the next scripted status,
# which must have one entry per requested field.
292 def QueryJobs(self, job_ids, fields):
293 self.tc.assertEqual(job_ids, [self.job_id])
294 self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"])
295 self.tc.assertFalse(self._expect_notchanged)
296 self.tc.assertFalse(self._expect_log)
298 result = self._jobstatus.pop(0)
299 self.tc.assertEqual(len(fields), len(result))
# Each reported log message must match the next queued expectation, in order.
302 def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
303 self.tc.assertEqual(job_id, self.job_id)
304 self.tc.assertEqualValues((serial, timestamp, log_type, log_msg),
305 self._expect_log.pop(0))
# Only valid directly after a JOB_NOTCHANGED result; clears the expectation.
307 def ReportNotChanged(self, job_id, status):
308 self.tc.assertEqual(job_id, self.job_id)
309 self.tc.assert_(self._expect_notchanged)
310 self._expect_notchanged = False
# Test case for cli.GenericPollJob. Each test scripts a sequence of results on
# a _MockJobPollCb and passes that same object as both callback arguments.
# NOTE(review): the embedded line numbers show omitted lines (the job_id
# assignments, the header of the test starting near embedded line 374, some
# trailing call arguments and whatever follows each final assertion), so
# "job_id" is not defined in this listing -- confirm against the complete file.
313 class TestGenericPollJob(testutils.GanetiTestCase):
# Job goes unchanged -> queued -> unchanged -> running (with log messages) ->
# running (more log messages) -> success; the call must return the opresult
# list supplied through AddQueryJobsResult.
314 def testSuccessWithLog(self):
316 cbs = _MockJobPollCb(self, job_id)
318 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
320 cbs.AddWfjcResult(None, None,
321 ((constants.JOB_STATUS_QUEUED, ), None))
323 cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
324 constants.JOB_NOTCHANGED)
326 cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None,
327 ((constants.JOB_STATUS_RUNNING, ),
328 [(1, utils.SplitTime(1273491611.0),
329 constants.ELOG_MESSAGE, "Step 1"),
330 (2, utils.SplitTime(1273491615.9),
331 constants.ELOG_MESSAGE, "Step 2"),
332 (3, utils.SplitTime(1273491625.02),
333 constants.ELOG_MESSAGE, "Step 3"),
334 (4, utils.SplitTime(1273491635.05),
335 constants.ELOG_MESSAGE, "Step 4"),
336 (37, utils.SplitTime(1273491645.0),
337 constants.ELOG_MESSAGE, "Step 5"),
# NOTE(review): this timestamp has one digit fewer than the others (127349155
# vs 1273491xxx); it looks like a typo, but the mock only compares the value
# with itself -- confirm whether it is intentional.
338 (203, utils.SplitTime(127349155.0),
339 constants.ELOG_MESSAGE, "Step 6")]))
# The next call is expected to pass 203, the serial of the last message above,
# as prev_log_serial.
341 cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203,
342 ((constants.JOB_STATUS_RUNNING, ),
343 [(300, utils.SplitTime(1273491711.01),
344 constants.ELOG_MESSAGE, "Step X"),
345 (302, utils.SplitTime(1273491815.8),
346 constants.ELOG_MESSAGE, "Step Y"),
347 (303, utils.SplitTime(1273491925.32),
348 constants.ELOG_MESSAGE, "Step Z")]))
350 cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303,
351 ((constants.JOB_STATUS_SUCCESS, ), None))
353 cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS,
354 [constants.OP_STATUS_SUCCESS,
355 constants.OP_STATUS_SUCCESS],
356 ["Hello World", "Foo man bar"])
358 self.assertEqual(["Hello World", "Foo man bar"],
359 cli.GenericPollJob(job_id, cbs, cbs))
# A None result from WaitForJobChangeOnce must raise errors.JobLost.
362 def testJobLost(self):
365 cbs = _MockJobPollCb(self, job_id)
366 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
367 cbs.AddWfjcResult(None, None, None)
368 self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs)
# NOTE(review): the "def" line of the next test is missing from the listing.
# Error job status with a plain-string opresult for the failed opcode must
# raise errors.OpExecError.
374 cbs = _MockJobPollCb(self, job_id)
375 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
376 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
377 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
378 [constants.OP_STATUS_SUCCESS,
379 constants.OP_STATUS_ERROR],
380 ["Hello World", "Error code 123"])
381 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
# When the opresult is an encoded exception (errors.EncodeException of a
# LockError), that exception type must be raised instead of OpExecError.
384 def testError2(self):
387 cbs = _MockJobPollCb(self, job_id)
388 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
389 encexc = errors.EncodeException(errors.LockError("problem"))
390 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
391 [constants.OP_STATUS_ERROR], [encexc])
392 self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs)
# Job status is "error" although every opcode still reports "running"; this
# must still raise errors.OpExecError. (The opresult argument is on an omitted
# line.)
395 def testWeirdError(self):
398 cbs = _MockJobPollCb(self, job_id)
399 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None))
400 cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR,
401 [constants.OP_STATUS_RUNNING,
402 constants.OP_STATUS_RUNNING],
404 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
# A job in "canceling" state must raise errors.OpExecError. (The opresult
# argument is on an omitted line.)
407 def testCancel(self):
410 cbs = _MockJobPollCb(self, job_id)
411 cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED)
412 cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None))
413 cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING,
414 [constants.OP_STATUS_CANCELING,
415 constants.OP_STATUS_CANCELING],
417 self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs)
# Script entry point: hand control to testutils.GanetiTestProgram() only when
# this file is executed directly, not when it is imported.
if __name__ == '__main__':
    testutils.GanetiTestProgram()